ScamDetector / utils /functions.py
rafaldembski's picture
Update utils/functions.py
6a60a11 verified
import phonenumbers
from phonenumbers import geocoder, carrier
import re
import requests
import os
from datetime import datetime
import logging
import json
from PIL import Image # Upewnij się, że zainstalowałeś tę bibliotekę: pip install pillow
import pytesseract # Upewnij się, że zainstalowałeś tę bibliotekę: pip install pytesseract
# Konfiguracja logowania
logging.basicConfig(
filename='app.log',
level=logging.INFO,
format='%(asctime)s %(levelname)s:%(message)s'
)
# Definiowanie ścieżek do plików JSON
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, '..', 'data')
FAKE_NUMBERS_FILE = os.path.join(DATA_DIR, 'fake_numbers.json')
HISTORY_FILE = os.path.join(DATA_DIR, 'history.json')
STATS_FILE = os.path.join(DATA_DIR, 'stats.json')
# Funkcje pomocnicze
def load_json(file_path):
"""Ładuje dane z pliku JSON. Jeśli plik nie istnieje, zwraca pustą listę lub domyślny obiekt."""
if not os.path.exists(file_path):
if file_path.endswith('stats.json'):
return {"total_analyses": 0, "total_frauds_detected": 0}
else:
return []
with open(file_path, 'r', encoding='utf-8') as file:
try:
data = json.load(file)
return data
except json.JSONDecodeError:
logging.error(f"Nie można załadować danych z {file_path}. Plik jest uszkodzony.")
if file_path.endswith('stats.json'):
return {"total_analyses": 0, "total_frauds_detected": 0}
return []
def save_json(file_path, data):
"""Zapisuje dane do pliku JSON."""
with open(file_path, 'w', encoding='utf-8') as file:
json.dump(data, file, ensure_ascii=False, indent=4)
logging.info(f"Dane zostały zapisane do {file_path}.")
def add_fake_number(phone_number):
"""Dodaje numer telefonu do pliku fake_numbers.json jako fałszywy, jeśli jeszcze go tam nie ma."""
fake_numbers = load_json(FAKE_NUMBERS_FILE)
if phone_number not in fake_numbers:
fake_numbers.append(phone_number)
save_json(FAKE_NUMBERS_FILE, fake_numbers)
logging.info(f"Numer {phone_number} został pomyślnie dodany do fake_numbers.json.")
return True
else:
logging.info(f"Numer {phone_number} już istnieje w fake_numbers.json.")
return False
def is_fake_number(phone_number):
"""Sprawdza, czy dany numer telefonu jest oznaczony jako fałszywy w pliku fake_numbers.json."""
fake_numbers = load_json(FAKE_NUMBERS_FILE)
exists = phone_number in fake_numbers
logging.info(f"Sprawdzanie numeru {phone_number}: {'znaleziony' if exists else 'nie znaleziony'}.")
return exists
def get_fake_numbers():
"""Pobiera listę fałszywych numerów z pliku fake_numbers.json."""
fake_numbers = load_json(FAKE_NUMBERS_FILE)
return fake_numbers
def add_to_history(message, phone_number, analysis, risk, recommendations):
"""Dodaje wpis do historii analiz w pliku history.json."""
history = load_json(HISTORY_FILE)
history.append({
"timestamp": datetime.now().isoformat(),
"message": message,
"phone_number": phone_number,
"analysis": analysis,
"risk_assessment": risk,
"recommendations": recommendations
})
save_json(HISTORY_FILE, history)
logging.info(f"Dodano wpis do history.json dla numeru {phone_number}.")
def get_history():
"""Pobiera historię analiz z pliku history.json jako listę słowników."""
history = load_json(HISTORY_FILE)
logging.info("Historia analiz została pobrana pomyślnie.")
return history
def update_stats(fraud_detected=False):
"""Aktualizuje statystyki analiz w pliku stats.json."""
stats = load_json(STATS_FILE)
stats["total_analyses"] += 1
if fraud_detected:
stats["total_frauds_detected"] += 1
save_json(STATS_FILE, stats)
logging.info(f"Statystyki zostały zaktualizowane: Analiz {stats['total_analyses']}, Oszustw {stats['total_frauds_detected']}.")
def get_stats():
"""Pobiera statystyki analiz z pliku stats.json."""
stats = load_json(STATS_FILE)
logging.info("Statystyki zostały pobrane pomyślnie.")
return stats
def get_phone_info(phone_number):
"""Weryfikuje numer telefonu i zwraca informacje o kraju i operatorze."""
try:
parsed_number = phonenumbers.parse(phone_number, None)
country = geocoder.description_for_number(parsed_number, 'pl') # Zmiana na 'pl' dla polskiego
operator = carrier.name_for_number(parsed_number, 'pl') # Zmiana na 'pl' dla polskiego
if not country:
country = "Nieznany"
if not operator:
operator = "Nieznany"
logging.info(f"Numer {phone_number} - Kraj: {country}, Operator: {operator}.")
return country, operator
except phonenumbers.NumberParseException as e:
logging.error(f"Nie udało się przetworzyć numeru telefonu {phone_number}: {e}")
return "Nieznany", "Nieznany"
def simple_checks(message, language):
"""Przeprowadza proste sprawdzenia heurystyczne wiadomości SMS."""
warnings = []
scam_keywords = {
'Polish': ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata', 'bank', 'karta', 'konto', 'logowanie', 'transakcja', 'weryfikacja', 'dane osobowe', 'szybka płatność', 'blokada konta', 'powiadomienie'],
'German': ['Geld', 'Überweisung', 'Passwort', 'Code', 'Preis', 'Gewinn', 'dringend', 'Hilfe', 'Gebühr', 'Bank', 'Karte', 'Konto', 'Anmeldung', 'Transaktion', 'Verifizierung', 'persönliche Daten', 'schnelle Zahlung', 'Kontosperrung', 'Benachrichtigung'],
'English': ['money', 'transfer', 'password', 'code', 'prize', 'win', 'urgent', 'help', 'fee', 'bank', 'card', 'account', 'login', 'transaction', 'verification', 'personal information', 'quick payment', 'account lock', 'notification']
}
selected_keywords = scam_keywords.get(language, scam_keywords['English'])
message_lower = message.lower()
if any(keyword.lower() in message_lower for keyword in selected_keywords):
warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.")
if re.search(r'http[s]?://', message):
warnings.append("Wiadomość zawiera link.")
if re.search(r'\b(podaj|prześlij|udostępnij|sende|übermittle|teile|send|provide|share)\b.*\b(hasło|kod|dane osobowe|numer konta|Passwort|Code|persönliche Daten|Kontonummer|password|code|personal information|account number)\b', message_lower):
warnings.append("Wiadomość zawiera prośbę o poufne informacje.")
return warnings
def analyze_message(message, phone_number, additional_info, api_key, language):
"""Analizuje wiadomość SMS za pomocą API SambaNova."""
if not api_key:
logging.error("Brak klucza API.")
return "Brak klucza API.", "Brak klucza API.", "Brak klucza API."
url = "https://api.sambanova.ai/v1/chat/completions" # Upewnij się, że to poprawny URL
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
system_prompts = {
'Polish': """
Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości SMS. Twoim zadaniem jest przeprowadzenie szczegółowej analizy wiadomości, wykorzystując głęboki proces myślenia i dostarczając kompleksową ocenę. Twoja odpowiedź powinna być podzielona na trzy sekcje:
<analysis>
**📝 Analiza Treści Wiadomości:**
- Przeprowadź szczegółową analizę treści wiadomości, identyfikując potencjalne czerwone flagi, takie jak błędy językowe, prośby o dane osobowe, pilne prośby o kontakt itp.
- Jakie elementy treści mogą wskazywać na oszustwo?
- Jakie słowa kluczowe są używane w wiadomości? (np. "pieniądze", "przelew", "nagroda")
- Jakie są reakcje na tę wiadomość w kontekście kulturowym i językowym?
**❓ Dodatkowe pytania do przemyślenia:**
- Kiedy i jak często otrzymujesz wiadomości z tego numeru?
- Czy numer nadawcy jest znany z innych źródeł?
- Jakie są konsekwencje dla Ciebie, jeśli ta wiadomość jest oszustwem?
- Jakie masz doświadczenia z podobnymi wiadomościami w przeszłości?
</analysis>
<risk_assessment>
**⚖️ Ocena Ryzyka Oszustwa:**
- Na podstawie analizy treści i dostępnych informacji oceń prawdopodobieństwo, że wiadomość jest oszustwem. Użyj skali od 1 do 10, gdzie 1 oznacza bardzo niskie ryzyko, a 10 bardzo wysokie ryzyko.
- Jakie czynniki wpływają na tę ocenę?
- Jakie są argumenty za i przeciw ocenie tej wiadomości jako oszustwa?
**❓ Dodatkowe pytania do przemyślenia:**
- Jakie inne wiadomości z tego numeru otrzymywałeś w przeszłości?
- Czy wiadomość zawiera jakiekolwiek inne informacje, które mogłyby być użyteczne w ocenie ryzyka?
- Jakie są Twoje dotychczasowe doświadczenia z oszustwami SMS?
- Jakie są Twoje przemyślenia na temat nadawcy tej wiadomości?
</risk_assessment>
<recommendations>
**💡 Zalecenia dla Użytkownika:**
- Podaj jasne i konkretne zalecenia dotyczące dalszych kroków, które użytkownik powinien podjąć.
- Jakie środki bezpieczeństwa powinny być wdrożone?
**❓ Dodatkowe pytania do przemyślenia:**
- Czy powinieneś zgłosić tę wiadomość do operatora sieci?
- Jakie dodatkowe kroki możesz podjąć, aby upewnić się, że nie padłeś ofiarą oszustwa?
- Czy masz dostęp do innych narzędzi, które mogą pomóc w ocenie tej wiadomości?
- Jakie działania możesz podjąć, aby zwiększyć swoje bezpieczeństwo w przyszłości?
</recommendations>
""",
'German': """
Du bist ein fortgeschrittener KI-Assistent, spezialisiert auf die Identifizierung gefälschter SMS-Nachrichten. Deine Aufgabe ist es, eine detaillierte Analyse der Nachricht durchzuführen, indem du einen tiefgreifenden Denkprozess nutzt und eine umfassende Bewertung lieferst. Deine Antwort sollte in drei Abschnitte unterteilt sein:
<analysis>
**📝 Nachrichteninhaltsanalyse:**
- Führe eine detaillierte Analyse des Nachrichteninhalts durch und identifiziere potenzielle rote Flaggen wie sprachliche Fehler, Aufforderungen zur Preisgabe persönlicher Daten, dringende Kontaktanfragen usw.
- Welche Elemente im Inhalt könnten auf Betrug hinweisen?
- Welche Schlüsselwörter werden in der Nachricht verwendet? (z. B. "Geld", "Überweisung", "Preis")
- Wie reagieren die Menschen auf diese Nachricht im kulturellen und sprachlichen Kontext?
**❓ Zusätzliche Fragen zur Überlegung:**
- Wann und wie oft erhältst du Nachrichten von dieser Nummer?
- Ist die Nummer des Absenders aus anderen Quellen bekannt?
- Was sind die Konsequenzen für dich, wenn diese Nachricht ein Betrug ist?
- Welche Erfahrungen hast du in der Vergangenheit mit ähnlichen Nachrichten gemacht?
</analysis>
<risk_assessment>
**⚖️ Betrugsrisikobewertung:**
- Bewerte die Wahrscheinlichkeit, dass die Nachricht betrügerisch ist, auf einer Skala von 1 bis 10, wobei 1 sehr geringes Risiko und 10 sehr hohes Risiko bedeutet.
- Welche Faktoren beeinflussen diese Bewertung?
- Was sind die Argumente für und gegen die Bewertung dieser Nachricht als Betrug?
**❓ Zusätzliche Fragen zur Überlegung:**
- Welche anderen Nachrichten hast du in der Vergangenheit von dieser Nummer erhalten?
- Enthält die Nachricht weitere Informationen, die bei der Risikobewertung hilfreich sein könnten?
- Welche bisherigen Erfahrungen hast du mit SMS-Betrügereien gemacht?
- Welche Gedanken hast du über den Absender dieser Nachricht?
</risk_assessment>
<recommendations>
**💡 Empfehlungen für den Benutzer:**
- Gib klare und spezifische Empfehlungen zu den nächsten Schritten, die der Benutzer unternehmen sollte.
- Welche Sicherheitsmaßnahmen sollten umgesetzt werden?
**❓ Zusätzne Fragen zur Überlegung:**
- Solltest du diese Nachricht deinem Mobilfunkanbieter melden?
- Welche weiteren Schritte kannst du unternehmen, um sicherzustellen, dass du nicht Opfer eines Betrugs geworden bist?
- Hast du Zugriff auf andere Werkzeuge, die dir helfen können, diese Nachricht zu bewerten?
- Welche Maßnahmen kannst du ergreifen, um deine Sicherheit in Zukunft zu erhöhen?
</recommendations>
""",
'English': """
You are an advanced AI assistant specializing in identifying fake SMS messages. Your task is to conduct a detailed analysis of the message, utilizing a deep thinking process and providing a comprehensive assessment. Your response should be divided into three sections:
<analysis>
**📝 Message Content Analysis:**
- Conduct a detailed analysis of the message content, identifying potential red flags such as language errors, requests for personal information, urgent contact requests, etc.
- What elements of the content may indicate fraud?
- What keywords are used in the message? (e.g., "money", "transfer", "prize")
- What are the cultural and linguistic reactions to this message?
**❓ Additional questions to consider:**
- When and how often do you receive messages from this number?
- Is the sender's number known from other sources?
- What are the consequences for you if this message is a fraud?
- What experiences have you had in the past with similar messages?
</analysis>
<risk_assessment>
**⚖️ Fraud Risk Assessment:**
- Assess the likelihood that the message is fraudulent on a scale from 1 to 10, where 1 indicates very low risk and 10 indicates very high risk.
- What factors influence this assessment?
- What are the arguments for and against assessing this message as fraud?
**❓ Additional questions to consider:**
- What other messages have you received from this number in the past?
- Does the message contain any other information that could be useful in assessing the risk?
- What previous experiences do you have with SMS scams?
- What are your thoughts on the sender of this message?
</risk_assessment>
<recommendations>
**💡 User Recommendations:**
- Provide clear and concrete recommendations regarding the next steps the user should take.
- What security measures should be implemented?
**❓ Additional questions to consider:**
- Should you report this message to your service provider?
- What additional steps can you take to ensure that you have not fallen victim to a scam?
- Do you have access to other tools that can help you assess this message?
- What actions can you take to enhance your security in the future?
</recommendations>
Your response should be formatted exactly as specified above, using the <analysis>, <risk_assessment>, and <recommendations> tags. Ensure that each section is thoroughly and comprehensively filled out.
"""
}
system_prompt = system_prompts.get(language, system_prompts['English']) # Domyślnie angielski, jeśli język nie jest obsługiwany
user_prompt = f"""Analyze the following message for potential fraud:
Message: "{message}"
Sender's Phone Number: "{phone_number}"
Additional Information:
{additional_info}
Provide your analysis and conclusions following the guidelines above."""
payload = {
"model": "Meta-Llama-3.1-8B-Instruct", # Upewnij się, że to poprawny model API
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
"max_tokens": 1000,
"temperature": 0.2,
"top_p": 0.9,
"stop": ["<|eot_id|>"]
}
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
data = response.json()
ai_response = data['choices'][0]['message']['content']
analysis = re.search(r'<analysis>(.*?)</analysis>', ai_response, re.DOTALL)
risk_assessment = re.search(r'<risk_assessment>(.*?)</risk_assessment>', ai_response, re.DOTALL)
recommendations = re.search(r'<recommendations>(.*?)</recommendations>', ai_response, re.DOTALL)
analysis_text = analysis.group(1).strip() if analysis else "Brak analizy."
risk_text = risk_assessment.group(1).strip() if risk_assessment else "Brak oceny ryzyka."
recommendations_text = recommendations.group(1).strip() if recommendations else "Brak zaleceń."
return analysis_text, risk_text, recommendations_text
else:
logging.error(f"Błąd API: {response.status_code} - {response.text}")
return f"Błąd API: {response.status_code} - {response.text}", "Błąd analizy.", "Błąd analizy."
except Exception as e:
logging.error(f"Błąd połączenia z API: {e}")
return f"Błąd połączenia z API: {e}", "Błąd analizy.", "Błąd analizy."
def analyze_url(url):
"""Analizuje zawartość strony internetowej pod kątem oszustw."""
phishing_urls = []
# Sprawdzanie URL w PhishTank
def check_url_phishtank(url):
params = {
'format': 'json',
'url': url
}
try:
response = requests.post('https://checkurl.phishtank.com/checkurl/', data=params)
if response.status_code == 200:
data = response.json()
in_database = data.get('results', {}).get('in_database', False)
valid = data.get('results', {}).get('valid', False)
if in_database and valid:
return True
else:
logging.warning(f"Błąd podczas sprawdzania URL w PhishTank: {response.status_code}")
except Exception as e:
logging.error(f"Błąd podczas sprawdzania URL w PhishTank: {e}")
return False
# Sprawdzanie URL w Google Safe Browsing
def check_url_safe_browsing(url):
api_key = os.getenv('GOOGLE_SAFE_BROWSING_API_KEY')
if not api_key:
return None
unsafe_urls = []
headers = {'Content-Type': 'application/json'}
client_body = {
'client': {
'clientId': 'yourcompanyname',
'clientVersion': '1.0'
},
'threatInfo': {
'threatTypes': ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION"],
'platformTypes': ["ANY_PLATFORM"],
'threatEntryTypes': ["URL"],
'threatEntries': [{'url': url}]
}
}
try:
response = requests.post(
f'https://safebrowsing.googleapis.com/v4/threatMatches:find?key={api_key}',
headers=headers,
json=client_body
)
if response.status_code == 200:
data = response.json()
matches = data.get('matches', [])
return [match['threat']['url'] for match in matches]
else:
logging.error(f"Błąd podczas komunikacji z Google Safe Browsing API: {response.status_code}")
except Exception as e:
logging.error(f"Błąd podczas sprawdzania URL w Google Safe Browsing: {e}")
return []
# Sprawdzanie URL w PhishTank
if check_url_phishtank(url):
phishing_urls.append(url)
# Sprawdzanie URL w Google Safe Browsing
unsafe_urls = check_url_safe_browsing(url)
if unsafe_urls:
phishing_urls.extend(unsafe_urls)
return phishing_urls # Zwraca listę zagrożonych URL
def extract_text_from_image(image_file):
"""Ekstrahuje tekst z przesłanego obrazu przy użyciu pytesseract."""
try:
image = Image.open(image_file)
text = pytesseract.image_to_string(image)
return text
except Exception as e:
logging.error(f"Błąd podczas ekstrakcji tekstu z obrazu: {e}")
return "Błąd podczas ekstrakcji tekstu."
def get_email_info(email):
"""Sprawdza informacje o nadawcy e-maila (np. domena, organizacja, kraj)."""
domain = email.split('@')[-1] # Prosta ekstrakcja domeny
# Możesz dodać więcej logiki do weryfikacji domeny
return {
"domain": domain,
"organization": "Nieznana organizacja", # Możesz dodać logikę, aby zidentyfikować organizację
"country": "Nieznany kraj" # Możesz dodać logikę, aby zidentyfikować kraj
}
def get_analysis_history():
"""Zwraca historię analiz z pliku history.json."""
return get_history() # Używamy get_history do uzyskania historii analiz