import io
import base64
import os
import sys
import numpy as np
import torch
from torch import autocast
import diffusers
from diffusers.configuration_utils import FrozenDict
from diffusers import (
StableDiffusionPipeline,
StableDiffusionInpaintPipeline,
StableDiffusionImg2ImgPipeline,
StableDiffusionInpaintPipelineLegacy,
DDIMScheduler,
LMSDiscreteScheduler,
)
from PIL import Image
from PIL import ImageOps
import gradio as gr
import base64
import skimage
import skimage.measure
import yaml
import json
from enum import Enum
# Run from this file's directory so the relative paths opened further down
# (config.yaml, index.html, canvas.py, ./js, .token, ./embeddings) resolve.
try:
    abspath = os.path.abspath(__file__)
    dirname = os.path.dirname(abspath)
    os.chdir(dirname)
except (NameError, OSError):
    # Best effort: __file__ may be undefined (code executed without a file)
    # or the directory may not be enterable; keep the current directory.
    pass
from utils import *


def _version_tuple(version):
    """Return the leading numeric components of a version string as ints.

    Only the first three dot-separated pieces are used, and each piece is cut
    at its first non-digit character, so ``"0.10.0.dev0"`` -> ``(0, 10, 0)``.
    """
    parts = []
    for piece in version.split(".")[:3]:
        digits = ""
        for char in piece:
            if not char.isdigit():
                break
            digits += char
        parts.append(int(digits or 0))
    return tuple(parts)


# Compare numerically: as plain strings "0.10.0" sorts before "0.6.0".
assert _version_tuple(diffusers.__version__) >= (0, 6, 0), "Please upgrade diffusers to 0.6.0"
USE_NEW_DIFFUSERS = True
# True when the RUN_IN_HG_SPACE environment variable is set (any value).
RUN_IN_SPACE = "RUN_IN_HG_SPACE" in os.environ
class ModelChoice(Enum):
    """Model options offered in the setup radio selector.

    The values are the strings shown in the UI and compared in ``get_model``.
    """

    # Handled by StableDiffusionInpaint in get_model.
    INPAINTING = "stablediffusion-inpainting"
    # Handled by StableDiffusion(..., inpainting_model=True) in get_model.
    INPAINTING_IMG2IMG = "stablediffusion-inpainting+img2img-v1.5"
    # get_model maps this to "runwayml/stable-diffusion-v1-5".
    MODEL_1_5 = "stablediffusion-v1.5"
    # get_model falls back to "CompVis/stable-diffusion-v1-4" for this choice.
    MODEL_1_4 = "stablediffusion-v1.4"
# Use the unified pipeline from sd_grpcserver when it can be imported;
# otherwise fall back to the stock diffusers inpainting pipeline.
try:
    from sd_grpcserver.pipeline.unified_pipeline import UnifiedPipeline
except Exception:
    # Any import-time failure of the optional package selects the fallback,
    # but KeyboardInterrupt/SystemExit are no longer swallowed.
    UnifiedPipeline = StableDiffusionInpaintPipeline
# sys.path.append("./glid_3_xl_stable")
USE_GLID = False
# try:
# from glid3xlmodel import GlidModel
# except:
# USE_GLID = False

# Probe CUDA defensively: a failing probe is treated as "no CUDA".
try:
    cuda_available = torch.cuda.is_available()
except Exception:
    cuda_available = False

# Pick the compute device: MPS (or CPU) on macOS, else CUDA when available.
if sys.platform == "darwin":
    # torch builds without an MPS backend lack torch.backends.mps entirely.
    _mps_backend = getattr(torch.backends, "mps", None)
    if _mps_backend is not None and _mps_backend.is_available():
        device = "mps"
    else:
        device = "cpu"
elif cuda_available:
    device = "cuda"
else:
    device = "cpu"

if device != "cuda":
    import contextlib

    # The pipelines are wrapped in `with autocast("cuda")`; off CUDA this is
    # replaced by a no-op context manager that accepts the same argument.
    autocast = contextlib.nullcontext

# Load the YAML configuration and keep a JSON-serialised copy of it.
with open("config.yaml", "r") as yaml_in:
    yaml_object = yaml.safe_load(yaml_in)
config_json = json.dumps(yaml_object)
def load_html():
    """Return ``index.html`` with the source of ``canvas.py`` inlined.

    Both files are read as UTF-8 from the current directory. The two lines
    that reference ``./canvas.py`` as an external path are removed and the
    ``from canvas import InfCanvas`` statement is replaced by the module text.
    """

    def _read_text(path):
        with open(path, encoding="utf8") as handle:
            return handle.read()

    page = _read_text("index.html")
    canvas_source = _read_text("canvas.py")
    # Drop the path declaration lines that pointed at the external module.
    for marker in ("- paths:\n", " - ./canvas.py\n"):
        page = page.replace(marker, "")
    return page.replace("from canvas import InfCanvas", canvas_source)
def test(x):
    """Return the markup used as the initial value of the ``frame`` component.

    The argument is ignored: it is immediately overwritten with the page
    produced by ``load_html()``.
    """
    x = load_html()
    # NOTE(review): the f-string template below is empty, so the loaded page
    # is never interpolated and "" is returned - confirm this is intended.
    return f""""""
# Controls visibility of the proceed button and the raw input/output textboxes.
DEBUG_MODE = False

# Prefer the namespaced resampling filter; fall back to the module-level
# constant when this Pillow build has no Image.Resampling attribute.
try:
    SAMPLING_MODE = Image.Resampling.LANCZOS
except AttributeError:
    SAMPLING_MODE = Image.LANCZOS

# Use Pillow's own ImageOps.contain when present; otherwise define an
# equivalent local implementation.
try:
    contain_func = ImageOps.contain
except AttributeError:

    def contain_func(image, size, method=SAMPLING_MODE):
        """Resize ``image`` to fit within ``size`` while keeping aspect ratio.

        Args:
            image: object exposing ``width``, ``height`` and ``resize``.
            size: ``(max_width, max_height)`` bounding box.
            method: resampling filter forwarded to ``image.resize``.

        Returns:
            The result of ``image.resize`` for the fitted size.
        """
        # from PIL: https://pillow.readthedocs.io/en/stable/reference/ImageOps.html#PIL.ImageOps.contain
        im_ratio = image.width / image.height
        dest_ratio = size[0] / size[1]
        if im_ratio != dest_ratio:
            if im_ratio > dest_ratio:
                # Image is relatively wider than the box: width is the limit.
                new_height = int(image.height / image.width * size[0])
                if new_height != size[1]:
                    size = (size[0], new_height)
            else:
                # Image is relatively taller than the box: height is the limit.
                new_width = int(image.width / image.height * size[1])
                if new_width != size[0]:
                    size = (new_width, size[1])
        return image.resize(size, resample=method)
import argparse

# Command-line interface. Most options are later forwarded to the Gradio
# launch call; the model/precision options are consumed by this module.
parser = argparse.ArgumentParser(description="stablediffusion-infinity")
parser.add_argument("--port", dest="server_port", type=int, help="listen port")
parser.add_argument("--host", dest="server_name", type=str, help="host")
parser.add_argument("--share", help="share this app?", action="store_true")
parser.add_argument("--debug", help="debug mode", action="store_true")
parser.add_argument("--fp32", help="using full precision", action="store_true")
parser.add_argument("--encrypt", help="using https?", action="store_true")
parser.add_argument("--ssl_keyfile", help="path to ssl_keyfile", type=str)
parser.add_argument("--ssl_certfile", help="path to ssl_certfile", type=str)
parser.add_argument("--ssl_keyfile_password", help="ssl_keyfile_password", type=str)
parser.add_argument(
    "--auth",
    nargs=2,
    metavar=("username", "password"),
    help="use username password",
)
parser.add_argument(
    "--remote_model",
    default="",
    type=str,
    help="use a model (e.g. dreambooth fined) from huggingface hub",
)
parser.add_argument(
    "--local_model",
    default="",
    type=str,
    help="use a model stored on your PC",
)

# Run as a script: parse the real command line (None means sys.argv[1:]).
# Imported as a module: behave as if only --debug had been passed.
args = parser.parse_args(None if __name__ == "__main__" else ["--debug"])
# args = parser.parse_args(["--debug"])

# argparse yields a two-item list for --auth; it is stored as a tuple.
if args.auth is not None:
    args.auth = tuple(args.auth)

# Process-wide cache; get_model stores the loaded backend under key "model".
model = dict()
def get_token():
    """Return the Hugging Face token to pre-fill in the UI.

    The ``hftoken`` environment variable wins when set; otherwise the content
    of the ``.token`` file in the current directory is used, or ``""`` when
    that file does not exist.
    """
    token = ""
    if os.path.exists(".token"):
        with open(".token", "r") as f:
            # Strip surrounding whitespace so a hand-edited file ending in a
            # newline does not yield a token with a trailing "\n".
            token = f.read().strip()
    return os.environ.get("hftoken", token)
def save_token(token):
    """Persist ``token`` to the ``.token`` file in the current directory.

    Any previous content of the file is replaced.
    """
    with open(".token", mode="w") as token_file:
        token_file.write(token)
def prepare_scheduler(scheduler):
    """Force ``steps_offset`` to 1 in the scheduler's config and return it.

    Schedulers whose config has no ``steps_offset`` attribute, or already has
    it set to 1, are returned untouched. Otherwise the config is copied,
    patched and stored back on ``scheduler._internal_dict`` as a FrozenDict.
    """
    config = scheduler.config
    needs_patch = hasattr(config, "steps_offset") and config.steps_offset != 1
    if needs_patch:
        patched = dict(config)
        patched["steps_offset"] = 1
        scheduler._internal_dict = FrozenDict(patched)
    return scheduler
def my_resize(width, height):
    """Choose the processing size for a ``width`` x ``height`` selection.

    Sizes with both sides >= 512, or with the longer side >= 608, are
    returned unchanged; a small square becomes 512x512. Anything else is
    scaled by a factor picked from the shorter side and each side is then
    rounded down to a multiple of 8.
    """
    if min(width, height) >= 512:
        return width, height
    if width == height:
        return 512, 512
    if max(width, height) >= 608:
        return width, height

    short_side = min(width, height)
    # (exclusive upper bound on the shorter side, scale factor), ascending.
    scale_steps = (
        (290, 2),
        (330, 1.75),
        (384, 1.375),
        (400, 1.25),
        (450, 1.125),
    )
    scale = next(
        (factor for limit, factor in scale_steps if short_side < limit), 1
    )
    return int(scale * width) // 8 * 8, int(scale * height) // 8 * 8
def load_learned_embed_in_clip(
    learned_embeds_path, text_encoder, tokenizer, token=None
):
    """Add a learned token embedding to a tokenizer / text-encoder pair.

    Args:
        learned_embeds_path: file readable by ``torch.load`` holding a mapping
            from token string to embedding tensor; only the first entry is used.
        text_encoder: model exposing ``get_input_embeddings()`` and
            ``resize_token_embeddings(n)``.
        tokenizer: tokenizer exposing ``add_tokens``, ``__len__`` and
            ``convert_tokens_to_ids``.
        token: token string to register; defaults to the one stored in the file.

    Raises:
        ValueError: if the tokenizer already contains the token.
    """
    # https://colab.research.google.com/github/huggingface/notebooks/blob/main/diffusers/stable_conceptualizer_inference.ipynb
    # NOTE: torch.load unpickles the file; only load embeddings from trusted sources.
    loaded_learned_embeds = torch.load(learned_embeds_path, map_location="cpu")
    # separate token and the embeds
    trained_token = list(loaded_learned_embeds.keys())[0]
    embeds = loaded_learned_embeds[trained_token]
    # cast to dtype of text_encoder; Tensor.to returns a new tensor, so the
    # result has to be re-bound for the cast to take effect
    dtype = text_encoder.get_input_embeddings().weight.dtype
    embeds = embeds.to(dtype)
    # add the token in tokenizer
    token = token if token is not None else trained_token
    num_added_tokens = tokenizer.add_tokens(token)
    if num_added_tokens == 0:
        raise ValueError(
            f"The tokenizer already contains the token {token}. Please pass a different `token` that is not already in the tokenizer."
        )
    # resize the token embeddings
    text_encoder.resize_token_embeddings(len(tokenizer))
    # get the id for the token and assign the embeds
    token_id = tokenizer.convert_tokens_to_ids(token)
    text_encoder.get_input_embeddings().weight.data[token_id] = embeds
# Scheduler instances keyed by the name shown in the "Scheduler" dropdown.
# The entries stay None until a model wrapper's __init__ fills them in.
scheduler_dict = {"PLMS": None, "DDIM": None, "K-LMS": None}
class StableDiffusionInpaint:
    """Backend built on a single ``StableDiffusionInpaintPipeline``.

    The pipeline is obtained from a converted ``.ckpt`` file, from a local
    model directory, or from ``model_name`` via ``from_pretrained``.
    Construction also fills the module-level ``scheduler_dict`` and stores
    the token with ``save_token``.
    """

    def __init__(
        self, token: str = "", model_name: str = "", model_path: str = "", **kwargs,
    ):
        """Load the inpainting pipeline and move it to the module ``device``.

        Args:
            token: Hugging Face auth token passed to ``from_pretrained``.
            model_name: model identifier used when ``model_path`` does not
                point at an existing location.
            model_path: optional existing path: a ``.ckpt`` file is converted,
                a ``.json`` file selects its parent directory, anything else
                is used as the model directory.
            **kwargs: accepted and ignored.
        """
        self.token = token
        original_checkpoint = False
        if model_path and os.path.exists(model_path):
            if model_path.endswith(".ckpt"):
                original_checkpoint = True
            elif model_path.endswith(".json"):
                model_name = os.path.dirname(model_path)
            else:
                model_name = model_path
        use_half = device == "cuda" and not args.fp32
        if original_checkpoint:
            print(f"Converting & Loading {model_path}")
            from convert_checkpoint import convert_checkpoint

            pipe = convert_checkpoint(model_path, inpainting=True)
            if use_half:
                pipe.to(torch.float16)
            inpaint = StableDiffusionInpaintPipeline(
                vae=pipe.vae,
                text_encoder=pipe.text_encoder,
                tokenizer=pipe.tokenizer,
                unet=pipe.unet,
                scheduler=pipe.scheduler,
                safety_checker=pipe.safety_checker,
                feature_extractor=pipe.feature_extractor,
            )
        else:
            print(f"Loading {model_name}")
            if use_half:
                inpaint = StableDiffusionInpaintPipeline.from_pretrained(
                    model_name,
                    revision="fp16",
                    torch_dtype=torch.float16,
                    use_auth_token=token,
                )
            else:
                inpaint = StableDiffusionInpaintPipeline.from_pretrained(
                    model_name, use_auth_token=token,
                )
        # Register every learned embedding (*.bin) found in ./embeddings.
        if os.path.exists("./embeddings"):
            print("Note that StableDiffusionInpaintPipeline + embeddings is untested")
            for item in os.listdir("./embeddings"):
                if item.endswith(".bin"):
                    load_learned_embed_in_clip(
                        os.path.join("./embeddings", item),
                        inpaint.text_encoder,
                        inpaint.tokenizer,
                    )
        inpaint.to(device)
        # if device == "mps":
        # _ = text2img("", num_inference_steps=1)
        # "PLMS" is whatever scheduler the loaded pipeline came with.
        scheduler_dict["PLMS"] = inpaint.scheduler
        scheduler_dict["DDIM"] = prepare_scheduler(
            DDIMScheduler(
                beta_start=0.00085,
                beta_end=0.012,
                beta_schedule="scaled_linear",
                clip_sample=False,
                set_alpha_to_one=False,
            )
        )
        scheduler_dict["K-LMS"] = prepare_scheduler(
            LMSDiscreteScheduler(
                beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear"
            )
        )
        # Kept so run() can restore the checker after it has been disabled.
        self.safety_checker = inpaint.safety_checker
        save_token(token)
        try:
            total_memory = torch.cuda.get_device_properties(0).total_memory // (
                1024 ** 3
            )
            if total_memory <= 5:
                inpaint.enable_attention_slicing()
        except Exception:
            # Best effort: without a usable CUDA device the memory query
            # fails and attention slicing is simply left at its default.
            pass
        self.inpaint = inpaint

    def run(
        self,
        image_pil,
        prompt="",
        negative_prompt="",
        guidance_scale=7.5,
        resize_check=True,
        enable_safety=True,
        fill_mode="patchmatch",
        strength=0.75,
        step=50,
        enable_img2img=False,
        use_seed=False,
        seed_val=-1,
        generate_num=1,
        scheduler="",
        scheduler_eta=0.0,
        **kwargs,
    ):
        """Inpaint ``image_pil`` and return the pipeline's ``"images"`` entry.

        Args:
            image_pil: PIL image; channels 0-2 are used as colour and the last
                channel as the mask (presumably RGBA - confirm against caller).
            prompt / negative_prompt: text prompts forwarded to the pipeline.
            guidance_scale, step, scheduler_eta: forwarded as
                ``guidance_scale``, ``num_inference_steps`` and ``eta``.
            resize_check: when true the processing size comes from ``my_resize``.
            enable_safety: when false the safety checker is replaced by a
                pass-through.
            fill_mode: key into ``functbl`` (not defined in this section;
                presumably provided by ``utils``) selecting the pre-fill.
            strength: forwarded to the pipeline as ``strength``.
            enable_img2img: accepted for signature parity; unused here.
            use_seed, seed_val: when ``use_seed`` is true a generator seeded
                with ``seed_val`` is passed to the pipeline.
            generate_num: forwarded as ``num_images_per_prompt``.
            scheduler: key into ``scheduler_dict``; unknown keys select "PLMS".
            **kwargs: accepted and ignored.
        """
        inpaint = self.inpaint
        inpaint.scheduler = scheduler_dict.get(scheduler, scheduler_dict["PLMS"])
        if enable_safety:
            inpaint.safety_checker = self.safety_checker
        else:
            # Pass-through checker: returns the images and a False flag.
            inpaint.safety_checker = lambda images, **kwargs: (images, False)
        width, height = image_pil.size
        sel_buffer = np.array(image_pil)
        img = sel_buffer[:, :, 0:3]
        mask = sel_buffer[:, :, -1]
        process_width = width
        process_height = height
        if resize_check:
            process_width, process_height = my_resize(width, height)
        extra_kwargs = {
            "num_inference_steps": step,
            "guidance_scale": guidance_scale,
            "eta": scheduler_eta,
        }
        if USE_NEW_DIFFUSERS:
            extra_kwargs["negative_prompt"] = negative_prompt
            extra_kwargs["num_images_per_prompt"] = generate_num
        if use_seed:
            generator = torch.Generator(inpaint.device).manual_seed(seed_val)
            extra_kwargs["generator"] = generator
        img, mask = functbl[fill_mode](img, mask)
        # Invert the mask, then coarsen it to 8x8 cells (max over each cell)
        # and expand every cell back to 8x8 pixels.
        mask = 255 - mask
        mask = skimage.measure.block_reduce(mask, (8, 8), np.max)
        mask = mask.repeat(8, axis=0).repeat(8, axis=1)
        extra_kwargs["strength"] = strength
        init_image = Image.fromarray(img)
        mask_image = Image.fromarray(mask)
        # mask_image=mask_image.filter(ImageFilter.GaussianBlur(radius = 8))
        with autocast("cuda"):
            images = inpaint(
                prompt=prompt,
                image=init_image.resize(
                    (process_width, process_height), resample=SAMPLING_MODE
                ),
                mask_image=mask_image.resize((process_width, process_height)),
                width=process_width,
                height=process_height,
                **extra_kwargs,
            )["images"]
        return images
class StableDiffusion:
    """Backend bundling text2img, inpaint, img2img and unified pipelines.

    All pipelines share the components of one loaded Stable Diffusion model.
    With ``inpainting_model=True`` the text2img UNet is combined with the
    remaining components of ``runwayml/stable-diffusion-inpainting``.
    Construction also fills the module-level ``scheduler_dict`` and stores
    the token with ``save_token``.
    """

    def __init__(
        self,
        token: str = "",
        model_name: str = "runwayml/stable-diffusion-v1-5",
        model_path: str = None,
        inpainting_model: bool = False,
        **kwargs,
    ):
        """Load the model and build the pipelines on the module ``device``.

        Args:
            token: Hugging Face auth token passed to ``from_pretrained``.
            model_name: model identifier used when ``model_path`` does not
                point at an existing location.
            model_path: optional existing path: a ``.ckpt`` file is converted,
                a ``.json`` file selects its parent directory, anything else
                is used as the model directory.
            inpainting_model: use the dedicated inpainting pipeline instead of
                the legacy one built from the text2img components.
            **kwargs: accepted and ignored.
        """
        self.token = token
        original_checkpoint = False
        if model_path and os.path.exists(model_path):
            if model_path.endswith(".ckpt"):
                original_checkpoint = True
            elif model_path.endswith(".json"):
                model_name = os.path.dirname(model_path)
            else:
                model_name = model_path
        use_half = device == "cuda" and not args.fp32
        if original_checkpoint:
            print(f"Converting & Loading {model_path}")
            from convert_checkpoint import convert_checkpoint

            text2img = convert_checkpoint(model_path)
            if use_half:
                text2img.to(torch.float16)
        else:
            print(f"Loading {model_name}")
            if use_half:
                # Load the requested model; this path previously hard-coded
                # "runwayml/stable-diffusion-v1-5" and ignored model_name.
                text2img = StableDiffusionPipeline.from_pretrained(
                    model_name,
                    revision="fp16",
                    torch_dtype=torch.float16,
                    use_auth_token=token,
                )
            else:
                text2img = StableDiffusionPipeline.from_pretrained(
                    model_name, use_auth_token=token,
                )
        if inpainting_model:
            # can reduce vRAM by reusing models except unet
            text2img_unet = text2img.unet
            del text2img.vae
            del text2img.text_encoder
            del text2img.tokenizer
            del text2img.scheduler
            del text2img.safety_checker
            del text2img.feature_extractor
            import gc

            gc.collect()
            if use_half:
                inpaint = StableDiffusionInpaintPipeline.from_pretrained(
                    "runwayml/stable-diffusion-inpainting",
                    revision="fp16",
                    torch_dtype=torch.float16,
                    use_auth_token=token,
                ).to(device)
            else:
                inpaint = StableDiffusionInpaintPipeline.from_pretrained(
                    "runwayml/stable-diffusion-inpainting", use_auth_token=token,
                ).to(device)
            text2img_unet.to(device)
            # Rebuild text2img around the inpainting pipeline's components,
            # keeping only the original text2img UNet.
            text2img = StableDiffusionPipeline(
                vae=inpaint.vae,
                text_encoder=inpaint.text_encoder,
                tokenizer=inpaint.tokenizer,
                unet=text2img_unet,
                scheduler=inpaint.scheduler,
                safety_checker=inpaint.safety_checker,
                feature_extractor=inpaint.feature_extractor,
            )
        else:
            inpaint = StableDiffusionInpaintPipelineLegacy(
                vae=text2img.vae,
                text_encoder=text2img.text_encoder,
                tokenizer=text2img.tokenizer,
                unet=text2img.unet,
                scheduler=text2img.scheduler,
                safety_checker=text2img.safety_checker,
                feature_extractor=text2img.feature_extractor,
            ).to(device)
        # Register every learned embedding (*.bin) found in ./embeddings.
        if os.path.exists("./embeddings"):
            for item in os.listdir("./embeddings"):
                if item.endswith(".bin"):
                    load_learned_embed_in_clip(
                        os.path.join("./embeddings", item),
                        text2img.text_encoder,
                        text2img.tokenizer,
                    )
        text2img.to(device)
        if device == "mps":
            # One single-step run right after loading on MPS
            # (presumably a warm-up pass - confirm).
            _ = text2img("", num_inference_steps=1)
        # "PLMS" is whatever scheduler the loaded pipeline came with.
        scheduler_dict["PLMS"] = text2img.scheduler
        scheduler_dict["DDIM"] = prepare_scheduler(
            DDIMScheduler(
                beta_start=0.00085,
                beta_end=0.012,
                beta_schedule="scaled_linear",
                clip_sample=False,
                set_alpha_to_one=False,
            )
        )
        scheduler_dict["K-LMS"] = prepare_scheduler(
            LMSDiscreteScheduler(
                beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear"
            )
        )
        # Kept so run() can restore the checker after it has been disabled.
        self.safety_checker = text2img.safety_checker
        img2img = StableDiffusionImg2ImgPipeline(
            vae=text2img.vae,
            text_encoder=text2img.text_encoder,
            tokenizer=text2img.tokenizer,
            unet=text2img.unet,
            scheduler=text2img.scheduler,
            safety_checker=text2img.safety_checker,
            feature_extractor=text2img.feature_extractor,
        ).to(device)
        save_token(token)
        try:
            total_memory = torch.cuda.get_device_properties(0).total_memory // (
                1024 ** 3
            )
            if total_memory <= 5:
                inpaint.enable_attention_slicing()
        except Exception:
            # Best effort: without a usable CUDA device the memory query
            # fails and attention slicing is simply left at its default.
            pass
        self.text2img = text2img
        self.inpaint = inpaint
        self.img2img = img2img
        self.unified = UnifiedPipeline(
            vae=text2img.vae,
            text_encoder=text2img.text_encoder,
            tokenizer=text2img.tokenizer,
            unet=text2img.unet,
            scheduler=text2img.scheduler,
            safety_checker=text2img.safety_checker,
            feature_extractor=text2img.feature_extractor,
        ).to(device)
        self.inpainting_model = inpainting_model

    def run(
        self,
        image_pil,
        prompt="",
        negative_prompt="",
        guidance_scale=7.5,
        resize_check=True,
        enable_safety=True,
        fill_mode="patchmatch",
        strength=0.75,
        step=50,
        enable_img2img=False,
        use_seed=False,
        seed_val=-1,
        generate_num=1,
        scheduler="",
        scheduler_eta=0.0,
        **kwargs,
    ):
        """Generate images for the selection and return the ``"images"`` entry.

        The mode is chosen from the selection's last channel (used as mask):
        img2img when ``enable_img2img`` is set and the mask is fully 255,
        inpainting when the mask has any non-zero value, plain text2img
        otherwise.

        Args:
            image_pil: PIL image; channels 0-2 are used as colour and the last
                channel as the mask (presumably RGBA - confirm against caller).
            prompt / negative_prompt: text prompts forwarded to the pipeline.
            guidance_scale, step, scheduler_eta: forwarded as
                ``guidance_scale``, ``num_inference_steps`` and ``eta``.
            resize_check: when true the processing size comes from ``my_resize``.
            enable_safety: when false the safety checker is replaced by a
                pass-through.
            fill_mode: key into ``functbl`` (not defined in this section;
                presumably provided by ``utils``) selecting the pre-fill.
            strength: img2img / inpainting strength.
            enable_img2img: allow the img2img path for fully masked input.
            use_seed, seed_val: when ``use_seed`` is true a generator seeded
                with ``seed_val`` is passed to the pipeline.
            generate_num: forwarded as ``num_images_per_prompt``.
            scheduler: key into ``scheduler_dict``; unknown keys select "PLMS".
            **kwargs: accepted and ignored.
        """
        text2img, inpaint, img2img, unified = (
            self.text2img,
            self.inpaint,
            self.img2img,
            self.unified,
        )
        selected_scheduler = scheduler_dict.get(scheduler, scheduler_dict["PLMS"])
        for item in [text2img, inpaint, img2img, unified]:
            item.scheduler = selected_scheduler
            if enable_safety:
                item.safety_checker = self.safety_checker
            else:
                # Pass-through checker: returns the images and a False flag.
                item.safety_checker = lambda images, **kwargs: (images, False)
        if RUN_IN_SPACE:
            # NOTE(review): max() makes 150 a lower bound on the step count;
            # if a cap for the hosted Space was intended this should be
            # min() - confirm before changing.
            step = max(150, step)
            image_pil = contain_func(image_pil, (1024, 1024))
        width, height = image_pil.size
        sel_buffer = np.array(image_pil)
        img = sel_buffer[:, :, 0:3]
        mask = sel_buffer[:, :, -1]
        nmask = 255 - mask
        process_width = width
        process_height = height
        if resize_check:
            process_width, process_height = my_resize(width, height)
        extra_kwargs = {
            "num_inference_steps": step,
            "guidance_scale": guidance_scale,
            "eta": scheduler_eta,
        }
        if RUN_IN_SPACE:
            # NOTE(review): max() makes the pixel-budget figure a lower bound
            # on the number of images; a cap would use min() - confirm.
            generate_num = max(
                int(4 * 512 * 512 // process_width // process_height), generate_num
            )
        if USE_NEW_DIFFUSERS:
            extra_kwargs["negative_prompt"] = negative_prompt
            extra_kwargs["num_images_per_prompt"] = generate_num
        if use_seed:
            generator = torch.Generator(text2img.device).manual_seed(seed_val)
            extra_kwargs["generator"] = generator
        if nmask.sum() < 1 and enable_img2img:
            # Mask is 255 everywhere: transform the whole selection.
            init_image = Image.fromarray(img)
            with autocast("cuda"):
                images = img2img(
                    prompt=prompt,
                    init_image=init_image.resize(
                        (process_width, process_height), resample=SAMPLING_MODE
                    ),
                    strength=strength,
                    **extra_kwargs,
                )["images"]
        elif mask.sum() > 0:
            if fill_mode == "g_diffuser" and not self.inpainting_model:
                mask = 255 - mask
                mask = mask[:, :, np.newaxis].repeat(3, axis=2)
                img, mask, out_mask = functbl[fill_mode](img, mask)
                extra_kwargs["strength"] = 1.0
                extra_kwargs["out_mask"] = Image.fromarray(out_mask)
                inpaint_func = unified
            else:
                img, mask = functbl[fill_mode](img, mask)
                # Invert the mask, then coarsen it to 8x8 cells (max over
                # each cell) and expand every cell back to 8x8 pixels.
                mask = 255 - mask
                mask = skimage.measure.block_reduce(mask, (8, 8), np.max)
                mask = mask.repeat(8, axis=0).repeat(8, axis=1)
                extra_kwargs["strength"] = strength
                inpaint_func = inpaint
            init_image = Image.fromarray(img)
            mask_image = Image.fromarray(mask)
            # mask_image=mask_image.filter(ImageFilter.GaussianBlur(radius = 8))
            with autocast("cuda"):
                input_image = init_image.resize(
                    (process_width, process_height), resample=SAMPLING_MODE
                )
                # The picture is passed under both `init_image` and `image`.
                images = inpaint_func(
                    prompt=prompt,
                    init_image=input_image,
                    image=input_image,
                    width=process_width,
                    height=process_height,
                    mask_image=mask_image.resize((process_width, process_height)),
                    **extra_kwargs,
                )["images"]
        else:
            with autocast("cuda"):
                # height/width were previously passed swapped, which produced
                # a transposed size for non-square selections.
                images = text2img(
                    prompt=prompt,
                    height=process_height,
                    width=process_width,
                    **extra_kwargs,
                )["images"]
        return images
def get_model(token="", model_choice="", model_path=""):
    """Return the process-wide model backend, creating it on first use.

    The backend is cached in the module-level ``model`` dict under the key
    ``"model"``; once cached, the arguments of later calls are ignored.

    Args:
        token: Hugging Face auth token forwarded to the backend.
        model_choice: one of the ``ModelChoice`` values; unrecognised values
            select Stable Diffusion v1.4.
        model_path: optional local model path forwarded to the backend
            (not used for the inpainting+img2img choice).
    """
    if "model" in model:
        return model["model"]

    if model_choice == ModelChoice.INPAINTING.value:
        inpaint_repo = "runwayml/stable-diffusion-inpainting"
        print(f"Using [{inpaint_repo}] {model_path}")
        backend = StableDiffusionInpaint(
            token=token, model_name=inpaint_repo, model_path=model_path
        )
    elif model_choice == ModelChoice.INPAINTING_IMG2IMG.value:
        print(
            f"Note that {ModelChoice.INPAINTING_IMG2IMG.value} only support remote model and requires larger vRAM"
        )
        backend = StableDiffusion(
            token=token,
            model_name="runwayml/stable-diffusion-v1-5",
            inpainting_model=True,
        )
    else:
        if model_choice == ModelChoice.MODEL_1_5.value:
            base_repo = "runwayml/stable-diffusion-v1-5"
        else:
            base_repo = "CompVis/stable-diffusion-v1-4"
        backend = StableDiffusion(
            token=token, model_name=base_repo, model_path=model_path
        )

    model["model"] = backend
    return backend
def run_outpaint(
    sel_buffer_str,
    prompt_text,
    negative_prompt_text,
    strength,
    guidance,
    step,
    resize_check,
    fill_mode,
    enable_safety,
    use_correction,
    enable_img2img,
    use_seed,
    seed_val,
    generate_num,
    scheduler,
    scheduler_eta,
    state,
):
    """Gradio handler: run the model on the selected canvas region.

    ``sel_buffer_str`` is a base64-encoded image of the selection. Each
    generated image is passed through ``correction_func``, resized back to
    the selection size, written over the selection's first three channels
    with the last channel set to 255, and re-encoded as base64 PNG.

    Returns:
        A 3-tuple: an update for the output textbox (label is the new
        counter, value is the comma-joined base64 images), an update
        resetting the prompt label, and the incremented ``state``.
    """
    raw_bytes = base64.b64decode(str(sel_buffer_str))
    selection = Image.open(io.BytesIO(raw_bytes))
    width, height = selection.size
    selection_pixels = np.array(selection)
    backend = get_model()
    generated = backend.run(
        image_pil=selection,
        prompt=prompt_text,
        negative_prompt=negative_prompt_text,
        guidance_scale=guidance,
        strength=strength,
        step=step,
        resize_check=resize_check,
        fill_mode=fill_mode,
        enable_safety=enable_safety,
        use_seed=use_seed,
        seed_val=seed_val,
        generate_num=generate_num,
        scheduler=scheduler,
        scheduler_eta=scheduler_eta,
        enable_img2img=enable_img2img,
        width=width,
        height=height,
    )
    # img2img always uses the border-based correction mode.
    if enable_img2img:
        use_correction = "border_mode"
    encoded_images = []
    for image in generated:
        corrected = correction_func.run(
            selection.resize(image.size), image, mode=use_correction
        )
        fitted = corrected.resize((width, height), resample=SAMPLING_MODE)
        composed = selection_pixels.copy()
        composed[:, :, 0:3] = np.array(fitted)
        composed[:, :, -1] = 255
        png_buffer = io.BytesIO()
        Image.fromarray(composed).save(png_buffer, format="PNG")
        encoded_images.append(
            base64.b64encode(png_buffer.getvalue()).decode("ascii")
        )
    next_state = state + 1
    return (
        gr.update(label=str(next_state), value=",".join(encoded_images)),
        gr.update(label="Prompt"),
        next_state,
    )
def load_js(name):
    """Return the JavaScript snippet registered under ``name``.

    For ``"export"``, ``"commit"`` and ``"undo"`` a generated function is
    returned that clicks the element with that id inside the ``#sdinfframe``
    iframe and passes its argument through. Any other name is read from
    ``./js/<name>.js``.
    """
    frame_buttons = ("export", "commit", "undo")
    if name not in frame_buttons:
        with open(f"./js/{name}.js", "r") as script_file:
            return script_file.read()
    return f"""
function (x)
{{
let app=document.querySelector("gradio-app");
app=app.shadowRoot??app;
let frame=app.querySelector("#sdinfframe").contentWindow.document;
let button=frame.querySelector("#{name}");
button.click();
return x;
}}
"""
# JavaScript attached to the proceed / setup buttons (read from ./js/<name>.js).
proceed_button_js = load_js("proceed")
setup_button_js = load_js("setup")
# In a hosted Space there is no setup step: the model is loaded at import time.
if RUN_IN_SPACE:
    get_model(token=os.environ.get("hftoken", ""), model_choice=ModelChoice.INPAINTING_IMG2IMG.value)
blocks = gr.Blocks(
    title="StableDiffusion-Infinity",
    css="""
.tabs {
margin-top: 0rem;
margin-bottom: 0rem;
}
#markdown {
min-height: 0rem;
}
""",
)
# Initial value of the "Custom Model Path" textbox; overridden below from
# --local_model / --remote_model.
model_path_input_val = ""
# NOTE: components are created in layout order; do not reorder statements.
with blocks as demo:
    # title
    title = gr.Markdown(
        """
**stablediffusion-infinity**: Outpainting with Stable Diffusion on an infinite canvas: [https://github.com/lkwq007/stablediffusion-infinity](https://github.com/lkwq007/stablediffusion-infinity)
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/lkwq007/stablediffusion-infinity/blob/master/stablediffusion_infinity_colab.ipynb)
[![Setup Locally](https://img.shields.io/badge/%F0%9F%96%A5%EF%B8%8F%20Setup-Locally-blue)](https://github.com/lkwq007/stablediffusion-infinity/blob/master/docs/setup_guide.md)
""",
        elem_id="markdown",
    )
    # frame
    # Hidden until setup finishes, except in a Space where it shows at once.
    frame = gr.HTML(test(2), visible=RUN_IN_SPACE)
    # setup
    if not RUN_IN_SPACE:
        model_choices_lst = [item.value for item in ModelChoice]
        # --local_model takes precedence over --remote_model.
        if args.local_model:
            model_path_input_val = args.local_model
            # model_choices_lst.insert(0, "local_model")
        elif args.remote_model:
            model_path_input_val = args.remote_model
            # model_choices_lst.insert(0, "remote_model")
        with gr.Row(elem_id="setup_row"):
            with gr.Column(scale=4, min_width=350):
                token = gr.Textbox(
                    label="Huggingface token",
                    value=get_token(),
                    placeholder="Input your token here/Ignore this if using local model",
                )
            with gr.Column(scale=3, min_width=320):
                model_selection = gr.Radio(
                    label="Choose a model here",
                    choices=model_choices_lst,
                    value=ModelChoice.INPAINTING.value,
                )
            with gr.Column(scale=1, min_width=100):
                canvas_width = gr.Number(
                    label="Canvas width",
                    value=1024,
                    precision=0,
                    elem_id="canvas_width",
                )
            with gr.Column(scale=1, min_width=100):
                canvas_height = gr.Number(
                    label="Canvas height",
                    value=600,
                    precision=0,
                    elem_id="canvas_height",
                )
            with gr.Column(scale=1, min_width=100):
                selection_size = gr.Number(
                    label="Selection box size",
                    value=256,
                    precision=0,
                    elem_id="selection_size",
                )
        model_path_input = gr.Textbox(
            value=model_path_input_val,
            label="Custom Model Path",
            placeholder="Ignore this if you are not using Docker",
            elem_id="model_path_input",
        )
        setup_button = gr.Button("Click to Setup (may take a while)", variant="primary")
    # Generation controls.
    with gr.Row():
        with gr.Column(scale=3, min_width=270):
            # Passed to run_outpaint as fill_mode.
            init_mode = gr.Radio(
                label="Init Mode",
                choices=[
                    "patchmatch",
                    "edge_pad",
                    "cv2_ns",
                    "cv2_telea",
                    "perlin",
                    "gaussian",
                ],
                value="patchmatch",
                type="value",
            )
            # Passed to run_outpaint as use_correction.
            postprocess_check = gr.Radio(
                label="Photometric Correction Mode",
                choices=["disabled", "mask_mode", "border_mode",],
                value="disabled",
                type="value",
            )
        # canvas control
        with gr.Column(scale=3, min_width=270):
            sd_prompt = gr.Textbox(
                label="Prompt", placeholder="input your prompt here!", lines=2
            )
            sd_negative_prompt = gr.Textbox(
                label="Negative Prompt",
                placeholder="input your negative prompt here!",
                lines=2,
            )
        with gr.Column(scale=2, min_width=150):
            with gr.Group():
                with gr.Row():
                    sd_generate_num = gr.Number(
                        label="Sample number", value=1, precision=0
                    )
                    sd_strength = gr.Slider(
                        label="Strength",
                        minimum=0.0,
                        maximum=1.0,
                        value=0.75,
                        step=0.01,
                    )
                with gr.Row():
                    sd_scheduler = gr.Dropdown(
                        list(scheduler_dict.keys()), label="Scheduler", value="PLMS"
                    )
                    sd_scheduler_eta = gr.Number(label="Eta", value=0.0)
        with gr.Column(scale=1, min_width=80):
            sd_step = gr.Number(label="Step", value=50, precision=0)
            sd_guidance = gr.Number(label="Guidance", value=7.5)
    # Only shown when DEBUG_MODE is on.
    proceed_button = gr.Button("Proceed", elem_id="proceed", visible=DEBUG_MODE)
    # NOTE(review): xss_js / xss_keyboard_js / run_in_space are computed but
    # the f-string templates below are empty, so they are never interpolated
    # here - confirm whether markup was intended.
    xss_js = load_js("xss").replace("\n", " ")
    xss_html = gr.HTML(
        value=f"""
""",
        visible=False,
    )
    xss_keyboard_js = load_js("keyboard").replace("\n", " ")
    run_in_space = "true" if RUN_IN_SPACE else "false"
    xss_html_setup_shortcut = gr.HTML(
        value=f"""
""",
        visible=False,
    )
    # sd pipeline parameters
    # Created invisible; several are wired as inputs to run_outpaint below.
    sd_img2img = gr.Checkbox(label="Enable Img2Img", value=False, visible=False)
    sd_resize = gr.Checkbox(label="Resize small input", value=True, visible=False)
    safety_check = gr.Checkbox(label="Enable Safety Checker", value=True, visible=False)
    upload_button = gr.Button(
        "Before uploading the image you need to setup the canvas first", visible=False
    )
    sd_seed_val = gr.Number(label="Seed", value=0, precision=0, visible=False)
    sd_use_seed = gr.Checkbox(label="Use seed", value=False, visible=False)
    # Textboxes with fixed element ids, visible only in DEBUG_MODE; model_input
    # and model_output are the input/output of the proceed event below.
    model_output = gr.Textbox(visible=DEBUG_MODE, elem_id="output", label="0")
    model_input = gr.Textbox(visible=DEBUG_MODE, elem_id="input", label="Input")
    upload_output = gr.Textbox(visible=DEBUG_MODE, elem_id="upload", label="0")
    # Counter incremented by run_outpaint on every run.
    model_output_state = gr.State(value=0)
    upload_output_state = gr.State(value=0)
    cancel_button = gr.Button("Cancel", elem_id="cancel", visible=False)
    if not RUN_IN_SPACE:

        def setup_func(token_val, width, height, size, model_choice, model_path):
            # Load the model; on failure show the error text in the token box
            # and leave the setup widgets in place.
            try:
                get_model(token_val, model_choice, model_path=model_path)
            except Exception as e:
                print(e)
                return {token: gr.update(value=str(e))}
            # On success hide the setup widgets and reveal the canvas frame.
            return {
                token: gr.update(visible=False),
                canvas_width: gr.update(visible=False),
                canvas_height: gr.update(visible=False),
                selection_size: gr.update(visible=False),
                setup_button: gr.update(visible=False),
                frame: gr.update(visible=True),
                upload_button: gr.update(value="Upload Image"),
                model_selection: gr.update(visible=False),
                model_path_input: gr.update(visible=False),
            }

        setup_button.click(
            fn=setup_func,
            inputs=[
                token,
                canvas_width,
                canvas_height,
                selection_size,
                model_selection,
                model_path_input,
            ],
            outputs=[
                token,
                canvas_width,
                canvas_height,
                selection_size,
                setup_button,
                frame,
                upload_button,
                model_selection,
                model_path_input,
            ],
            _js=setup_button_js,
        )
    # The input order must match run_outpaint's positional parameters.
    proceed_event = proceed_button.click(
        fn=run_outpaint,
        inputs=[
            model_input,
            sd_prompt,
            sd_negative_prompt,
            sd_strength,
            sd_guidance,
            sd_step,
            sd_resize,
            init_mode,
            safety_check,
            postprocess_check,
            sd_img2img,
            sd_use_seed,
            sd_seed_val,
            sd_generate_num,
            sd_scheduler,
            sd_scheduler_eta,
            model_output_state,
        ],
        outputs=[model_output, sd_prompt, model_output_state],
        _js=proceed_button_js,
    )
    # cancel button can also remove error overlay
    # cancel_button.click(fn=None, inputs=None, outputs=None, cancels=[proceed_event])
# Launch options that are always applied on top of the CLI-derived ones.
launch_extra_kwargs = {
    "show_error": True,
    # "favicon_path": ""
}
# Forward every CLI option that was actually set, minus the ones that are
# consumed by this module rather than by the launch call.
launch_kwargs = {k: v for k, v in vars(args).items() if v is not None}
for _cli_only in ("remote_model", "local_model", "fp32"):
    launch_kwargs.pop(_cli_only, None)
launch_kwargs.update(launch_extra_kwargs)
# Force debug mode when the google.colab package is importable.
try:
    import google.colab
except ImportError:
    pass
else:
    launch_kwargs["debug"] = True
if RUN_IN_SPACE:
    demo.launch()
else:
    if args.debug:
        # NOTE: debug mode listens on all interfaces and overrides --host.
        launch_kwargs["server_name"] = "0.0.0.0"
    demo.queue().launch(**launch_kwargs)