File size: 4,859 Bytes
205b8f5
1f6a041
e496964
1f6a041
51a3166
205b8f5
1f6a041
 
 
 
 
 
 
 
 
3aff737
28ae202
205b8f5
 
51a3166
9be7c33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
705287f
9be7c33
 
 
 
 
 
 
 
 
 
 
 
 
3aff737
 
 
 
 
 
1f6a041
 
 
 
 
 
9bb30ce
 
1f6a041
205b8f5
 
 
 
 
 
 
 
 
 
3aff737
42b1f81
1f6a041
42b1f81
decc5b1
1f6a041
42b1f81
1f6a041
438e3e2
205b8f5
1f6a041
42b1f81
 
 
 
9bb30ce
 
 
 
 
 
 
 
 
 
 
 
 
1f6a041
205b8f5
1f6a041
 
438e3e2
 
 
 
9be7c33
 
438e3e2
 
1f6a041
9bb30ce
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# -*- coding:UTF-8 -*-
# !/usr/bin/env python
import spaces
import numpy as np
import gradio as gr
import gradio.exceptions
import roop.globals
from roop.core import (
    start,
    decode_execution_providers,
)
from roop.processors.frame.core import get_frame_processors_modules
from roop.utilities import normalize_output_path
import os
from PIL import Image
import uuid
import onnxruntime as ort
import cv2
from roop.face_analyser import get_one_face

from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import utils
import base64
import json
import datetime

def load_public_key_from_file(file_path):
    with open(file_path, "rb") as key_file:
        public_key = serialization.load_pem_public_key(
            key_file.read(),
            backend=default_backend()
        )
    return public_key

def verify_signature(public_key, data, signature):
    """
    Verify a signature with a public key.
    Converts the data to bytes if it's not already in byte format.
    """
    # Ensure the data is in bytes. If it's a string, encode it to UTF-8.
    if isinstance(data, str):
        data = data.encode('utf-8')

    try:
        # Verify the signature
        public_key.verify(
            signature,
            data,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return True
    except Exception as e:
        print("Verification failed:", e)
        return False

public_key = load_public_key_from_file("./nsfwais.pubkey.pem")

@spaces.GPU
def swap_face(source_file, target_file, doFaceEnhancer, skey):
    
    skey = json.loads(skey)
    #first validate skey
    signature = base64.b64decode(skey["s"])
    if not verify_signature(public_key, skey["t"], signature):
        raise Exception("verify authkey failed.")
    timestamp_requested = int(skey["t"])
    timestamp_now = int(datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).timestamp())
    if timestamp_now - timestamp_requested > 600:
        raise Exception(f"authkey timeout, {timestamp_now - timestamp_requested}")
    print(f"authkey pass, {timestamp_now - timestamp_requested}")
    
    session_id = str(uuid.uuid4())  # Tạo một UUID duy nhất cho mỗi phiên làm việc
    session_dir = f"temp/{session_id}"
    os.makedirs(session_dir, exist_ok=True)

    source_path = os.path.join(session_dir, "input.jpg")
    target_path = os.path.join(session_dir, "target.jpg")

    source_image = Image.fromarray(source_file)
    source_image.save(source_path)
    target_image = Image.fromarray(target_file)
    target_image.save(target_path)

    print("source_path: ", source_path)
    print("target_path: ", target_path)

    # Check if a face is detected in the source image
    source_face = get_one_face(cv2.imread(source_path))
    if source_face is None:
        raise gradio.exceptions.Error("No face in source path detected.")

    # Check if a face is detected in the target image
    target_face = get_one_face(cv2.imread(target_path))
    if target_face is None:
        raise gradio.exceptions.Error("No face in target path detected.")

    output_path = os.path.join(session_dir, "output.jpg")
    normalized_output_path = normalize_output_path(source_path, target_path, output_path)

    frame_processors = ["face_swapper", "face_enhancer"] if doFaceEnhancer else ["face_swapper"]


    for frame_processor in get_frame_processors_modules(frame_processors):
        if not frame_processor.pre_check():
            print(f"Pre-check failed for {frame_processor}")
            raise gradio.exceptions.Error(f"Pre-check failed for {frame_processor}")

    roop.globals.source_path = source_path
    roop.globals.target_path = target_path
    roop.globals.output_path = normalized_output_path
    roop.globals.frame_processors = frame_processors
    roop.globals.headless = True
    roop.globals.keep_fps = True
    roop.globals.keep_audio = True
    roop.globals.keep_frames = False
    roop.globals.many_faces = False
    roop.globals.video_encoder = "libx264"
    roop.globals.video_quality = 18
    roop.globals.execution_providers = ["CUDAExecutionProvider"]
    roop.globals.reference_face_position = 0
    roop.globals.similar_face_distance = 0.6
    roop.globals.max_memory = 60
    roop.globals.execution_threads = 50
    
    start()
    return normalized_output_path

app = gr.Interface(
    fn=swap_face, 
    inputs=[
        gr.Image(), 
        gr.Image(), 
        gr.Checkbox(label="Face Enhancer?", info="Do face enhancement?"), 
        gr.Textbox(visible=False)
    ], 
    outputs="image"
)
app.launch()