# Source listing copied from a Hugging Face Space page (Space status: Running).
# File size: 5,626 bytes.
# Revisions shown in the page's blame gutter:
#   3937dd0 5fc53b0 d0133b3 8dd805b db40b9d 6331924 8bc27b6
# (The page's line-number gutter, 1-164, has been omitted.)
import datetime
import gradio as gr
import requests
import random
import io
import zipfile
from PIL import Image
import os
import numpy as np
import json
import boto3
# Module-level S3 client, created once at import time and shared by
# save_to_s3(). NOTE(review): the only call to save_to_s3() visible in this
# file is commented out, so this client currently appears unused; credentials
# and region are presumably resolved from the environment -- confirm.
s3 = boto3.client('s3')
def save_to_s3(image_data, payload, file_name):
    """Upload an encoded image and its request payload to S3.

    Both objects are stored in the ``dataset-novelai`` bucket under
    ``gradio/<YYYY-MM-DD>/``, using ``file_name`` as the shared stem: the
    image as ``<file_name>.webp`` and the payload as ``<file_name>.json``.

    Args:
        image_data: Seekable binary file-like object holding the image bytes
            (it is rewound before upload).
        payload: Text to store next to the image; it is UTF-8 encoded and
            uploaded with an ``application/json`` content type.
        file_name: Object name without extension.
    """
    bucket = 'dataset-novelai'
    # The current local date groups uploads into one folder per day.
    day_folder = datetime.datetime.now().strftime("%Y-%m-%d")
    webp_key = f'gradio/{day_folder}/{file_name}.webp'
    json_key = f'gradio/{day_folder}/{file_name}.json'

    # Image: rewind first, since the caller has typically just written to it.
    image_data.seek(0)
    s3.upload_fileobj(
        image_data,
        bucket,
        webp_key,
        ExtraArgs={'ContentType': 'image/webp'},
    )

    # Payload: wrap the encoded text in an in-memory stream for upload.
    payload_stream = io.BytesIO(payload.encode('utf-8'))
    s3.upload_fileobj(
        payload_stream,
        bucket,
        json_key,
        ExtraArgs={'ContentType': 'application/json'},
    )
import concurrent
# Function to handle the NovelAI API request
def generate_novelai_image(input_text, quality_tags, seed, negative_prompt, scale, ratio, sampler, progress=gr.Progress()):
    """Build a NovelAI generate-image request and run it behind a progress bar.

    Args:
        input_text: Main prompt text.
        quality_tags: Extra tags appended to the prompt after ``", "``;
            skipped when empty.
        seed: Seed to send; ``-1`` means "pick a random 32-bit seed".
        negative_prompt: Sent as ``parameters.negative_prompt``.
        scale: Sent as ``parameters.scale``.
        ratio: One of ``"Landscape (1216x832)"``, ``"Square (1024x1024)"``
            or ``"Portrait (832x1216)"``; selects the output width/height.
        sampler: Sent as ``parameters.sampler``.
        progress: Gradio progress tracker used to display the waiting loop.

    Returns:
        Whatever ``request_novelai`` returns for the built request (in this
        file: a 2-tuple whose second element is the payload as JSON text).

    Raises:
        ValueError: If ``ratio`` is not one of the supported choices
            (previously this surfaced as an ``UnboundLocalError``).
    """
    # A bare ``import concurrent`` does not load the ``futures`` submodule, so
    # import it explicitly here instead of relying on another package having
    # imported it as a side effect.
    import concurrent.futures

    jwt_token = os.environ.get('NAI_API_KEY')

    sizes_by_ratio = {
        "Landscape (1216x832)": (1216, 832),
        "Square (1024x1024)": (1024, 1024),
        "Portrait (832x1216)": (832, 1216),
    }
    try:
        width, height = sizes_by_ratio[ratio]
    except KeyError:
        raise ValueError(f"Unsupported ratio: {ratio!r}") from None

    # Check if quality tags are provided and append to input
    final_input = input_text
    if quality_tags:
        final_input += ", " + quality_tags

    # Assign a random seed if seed is -1
    if seed == -1:
        seed = random.randint(0, 2**32 - 1)

    # Define the API URL
    url = "https://api.novelai.net/ai/generate-image"

    # Set the headers
    headers = {
        "Authorization": f"Bearer {jwt_token}",
        "Content-Type": "application/json",
        "Origin": "https://novelai.net",
        "Referer": "https://novelai.net/"
    }

    # Define the payload ("ucPreset" was listed twice in the original literal;
    # it is kept once, in its first position, so the serialized JSON is
    # unchanged).
    payload = {
        "action": "generate",
        "input": final_input,
        "model": "nai-diffusion-3",
        "parameters": {
            "width": width,
            "height": height,
            "scale": scale,
            "sampler": sampler,
            "steps": 28,
            "n_samples": 1,
            "ucPreset": 0,
            "add_original_image": False,
            "cfg_rescale": 0,
            "controlnet_strength": 1,
            "dynamic_thresholding": False,
            "legacy": False,
            "negative_prompt": negative_prompt,
            "noise_schedule": "native",
            "qualityToggle": True,
            "seed": seed,
            "sm": False,
            "sm_dyn": False,
            "uncond_scale": 1,
        }
    }

    # Run the blocking HTTP call on a worker thread so this thread can advance
    # the progress bar while waiting.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(request_novelai, url, payload, headers)
        # The bar is cosmetic: up to 15 ticks, each waiting a random
        # 0.8-1.4 s for the request, stopping early once it has finished.
        for _ in progress.tqdm([None] * 15, desc="Generating"):
            _, pending = concurrent.futures.wait(
                [future], timeout=np.random.uniform(0.8, 1.4)
            )
            if not pending:
                break
    # Leaving the ``with`` block waits for the worker, so the result (or the
    # exception raised inside the worker) is available here.
    return future.result()
def request_novelai(url, payload, headers, timeout=(10, 120)):
    """POST ``payload`` to ``url`` and decode the first image of the zip reply.

    Args:
        url: Endpoint to POST to.
        payload: JSON-serializable request body.
        headers: HTTP headers for the request.
        timeout: ``requests`` timeout, as seconds or a ``(connect, read)``
            tuple. Without one, a stalled connection would block the worker
            thread forever. Pass ``None`` to restore the old unbounded wait.

    Returns:
        A 2-tuple ``(result, payload_json)``. ``payload_json`` is ``payload``
        dumped with ``indent=4``. ``result`` is the first archive member
        decoded into a NumPy array when the response is a zip archive;
        otherwise it is a short message string describing the problem.
        NOTE(review): on failure the first element is a string rather than an
        image -- confirm the consumer of this value handles that.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            ``timeout`` elapses.
    """
    payload_json = json.dumps(payload, indent=4)

    # Send the POST request
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)

    # Compare only the media type: ignore parameters such as "; charset=..."
    # and letter case, so an equivalent header value is not rejected.
    content_type = response.headers.get('Content-Type') or ''
    media_type = content_type.split(';', 1)[0].strip().lower()
    if media_type != 'application/x-zip-compressed':
        return "The response is not a zip file.", payload_json

    with zipfile.ZipFile(io.BytesIO(response.content), 'r') as zip_ref:
        file_names = zip_ref.namelist()
        if not file_names:
            return "No images found in the zip file.", payload_json

        with zip_ref.open(file_names[0]) as file:
            image = Image.open(file)
            # Convert while the archive member is still open: PIL reads the
            # pixel data lazily.
            # The WEBP re-encode and timestamp file name that used to be
            # computed here only served the disabled save_to_s3() call, so
            # that per-request work has been dropped. To re-enable archiving,
            # encode `image` into a BytesIO and pass it to save_to_s3().
            return np.array(image), payload_json
# Create Gradio interface
# Input components, in the positional order of generate_novelai_image's
# parameters (input_text, quality_tags, seed, negative_prompt, scale, ratio,
# sampler).
_input_components = [
    gr.Textbox(label="Input Text"),
    gr.Textbox(
        label="Quality Tags",
        value="best quality, amazing quality, very aesthetic, absurdres",
    ),
    # -1 is the "random seed" sentinel handled by generate_novelai_image.
    gr.Slider(minimum=-1, maximum=2**32 - 1, step=1, value=-1, label="Seed"),
    gr.Textbox(
        label="Negative Prompt",
        value="nsfw, lowres, {bad}, error, fewer, extra, missing, worst quality, jpeg artifacts, bad quality, watermark, unfinished, displeasing, chromatic aberration, signature, extra digits, artistic error, username, scan, [abstract]",
    ),
    gr.Slider(minimum=1, maximum=20, step=1, value=5, label="Scale"),
    gr.Radio(
        choices=["Landscape (1216x832)", "Square (1024x1024)", "Portrait (832x1216)"],
        value="Portrait (832x1216)",
    ),
    gr.Dropdown(
        choices=[
            "k_euler", "k_euler_ancestral", "k_dpmpp_2s_ancestral",
            "k_dpmpp_2m", "k_dpmpp_sde", "ddim_v3"
        ],
        value="k_euler",
        label="Sampler",
    ),
]

# Outputs: the generated image and the JSON payload that was submitted.
_output_components = [
    "image",
    gr.Textbox(label="Submitted Payload"),
]

iface = gr.Interface(
    fn=generate_novelai_image,
    inputs=_input_components,
    outputs=_output_components,
    concurrency_limit=5,
)
# Start the app: first try with a public share link, and fall back to a plain
# launch if that raises RuntimeError (the fallback is the path used in HF
# Spaces, per the original note).
_share_kwargs = {"share": True}
try:
    iface.launch(**_share_kwargs)
except RuntimeError:  # use in HF spaces
    iface.launch()