ScamDetector / utils /functions.py
rafaldembski's picture
Update utils/functions.py
baa78ec verified
raw
history blame
9.81 kB
# utils/functions.py
import phonenumbers
from phonenumbers import geocoder, carrier
import re
import requests
import os
import json
from datetime import datetime
import logging
# Konfiguracja logowania
logging.basicConfig(filename='app.log', level=logging.ERROR, format='%(asctime)s %(levelname)s:%(message)s')
# Ścieżka do pliku JSON przechowującego fałszywe numery
FAKE_NUMBERS_FILE = 'fake_numbers.json'
# Inicjalizacja pliku JSON przechowującego fałszywe numery
def init_fake_numbers_file():
if not os.path.exists(FAKE_NUMBERS_FILE):
with open(FAKE_NUMBERS_FILE, 'w') as f:
json.dump([], f)
else:
# Sprawdzenie, czy plik nie jest pusty i zawiera prawidłowy JSON
try:
with open(FAKE_NUMBERS_FILE, 'r') as f:
json.load(f)
except json.JSONDecodeError:
# Jeśli plik jest uszkodzony lub pusty, zresetuj go do pustej listy
with open(FAKE_NUMBERS_FILE, 'w') as f:
json.dump([], f)
# Dodanie numeru telefonu do pliku JSON
def add_fake_number(phone_number):
try:
with open(FAKE_NUMBERS_FILE, 'r') as f:
fake_numbers = json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
fake_numbers = []
if not any(entry["phone_number"] == phone_number for entry in fake_numbers):
fake_numbers.append({
"phone_number": phone_number,
"reported_at": datetime.now().isoformat()
})
try:
with open(FAKE_NUMBERS_FILE, 'w') as f:
json.dump(fake_numbers, f, indent=4)
return True
except Exception as e:
logging.error(f"Nie udało się zapisać numeru {phone_number}: {e}")
return False
else:
return False # Numer już istnieje
# Sprawdzenie, czy numer telefonu jest w pliku JSON
def is_fake_number(phone_number):
try:
with open(FAKE_NUMBERS_FILE, 'r') as f:
fake_numbers = json.load(f)
return any(entry["phone_number"] == phone_number for entry in fake_numbers)
except (json.JSONDecodeError, FileNotFoundError):
return False
# Pobierz klucz API z zmiennej środowiskowej
API_KEY = os.getenv('SAMBANOVA_API_KEY')
# Funkcja do weryfikacji numeru telefonu
def get_phone_info(phone_number):
try:
parsed_number = phonenumbers.parse(phone_number, None)
country = geocoder.description_for_number(parsed_number, 'pl')
operator = carrier.name_for_number(parsed_number, 'pl')
return country, operator
except phonenumbers.NumberParseException:
return None, None
# Proste sprawdzenia heurystyczne wiadomości
def simple_checks(message):
warnings = []
# Słowa kluczowe często używane w oszustwach
scam_keywords = ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata']
if any(keyword in message.lower() for keyword in scam_keywords):
warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.")
# Sprawdzenie obecności linków
if re.search(r'http[s]?://', message):
warnings.append("Wiadomość zawiera link.")
# Sprawdzenie, czy nadawca prosi o poufne informacje
if re.search(r'\b(podaj|prześlij|udostępnij)\b.*\b(hasło|kod|dane osobowe|numer konta)\b', message.lower()):
warnings.append("Wiadomość zawiera prośbę o poufne informacje.")
return warnings
# Funkcja do analizy wiadomości za pomocą API SambaNova z głębszym procesem myślenia
def analyze_message(message, phone_number, additional_info, api_key):
if not api_key:
logging.error("Brak klucza API.")
return "Brak klucza API.", "Brak klucza API.", "Brak klucza API."
url = "https://api.sambanova.ai/v1/chat/completions"
headers = {
"Authorization": f"Bearer {api_key}"
}
# Rozbudowany system prompt z głębszym procesem myślenia
system_prompt = """
Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości SMS. Twoim zadaniem jest przeprowadzenie szczegółowej analizy wiadomości, wykorzystując głęboki proces myślenia i dostarczając kompleksową ocenę. Twoja odpowiedź powinna być podzielona na trzy sekcje:
<analysis>
**Analiza Treści Wiadomości:**
- Przeprowadź szczegółową analizę treści wiadomości, identyfikując potencjalne czerwone flagi, takie jak błędy językowe, prośby o dane osobowe, pilne prośby o kontakt itp.
- Opisz kontekst językowy i kulturowy wiadomości.
- Zidentyfikuj wszelkie elementy, które mogą sugerować, że wiadomość jest próbą wyłudzenia informacji lub pieniędzy.
</analysis>
<risk_assessment>
**Ocena Ryzyka Oszustwa:**
- Na podstawie analizy treści i dostępnych informacji oceń prawdopodobieństwo, że wiadomość jest oszustwem. Użyj skali od 1 do 10, gdzie 1 oznacza bardzo niskie ryzyko, a 10 bardzo wysokie ryzyko.
- Wyjaśnij, jakie czynniki wpływają na tę ocenę.
</risk_assessment>
<recommendations>
**Zalecenia dla Użytkownika:**
- Podaj jasne i konkretne zalecenia dotyczące dalszych kroków, które użytkownik powinien podjąć.
- Uwzględnij sugestie dotyczące bezpieczeństwa, takie jak blokowanie nadawcy, zgłaszanie wiadomości do odpowiednich instytucji, czy też ignorowanie wiadomości.
- Jeśli to możliwe, zasugeruj dodatkowe środki ostrożności, które użytkownik może podjąć, aby chronić swoje dane osobowe i finansowe.
</recommendations>
Twoja odpowiedź powinna być sformatowana dokładnie w powyższy sposób, używając znaczników <analysis>, <risk_assessment> i <recommendations>. Upewnij się, że każda sekcja jest wypełniona kompletnie i szczegółowo.
"""
user_prompt = f"""Przeanalizuj następującą wiadomość pod kątem potencjalnego oszustwa:
Wiadomość: "{message}"
Numer telefonu nadawcy: "{phone_number}"
Dodatkowe informacje:
{additional_info}
Podaj swoją analizę i wnioski zgodnie z powyższymi wytycznymi."""
payload = {
"model": "Meta-Llama-3.1-8B-Instruct",
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
"max_tokens": 1000,
"temperature": 0.2,
"top_p": 0.9,
"stop": ["<|eot_id|>"]
}
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
data = response.json()
ai_response = data['choices'][0]['message']['content']
# Parsowanie odpowiedzi
analysis = re.search(r'<analysis>(.*?)</analysis>', ai_response, re.DOTALL)
risk_assessment = re.search(r'<risk_assessment>(.*?)</risk_assessment>', ai_response, re.DOTALL)
recommendations = re.search(r'<recommendations>(.*?)</recommendations>', ai_response, re.DOTALL)
analysis_text = analysis.group(1).strip() if analysis else "Brak analizy."
risk_text = risk_assessment.group(1).strip() if risk_assessment else "Brak oceny ryzyka."
recommendations_text = recommendations.group(1).strip() if recommendations else "Brak zaleceń."
return analysis_text, risk_text, recommendations_text
else:
logging.error(f"Błąd API: {response.status_code} - {response.text}")
return f"Błąd API: {response.status_code} - {response.text}", "Błąd analizy.", "Błąd analizy."
except Exception as e:
logging.error(f"Błąd połączenia z API: {e}")
return f"Błąd połączenia z API: {e}", "Błąd analizy.", "Błąd analizy."
# Inicjalizacja pliku statystyk
def init_stats_file():
stats_file = 'stats.json'
if not os.path.exists(stats_file):
with open(stats_file, 'w') as f:
json.dump({"total_analyses": 0, "total_frauds_detected": 0}, f)
# Pobranie statystyk
def get_stats():
stats_file = 'stats.json'
try:
with open(stats_file, 'r') as f:
stats = json.load(f)
return stats
except (json.JSONDecodeError, FileNotFoundError):
return {"total_analyses": 0, "total_frauds_detected": 0}
# Aktualizacja statystyk analizy
def update_stats(fraud_detected=False):
stats_file = 'stats.json'
try:
with open(stats_file, 'r') as f:
stats = json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
stats = {"total_analyses": 0, "total_frauds_detected": 0}
stats["total_analyses"] += 1
if fraud_detected:
stats["total_frauds_detected"] += 1
with open(stats_file, 'w') as f:
json.dump(stats, f, indent=4)
# Inicjalizacja pliku historii analiz
def init_history_file():
history_file = 'history.json'
if not os.path.exists(history_file):
with open(history_file, 'w') as f:
json.dump([], f)
# Dodanie wpisu do historii analiz
def add_to_history(message, phone_number, analysis, risk, recommendations):
history_file = 'history.json'
try:
with open(history_file, 'r') as f:
history = json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
history = []
history.append({
"timestamp": datetime.now().isoformat(),
"message": message,
"phone_number": phone_number,
"analysis": analysis,
"risk_assessment": risk,
"recommendations": recommendations
})
with open(history_file, 'w') as f:
json.dump(history, f, indent=4)
# Pobranie historii analiz
def get_history():
history_file = 'history.json'
try:
with open(history_file, 'r') as f:
history = json.load(f)
return history
except (json.JSONDecodeError, FileNotFoundError):
return []