File size: 6,223 Bytes
3b2c8d9
 
63f76ea
 
 
 
 
3b2c8d9
 
63f76ea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3b2c8d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# utils/functions.py

import phonenumbers
from phonenumbers import geocoder, carrier
import re
import requests
import os
import json
from datetime import datetime

def get_phone_info(phone_number):
    """
    Weryfikuje numer telefonu i zwraca kraj oraz operatora.
    """
    try:
        parsed_number = phonenumbers.parse(phone_number, None)
        country = geocoder.description_for_number(parsed_number, 'pl')
        operator = carrier.name_for_number(parsed_number, 'pl')
        return country, operator
    except phonenumbers.NumberParseException:
        return None, None

def simple_checks(message):
    """
    Wykonuje podstawowe sprawdzenia heurystyczne wiadomości SMS.
    """
    warnings = []
    scam_keywords = ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata']
    if any(keyword in message.lower() for keyword in scam_keywords):
        warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.")
    if re.search(r'http[s]?://', message):
        warnings.append("Wiadomość zawiera link.")
    if re.search(r'\b(podaj|prześlij|udostępnij)\b.*\b(hasło|kod|dane osobowe|numer konta)\b', message.lower()):
        warnings.append("Wiadomość zawiera prośbę o poufne informacje.")
    return warnings

def analyze_message(message, phone_number, additional_info, api_key):
    """
    Analizuje wiadomość SMS za pomocą API SambaNova.
    """
    url = "https://api.sambanova.ai/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}"
    }
    system_prompt = """
Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości SMS. Twoim zadaniem jest przeprowadzenie szczegółowej analizy wiadomości, wykorzystując głęboki proces myślenia i dostarczając kompleksową ocenę. Twoja odpowiedź powinna być podzielona na trzy sekcje:

<analysis>
**Analiza Treści Wiadomości:**
- Przeprowadź szczegółową analizę treści wiadomości, identyfikując potencjalne czerwone flagi, takie jak błędy językowe, prośby o dane osobowe, pilne prośby o kontakt itp.
- Opisz kontekst językowy i kulturowy wiadomości.
- Zidentyfikuj wszelkie elementy, które mogą sugerować, że wiadomość jest próbą wyłudzenia informacji lub pieniędzy.
</analysis>

<risk_assessment>
**Ocena Ryzyka Oszustwa:**
- Na podstawie analizy treści i dostępnych informacji oceń prawdopodobieństwo, że wiadomość jest oszustwem. Użyj skali od 1 do 10, gdzie 1 oznacza bardzo niskie ryzyko, a 10 bardzo wysokie ryzyko.
- Wyjaśnij, jakie czynniki wpływają na tę ocenę.
</risk_assessment>

<recommendations>
**Zalecenia dla Użytkownika:**
- Podaj jasne i konkretne zalecenia dotyczące dalszych kroków, które użytkownik powinien podjąć.
- Uwzględnij sugestie dotyczące bezpieczeństwa, takie jak blokowanie nadawcy, zgłaszanie wiadomości do odpowiednich instytucji, czy też ignorowanie wiadomości.
- Jeśli to możliwe, zasugeruj dodatkowe środki ostrożności, które użytkownik może podjąć, aby chronić swoje dane osobowe i finansowe.
</recommendations>

Twoja odpowiedź powinna być sformatowana dokładnie w powyższy sposób, używając znaczników <analysis>, <risk_assessment> i <recommendations>. Upewnij się, że każda sekcja jest wypełniona kompletnie i szczegółowo.
    """

    user_prompt = f"""Przeanalizuj następującą wiadomość pod kątem potencjalnego oszustwa:

Wiadomość: "{message}"
Numer telefonu nadawcy: {phone_number}"

Dodatkowe informacje:
{additional_info}

Podaj swoją analizę i wnioski zgodnie z powyższymi wytycznymi."""

    payload = {
        "model": "Meta-Llama-3.1-8B-Instruct",
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        "max_tokens": 1000,
        "temperature": 0.2,
        "top_p": 0.9,
        "stop": ["<|eot_id|>"]
    }

    try:
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code == 200:
            data = response.json()
            ai_response = data['choices'][0]['message']['content']
            analysis = re.search(r'<analysis>(.*?)</analysis>', ai_response, re.DOTALL)
            risk_assessment = re.search(r'<risk_assessment>(.*?)</risk_assessment>', ai_response, re.DOTALL)
            recommendations = re.search(r'<recommendations>(.*?)</recommendations>', ai_response, re.DOTALL)

            analysis_text = analysis.group(1).strip() if analysis else "Brak analizy."
            risk_text = risk_assessment.group(1).strip() if risk_assessment else "Brak oceny ryzyka."
            recommendations_text = recommendations.group(1).strip() if recommendations else "Brak zaleceń."

            return analysis_text, risk_text, recommendations_text
        else:
            return f"Błąd API: {response.status_code} - {response.text}", "Błąd analizy.", "Błąd analizy."
    except Exception as e:
        return f"Błąd połączenia z API: {e}", "Błąd analizy.", "Błąd analizy."

def init_stats_file():
    """
    Inicjalizuje plik statystyk, jeśli nie istnieje.
    """
    stats_file = 'stats.json'
    if not os.path.exists(stats_file):
        with open(stats_file, 'w') as f:
            json.dump({"total_analyses": 0, "total_frauds_detected": 0}, f)

def update_stats(fraud_detected=False):
    """
    Aktualizuje statystyki analizy.
    """
    stats_file = 'stats.json'
    try:
        with open(stats_file, 'r') as f:
            stats = json.load(f)
    except (json.JSONDecodeError, FileNotFoundError):
        stats = {"total_analyses": 0, "total_frauds_detected": 0}

    stats["total_analyses"] += 1
    if fraud_detected:
        stats["total_frauds_detected"] += 1

    with open(stats_file, 'w') as f:
        json.dump(stats, f, indent=4)

def get_stats():
    """
    Pobiera aktualne statystyki analizy.
    """
    stats_file = 'stats.json'
    try:
        with open(stats_file, 'r') as f:
            stats = json.load(f)
        return stats
    except (json.JSONDecodeError, FileNotFoundError):
        return {"total_analyses": 0, "total_frauds_detected": 0}
    ```