Spaces:
Sleeping
Sleeping
# utils/functions.py | |
import phonenumbers | |
from phonenumbers import geocoder, carrier | |
import re | |
import requests | |
import os | |
import json | |
from datetime import datetime | |
import logging | |
# Konfiguracja logowania | |
logging.basicConfig(filename='app.log', level=logging.ERROR, format='%(asctime)s %(levelname)s:%(message)s') | |
# Ścieżka do pliku JSON przechowującego fałszywe numery | |
FAKE_NUMBERS_FILE = 'fake_numbers.json' | |
# Inicjalizacja pliku JSON przechowującego fałszywe numery | |
def init_fake_numbers_file(): | |
if not os.path.exists(FAKE_NUMBERS_FILE): | |
with open(FAKE_NUMBERS_FILE, 'w') as f: | |
json.dump([], f) | |
else: | |
# Sprawdzenie, czy plik nie jest pusty i zawiera prawidłowy JSON | |
try: | |
with open(FAKE_NUMBERS_FILE, 'r') as f: | |
json.load(f) | |
except json.JSONDecodeError: | |
# Jeśli plik jest uszkodzony lub pusty, zresetuj go do pustej listy | |
with open(FAKE_NUMBERS_FILE, 'w') as f: | |
json.dump([], f) | |
# Dodanie numeru telefonu do pliku JSON | |
def add_fake_number(phone_number): | |
try: | |
with open(FAKE_NUMBERS_FILE, 'r') as f: | |
fake_numbers = json.load(f) | |
except (json.JSONDecodeError, FileNotFoundError): | |
fake_numbers = [] | |
if not any(entry["phone_number"] == phone_number for entry in fake_numbers): | |
fake_numbers.append({ | |
"phone_number": phone_number, | |
"reported_at": datetime.now().isoformat() | |
}) | |
try: | |
with open(FAKE_NUMBERS_FILE, 'w') as f: | |
json.dump(fake_numbers, f, indent=4) | |
return True | |
except Exception as e: | |
logging.error(f"Nie udało się zapisać numeru {phone_number}: {e}") | |
return False | |
else: | |
return False # Numer już istnieje | |
# Sprawdzenie, czy numer telefonu jest w pliku JSON | |
def is_fake_number(phone_number): | |
try: | |
with open(FAKE_NUMBERS_FILE, 'r') as f: | |
fake_numbers = json.load(f) | |
return any(entry["phone_number"] == phone_number for entry in fake_numbers) | |
except (json.JSONDecodeError, FileNotFoundError): | |
return False | |
# Pobierz klucz API z zmiennej środowiskowej | |
API_KEY = os.getenv('SAMBANOVA_API_KEY') | |
# Funkcja do weryfikacji numeru telefonu | |
def get_phone_info(phone_number): | |
try: | |
parsed_number = phonenumbers.parse(phone_number, None) | |
country = geocoder.description_for_number(parsed_number, 'pl') | |
operator = carrier.name_for_number(parsed_number, 'pl') | |
return country, operator | |
except phonenumbers.NumberParseException: | |
return None, None | |
# Proste sprawdzenia heurystyczne wiadomości | |
def simple_checks(message): | |
warnings = [] | |
# Słowa kluczowe często używane w oszustwach | |
scam_keywords = ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata'] | |
if any(keyword in message.lower() for keyword in scam_keywords): | |
warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.") | |
# Sprawdzenie obecności linków | |
if re.search(r'http[s]?://', message): | |
warnings.append("Wiadomość zawiera link.") | |
# Sprawdzenie, czy nadawca prosi o poufne informacje | |
if re.search(r'\b(podaj|prześlij|udostępnij)\b.*\b(hasło|kod|dane osobowe|numer konta)\b', message.lower()): | |
warnings.append("Wiadomość zawiera prośbę o poufne informacje.") | |
return warnings | |
# Funkcja do analizy wiadomości za pomocą API SambaNova z głębszym procesem myślenia | |
def analyze_message(message, phone_number, additional_info, api_key): | |
if not api_key: | |
logging.error("Brak klucza API.") | |
return "Brak klucza API.", "Brak klucza API.", "Brak klucza API." | |
url = "https://api.sambanova.ai/v1/chat/completions" | |
headers = { | |
"Authorization": f"Bearer {api_key}" | |
} | |
# Rozbudowany system prompt z głębszym procesem myślenia | |
system_prompt = """ | |
Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości SMS. Twoim zadaniem jest przeprowadzenie szczegółowej analizy wiadomości, wykorzystując głęboki proces myślenia i dostarczając kompleksową ocenę. Twoja odpowiedź powinna być podzielona na trzy sekcje: | |
<analysis> | |
**Analiza Treści Wiadomości:** | |
- Przeprowadź szczegółową analizę treści wiadomości, identyfikując potencjalne czerwone flagi, takie jak błędy językowe, prośby o dane osobowe, pilne prośby o kontakt itp. | |
- Opisz kontekst językowy i kulturowy wiadomości. | |
- Zidentyfikuj wszelkie elementy, które mogą sugerować, że wiadomość jest próbą wyłudzenia informacji lub pieniędzy. | |
</analysis> | |
<risk_assessment> | |
**Ocena Ryzyka Oszustwa:** | |
- Na podstawie analizy treści i dostępnych informacji oceń prawdopodobieństwo, że wiadomość jest oszustwem. Użyj skali od 1 do 10, gdzie 1 oznacza bardzo niskie ryzyko, a 10 bardzo wysokie ryzyko. | |
- Wyjaśnij, jakie czynniki wpływają na tę ocenę. | |
</risk_assessment> | |
<recommendations> | |
**Zalecenia dla Użytkownika:** | |
- Podaj jasne i konkretne zalecenia dotyczące dalszych kroków, które użytkownik powinien podjąć. | |
- Uwzględnij sugestie dotyczące bezpieczeństwa, takie jak blokowanie nadawcy, zgłaszanie wiadomości do odpowiednich instytucji, czy też ignorowanie wiadomości. | |
- Jeśli to możliwe, zasugeruj dodatkowe środki ostrożności, które użytkownik może podjąć, aby chronić swoje dane osobowe i finansowe. | |
</recommendations> | |
Twoja odpowiedź powinna być sformatowana dokładnie w powyższy sposób, używając znaczników <analysis>, <risk_assessment> i <recommendations>. Upewnij się, że każda sekcja jest wypełniona kompletnie i szczegółowo. | |
""" | |
user_prompt = f"""Przeanalizuj następującą wiadomość pod kątem potencjalnego oszustwa: | |
Wiadomość: "{message}" | |
Numer telefonu nadawcy: "{phone_number}" | |
Dodatkowe informacje: | |
{additional_info} | |
Podaj swoją analizę i wnioski zgodnie z powyższymi wytycznymi.""" | |
payload = { | |
"model": "Meta-Llama-3.1-8B-Instruct", | |
"messages": [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_prompt} | |
], | |
"max_tokens": 1000, | |
"temperature": 0.2, | |
"top_p": 0.9, | |
"stop": ["<|eot_id|>"] | |
} | |
try: | |
response = requests.post(url, headers=headers, json=payload) | |
if response.status_code == 200: | |
data = response.json() | |
ai_response = data['choices'][0]['message']['content'] | |
# Parsowanie odpowiedzi | |
analysis = re.search(r'<analysis>(.*?)</analysis>', ai_response, re.DOTALL) | |
risk_assessment = re.search(r'<risk_assessment>(.*?)</risk_assessment>', ai_response, re.DOTALL) | |
recommendations = re.search(r'<recommendations>(.*?)</recommendations>', ai_response, re.DOTALL) | |
analysis_text = analysis.group(1).strip() if analysis else "Brak analizy." | |
risk_text = risk_assessment.group(1).strip() if risk_assessment else "Brak oceny ryzyka." | |
recommendations_text = recommendations.group(1).strip() if recommendations else "Brak zaleceń." | |
return analysis_text, risk_text, recommendations_text | |
else: | |
logging.error(f"Błąd API: {response.status_code} - {response.text}") | |
return f"Błąd API: {response.status_code} - {response.text}", "Błąd analizy.", "Błąd analizy." | |
except Exception as e: | |
logging.error(f"Błąd połączenia z API: {e}") | |
return f"Błąd połączenia z API: {e}", "Błąd analizy.", "Błąd analizy." | |
# Inicjalizacja pliku statystyk | |
def init_stats_file(): | |
stats_file = 'stats.json' | |
if not os.path.exists(stats_file): | |
with open(stats_file, 'w') as f: | |
json.dump({"total_analyses": 0, "total_frauds_detected": 0}, f) | |
# Pobranie statystyk | |
def get_stats(): | |
stats_file = 'stats.json' | |
try: | |
with open(stats_file, 'r') as f: | |
stats = json.load(f) | |
return stats | |
except (json.JSONDecodeError, FileNotFoundError): | |
return {"total_analyses": 0, "total_frauds_detected": 0} | |
# Aktualizacja statystyk analizy | |
def update_stats(fraud_detected=False): | |
stats_file = 'stats.json' | |
try: | |
with open(stats_file, 'r') as f: | |
stats = json.load(f) | |
except (json.JSONDecodeError, FileNotFoundError): | |
stats = {"total_analyses": 0, "total_frauds_detected": 0} | |
stats["total_analyses"] += 1 | |
if fraud_detected: | |
stats["total_frauds_detected"] += 1 | |
with open(stats_file, 'w') as f: | |
json.dump(stats, f, indent=4) | |
# Inicjalizacja pliku historii analiz | |
def init_history_file(): | |
history_file = 'history.json' | |
if not os.path.exists(history_file): | |
with open(history_file, 'w') as f: | |
json.dump([], f) | |
# Dodanie wpisu do historii analiz | |
def add_to_history(message, phone_number, analysis, risk, recommendations): | |
history_file = 'history.json' | |
try: | |
with open(history_file, 'r') as f: | |
history = json.load(f) | |
except (json.JSONDecodeError, FileNotFoundError): | |
history = [] | |
history.append({ | |
"timestamp": datetime.now().isoformat(), | |
"message": message, | |
"phone_number": phone_number, | |
"analysis": analysis, | |
"risk_assessment": risk, | |
"recommendations": recommendations | |
}) | |
with open(history_file, 'w') as f: | |
json.dump(history, f, indent=4) | |
# Pobranie historii analiz | |
def get_history(): | |
history_file = 'history.json' | |
try: | |
with open(history_file, 'r') as f: | |
history = json.load(f) | |
return history | |
except (json.JSONDecodeError, FileNotFoundError): | |
return [] | |