# utils/functions.py import phonenumbers from phonenumbers import geocoder, carrier import re import requests import os import json from datetime import datetime import logging # Konfiguracja logowania logging.basicConfig(filename='app.log', level=logging.ERROR, format='%(asctime)s %(levelname)s:%(message)s') # Ścieżka do pliku JSON przechowującego fałszywe numery FAKE_NUMBERS_FILE = 'fake_numbers.json' # Inicjalizacja pliku JSON przechowującego fałszywe numery def init_fake_numbers_file(): if not os.path.exists(FAKE_NUMBERS_FILE): with open(FAKE_NUMBERS_FILE, 'w') as f: json.dump([], f) else: # Sprawdzenie, czy plik nie jest pusty i zawiera prawidłowy JSON try: with open(FAKE_NUMBERS_FILE, 'r') as f: json.load(f) except json.JSONDecodeError: # Jeśli plik jest uszkodzony lub pusty, zresetuj go do pustej listy with open(FAKE_NUMBERS_FILE, 'w') as f: json.dump([], f) # Dodanie numeru telefonu do pliku JSON def add_fake_number(phone_number): try: with open(FAKE_NUMBERS_FILE, 'r') as f: fake_numbers = json.load(f) except (json.JSONDecodeError, FileNotFoundError): fake_numbers = [] if not any(entry["phone_number"] == phone_number for entry in fake_numbers): fake_numbers.append({ "phone_number": phone_number, "reported_at": datetime.now().isoformat() }) try: with open(FAKE_NUMBERS_FILE, 'w') as f: json.dump(fake_numbers, f, indent=4) return True except Exception as e: logging.error(f"Nie udało się zapisać numeru {phone_number}: {e}") return False else: return False # Numer już istnieje # Sprawdzenie, czy numer telefonu jest w pliku JSON def is_fake_number(phone_number): try: with open(FAKE_NUMBERS_FILE, 'r') as f: fake_numbers = json.load(f) return any(entry["phone_number"] == phone_number for entry in fake_numbers) except (json.JSONDecodeError, FileNotFoundError): return False # Pobierz klucz API z zmiennej środowiskowej API_KEY = os.getenv('SAMBANOVA_API_KEY') # Funkcja do weryfikacji numeru telefonu def get_phone_info(phone_number): try: parsed_number = phonenumbers.parse(phone_number, None) country = geocoder.description_for_number(parsed_number, 'pl') operator = carrier.name_for_number(parsed_number, 'pl') return country, operator except phonenumbers.NumberParseException: return None, None # Proste sprawdzenia heurystyczne wiadomości def simple_checks(message): warnings = [] # Słowa kluczowe często używane w oszustwach scam_keywords = ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata'] if any(keyword in message.lower() for keyword in scam_keywords): warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.") # Sprawdzenie obecności linków if re.search(r'http[s]?://', message): warnings.append("Wiadomość zawiera link.") # Sprawdzenie, czy nadawca prosi o poufne informacje if re.search(r'\b(podaj|prześlij|udostępnij)\b.*\b(hasło|kod|dane osobowe|numer konta)\b', message.lower()): warnings.append("Wiadomość zawiera prośbę o poufne informacje.") return warnings # Funkcja do analizy wiadomości za pomocą API SambaNova z głębszym procesem myślenia def analyze_message(message, phone_number, additional_info, api_key): if not api_key: logging.error("Brak klucza API.") return "Brak klucza API.", "Brak klucza API.", "Brak klucza API." url = "https://api.sambanova.ai/v1/chat/completions" headers = { "Authorization": f"Bearer {api_key}" } # Rozbudowany system prompt z głębszym procesem myślenia system_prompt = """ Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości SMS. Twoim zadaniem jest przeprowadzenie szczegółowej analizy wiadomości, wykorzystując głęboki proces myślenia i dostarczając kompleksową ocenę. Twoja odpowiedź powinna być podzielona na trzy sekcje: **Analiza Treści Wiadomości:** - Przeprowadź szczegółową analizę treści wiadomości, identyfikując potencjalne czerwone flagi, takie jak błędy językowe, prośby o dane osobowe, pilne prośby o kontakt itp. - Opisz kontekst językowy i kulturowy wiadomości. - Zidentyfikuj wszelkie elementy, które mogą sugerować, że wiadomość jest próbą wyłudzenia informacji lub pieniędzy. **Ocena Ryzyka Oszustwa:** - Na podstawie analizy treści i dostępnych informacji oceń prawdopodobieństwo, że wiadomość jest oszustwem. Użyj skali od 1 do 10, gdzie 1 oznacza bardzo niskie ryzyko, a 10 bardzo wysokie ryzyko. - Wyjaśnij, jakie czynniki wpływają na tę ocenę. **Zalecenia dla Użytkownika:** - Podaj jasne i konkretne zalecenia dotyczące dalszych kroków, które użytkownik powinien podjąć. - Uwzględnij sugestie dotyczące bezpieczeństwa, takie jak blokowanie nadawcy, zgłaszanie wiadomości do odpowiednich instytucji, czy też ignorowanie wiadomości. - Jeśli to możliwe, zasugeruj dodatkowe środki ostrożności, które użytkownik może podjąć, aby chronić swoje dane osobowe i finansowe. Twoja odpowiedź powinna być sformatowana dokładnie w powyższy sposób, używając znaczników , i . Upewnij się, że każda sekcja jest wypełniona kompletnie i szczegółowo. """ user_prompt = f"""Przeanalizuj następującą wiadomość pod kątem potencjalnego oszustwa: Wiadomość: "{message}" Numer telefonu nadawcy: "{phone_number}" Dodatkowe informacje: {additional_info} Podaj swoją analizę i wnioski zgodnie z powyższymi wytycznymi.""" payload = { "model": "Meta-Llama-3.1-8B-Instruct", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "max_tokens": 1000, "temperature": 0.2, "top_p": 0.9, "stop": ["<|eot_id|>"] } try: response = requests.post(url, headers=headers, json=payload) if response.status_code == 200: data = response.json() ai_response = data['choices'][0]['message']['content'] # Parsowanie odpowiedzi analysis = re.search(r'(.*?)', ai_response, re.DOTALL) risk_assessment = re.search(r'(.*?)', ai_response, re.DOTALL) recommendations = re.search(r'(.*?)', ai_response, re.DOTALL) analysis_text = analysis.group(1).strip() if analysis else "Brak analizy." risk_text = risk_assessment.group(1).strip() if risk_assessment else "Brak oceny ryzyka." recommendations_text = recommendations.group(1).strip() if recommendations else "Brak zaleceń." return analysis_text, risk_text, recommendations_text else: logging.error(f"Błąd API: {response.status_code} - {response.text}") return f"Błąd API: {response.status_code} - {response.text}", "Błąd analizy.", "Błąd analizy." except Exception as e: logging.error(f"Błąd połączenia z API: {e}") return f"Błąd połączenia z API: {e}", "Błąd analizy.", "Błąd analizy." # Inicjalizacja pliku statystyk def init_stats_file(): stats_file = 'stats.json' if not os.path.exists(stats_file): with open(stats_file, 'w') as f: json.dump({"total_analyses": 0, "total_frauds_detected": 0}, f) # Pobranie statystyk def get_stats(): stats_file = 'stats.json' try: with open(stats_file, 'r') as f: stats = json.load(f) return stats except (json.JSONDecodeError, FileNotFoundError): return {"total_analyses": 0, "total_frauds_detected": 0} # Aktualizacja statystyk analizy def update_stats(fraud_detected=False): stats_file = 'stats.json' try: with open(stats_file, 'r') as f: stats = json.load(f) except (json.JSONDecodeError, FileNotFoundError): stats = {"total_analyses": 0, "total_frauds_detected": 0} stats["total_analyses"] += 1 if fraud_detected: stats["total_frauds_detected"] += 1 with open(stats_file, 'w') as f: json.dump(stats, f, indent=4) # Inicjalizacja pliku historii analiz def init_history_file(): history_file = 'history.json' if not os.path.exists(history_file): with open(history_file, 'w') as f: json.dump([], f) # Dodanie wpisu do historii analiz def add_to_history(message, phone_number, analysis, risk, recommendations): history_file = 'history.json' try: with open(history_file, 'r') as f: history = json.load(f) except (json.JSONDecodeError, FileNotFoundError): history = [] history.append({ "timestamp": datetime.now().isoformat(), "message": message, "phone_number": phone_number, "analysis": analysis, "risk_assessment": risk, "recommendations": recommendations }) with open(history_file, 'w') as f: json.dump(history, f, indent=4) # Pobranie historii analiz def get_history(): history_file = 'history.json' try: with open(history_file, 'r') as f: history = json.load(f) return history except (json.JSONDecodeError, FileNotFoundError): return []