Spaces:
Running
Running
import gradio as gr | |
from prodiapy import Prodia | |
from PIL import Image | |
from io import BytesIO | |
import requests | |
import random | |
import os | |
import base64 | |
import json | |
import time | |
class Prodia: | |
def __init__(self, api_key=os.getenv("PRODIA_API_KEY"), base=None): | |
self.base = base or "https://api.prodia.com/v1" | |
self.headers = { | |
"X-Prodia-Key": api_key | |
} | |
def faceswap(self, params): | |
response = self._post(f"{self.base}/faceswap", params) | |
return response.json() | |
def get_job(self, job_id): | |
response = self._get(f"{self.base}/job/{job_id}") | |
return response.json() | |
def wait(self, job): | |
job_result = job | |
while job_result['status'] not in ['succeeded', 'failed']: | |
time.sleep(0.25) | |
job_result = self.get_job(job['job']) | |
return job_result | |
def _post(self, url, params): | |
headers = { | |
**self.headers, | |
"Content-Type": "application/json" | |
} | |
response = requests.post(url, headers=headers, data=json.dumps(params)) | |
if response.status_code != 200: | |
raise Exception(f"Bad Prodia Response: {response.status_code}") | |
return response | |
def _get(self, url): | |
response = requests.get(url, headers=self.headers) | |
if response.status_code != 200: | |
raise Exception(f"Bad Prodia Response: {response.status_code}") | |
return response | |
client = Prodia() | |
def infer(source, target): | |
if source_image is None or target_image is None: | |
return | |
source_url = upload_image(source) | |
target_url = upload_image(target) | |
job = client.faceswap({ | |
"sourceUrl": source_url, | |
"targetUrl": target_url | |
}) | |
res = client.wait(job) | |
if res['status'] == "failed": | |
return | |
return res['imageUrl'] | |
def upload_image(file): | |
files = {'file': open(file, 'rb')} | |
img_id = requests.post(os.getenv("IMAGE_API_1"), files=files).json()['id'] | |
payload = { | |
"content": "", | |
"nonce": f"{random.randint(1, 10000000)}H9X42KSEJFNNH", | |
"replies": [], | |
"attachments": [img_id] | |
} | |
res = requests.post(os.getenv("IMAGE_API_2"), json=payload, headers={"x-session-token": os.getenv("SESSION_TOKEN")}) | |
return f"{os.getenv('IMAGE_API_1')}/{img_id}/{res.json()['attachments'][0]['filename']}" | |
def image_to_base64(image: Image): | |
# Convert the image to bytes | |
buffered = BytesIO() | |
image.save(buffered, format="PNG") # You can change format to PNG if needed | |
# Encode the bytes to base64 | |
img_str = base64.b64encode(buffered.getvalue()) | |
return img_str.decode('utf-8') # Convert bytes to string | |
with gr.Blocks() as demo: | |
with gr.Column(): | |
gr.HTML("<h1><center>Face Swap</center></h1>") | |
with gr.Row(): | |
with gr.Row(): | |
source_image = gr.Image(type="filepath", label="Source Image") | |
target_image = gr.Image(type="filepath", label="Target Image") | |
with gr.Column(): | |
result = gr.Image() | |
run_button = gr.Button("Swap Face", variant="primary") | |
gr.Examples( | |
examples=[ | |
["example1.jpg", "example2.jpg"], | |
["example3.jpg", "example4.jpg"], | |
["example5.jpg", "example6.jpg"] | |
], | |
fn=infer, | |
inputs=[source_image, target_image], | |
outputs=[result] | |
) | |
run_button.click(fn=infer, inputs=[source_image, target_image], outputs=[result]) | |
demo.queue(max_size=20, api_open=False).launch(show_api=False, max_threads=400) | |