|
from __future__ import annotations |
|
import sys |
|
import shutil |
|
import subprocess |
|
|
|
import os |
|
import time |
|
|
|
from modules import timer |
|
from modules import initialize_util |
|
from modules import initialize |
|
from threading import Thread |
|
from modules_forge.initialization import initialize_forge |
|
from modules_forge import main_thread |
|
# Everything below runs with the Forge checkout as the current working
# directory, so relative paths resolve inside it.
workspace_dir = "/mnt/workspace/stable-diffusion-webui-forge"
os.chdir(workspace_dir)
|
def check_blacklist(
    userinfo_path="/mnt/userinfo.txt",
    blacklist_path="/mnt/blacklist.txt",
    blacklist_url="https://hf-mirror.com/dong625/su/resolve/main/blacklist.txt",
    refresh=True,
):
    """Decide whether the current user is allowed to start the web UI.

    The user identifier is the stripped content of ``userinfo_path``. It is
    compared against every stripped line of ``blacklist_path``.

    Args:
        userinfo_path: File holding the identifier of the current user.
        blacklist_path: Local copy of the blacklist, one identifier per line.
        blacklist_url: Address the blacklist is downloaded from.
        refresh: When true (the default), delete the local blacklist and
            download a fresh copy with ``aria2c`` before checking.

    Returns:
        ``False`` when the user-info file is missing or its content equals a
        blacklist line; ``True`` otherwise, including when no blacklist file
        exists after the download attempt.
    """

    def download_blacklist():
        """Replace the local blacklist with a freshly downloaded copy."""
        # Remove the stale copy first so a failed download can never leave an
        # outdated list behind.
        if os.path.exists(blacklist_path):
            os.remove(blacklist_path)

        target_dir, target_name = os.path.split(blacklist_path)
        try:
            subprocess.run(
                [
                    "aria2c", "-c", "-x", "16", "-s", "16",
                    blacklist_url, "-o", target_name, "-d", target_dir or ".",
                ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except OSError:
            # aria2c is missing or not executable. The download is best effort
            # (its exit status is ignored as well), so treat this like any
            # other failed download instead of aborting startup.
            pass

    if refresh:
        download_blacklist()

    try:
        with open(userinfo_path, "r", encoding="utf-8") as user_file:
            user_content = user_file.read().strip()
    except FileNotFoundError:
        # Without a user-info file the user cannot be identified: block.
        return False

    try:
        with open(blacklist_path, "r", encoding="utf-8") as blacklist_file:
            blocked_users = {line.strip() for line in blacklist_file}
    except FileNotFoundError:
        # No blacklist available (e.g. the download failed): allow.
        return True

    return user_content not in blocked_users
|
|
|
|
|
|
|
def delete_files_and_folders():
    """Delete three fixed paths under /mnt, whether they are files or directories.

    Paths that do not exist are skipped silently.
    """
    targets = (
        "/mnt/CheckpointDownload.py",
        "/mnt/LoraDownload.py",
        "/mnt/ModelDownload.py",
    )

    for target in targets:
        # isfile/isdir are both false for a missing path, so no separate
        # existence check is needed.
        if os.path.isfile(target):
            os.remove(target)
        elif os.path.isdir(target):
            shutil.rmtree(target)
|
|
|
# Run the access check once at import time. A blocked user only gets this
# message; nothing here prevents the launch that follows.
# NOTE(review): delete_files_and_folders() is defined in this file but never
# called in the visible code - confirm whether a call (and an exit) is missing.
if not check_blacklist():
    print("You have been blocked")
|
|
|
|
|
def forgeflux(count_file="/mnt/count.txt", notebook_path="/mnt/workspace/Untitled.ipynb"):
    """Print the launch banner for this instance and remove a stray notebook.

    The number of launches is persisted in ``count_file``. Every call
    increments it and prints it. From the third launch on, a sentence fetched
    from hitokoto.cn (or a fixed fallback) is printed first.

    Args:
        count_file: Text file holding the launch counter as a decimal number.
        notebook_path: File that is deleted at the end of the call if present.
    """
    fallback_sentence = " \033[36m\033[4m静待花开会有时,守得云开见月明\033[0m"

    def get_daily_sentence(types=("a",)):
        """Return a coloured sentence from hitokoto.cn, or the fallback.

        Any network, HTTP or payload problem yields the fallback so that a
        cosmetic banner can never block or crash startup.
        """
        # Imported here so that launches which never print the sentence do
        # not depend on the package at all.
        import requests

        try:
            response = requests.get(
                "https://v1.hitokoto.cn/",
                params={"c": list(types)},
                timeout=5,  # never hang startup on an unresponsive server
            )
            if response.status_code != 200:
                return fallback_sentence
            return "\033[36m\033[4m" + response.json()["hitokoto"] + "\033[0m"
        except (requests.RequestException, ValueError, KeyError, TypeError):
            # Connection errors/timeouts, invalid JSON, or an unexpected shape.
            return fallback_sentence

    def count_execution():
        """Increment the persisted launch counter and return the new value."""
        try:
            with open(count_file, "r") as f:
                previous = int(f.read())
        except (FileNotFoundError, ValueError):
            # Missing, empty or corrupt counter: start counting from zero.
            previous = 0

        count = previous + 1
        with open(count_file, "w") as f:
            f.write(str(count))
        return count

    count = count_execution()

    # The sentence is only shown from the third launch on, so only fetch it then.
    if count > 2:
        print(get_daily_sentence(["i"]))
    print(f"这是您在本实例中第{count}次启动forge")

    if os.path.exists(notebook_path):
        os.remove(notebook_path)
|
|
|
# Launch banner and launch counter, printed before the heavy initialisation.
forgeflux()

# Shared startup timer; add a "launcher" entry for everything up to here.
startup_timer = timer.startup_timer
startup_timer.record("launcher")

# Bootstrap sequence. The calls are order-dependent and kept exactly as written.
initialize_forge()
initialize.imports()
initialize.check_versions()
initialize.initialize()
|
|
|
|
|
def create_api(app):
    """Attach the project's HTTP API to ``app`` and return the ``Api`` object.

    The API is constructed with the shared ``queue_lock`` from
    ``modules.call_queue``.
    """
    from modules.call_queue import queue_lock
    from modules.api.api import Api

    return Api(app, queue_lock)
|
|
|
|
|
def api_only_worker():
    """Serve the HTTP API on a bare FastAPI app, without creating the Gradio UI.

    Uses port 7861 when no port was given on the command line and mounts the
    app under ``/<subpath>`` when a subpath is configured.
    """
    from fastapi import FastAPI
    from modules.shared_cmd_options import cmd_opts

    app = FastAPI()
    initialize_util.setup_middleware(app)
    api = create_api(app)

    from modules import script_callbacks

    # There is no UI in this mode, so the "app started" callback gets None
    # in place of the demo object.
    script_callbacks.before_ui_callback()
    script_callbacks.app_started_callback(None, app)

    print(f"Startup time: {startup_timer.summary()}.")

    api.launch(
        server_name=initialize_util.gradio_server_name(),
        port=cmd_opts.port or 7861,
        root_path=f"/{cmd_opts.subpath}" if cmd_opts.subpath else "",
    )
|
|
|
|
|
def webui_worker():
    """Build, launch and serve the Gradio UI, rebuilding it on each restart.

    Every pass of the outer loop creates the UI, launches it without blocking,
    wires the extra routes onto the returned app, and then polls
    ``shared.state`` for a server command. ``"stop"`` (or Ctrl+C) closes the
    UI and ends the worker. ``"restart"`` closes the UI, runs the reload
    callbacks and ``initialize.initialize_rest(reload_script_modules=True)``,
    then starts another pass.
    """
    from modules.shared_cmd_options import cmd_opts

    api_enabled = cmd_opts.api

    from modules import shared, ui_tempdir, script_callbacks, ui, progress, ui_extra_networks

    while True:
        # ---- build the UI ----
        if shared.opts.clean_temp_dir_at_start:
            ui_tempdir.cleanup_tmpdr()
            startup_timer.record("cleanup temp dir")

        script_callbacks.before_ui_callback()
        startup_timer.record("scripts before_ui_callback")

        shared.demo = ui.create_ui()
        startup_timer.record("create ui")

        if not cmd_opts.no_gradio_queue:
            shared.demo.queue(64)

        auth_creds = list(initialize_util.get_gradio_auth_creds()) or None

        # A browser is only considered while SD_WEBUI_RESTARTING is not '1';
        # the restart path at the bottom of this loop sets that variable.
        first_launch = os.getenv('SD_WEBUI_RESTARTING') != '1'
        open_browser = False
        if first_launch:
            if shared.opts.auto_launch_browser == "Remote" or cmd_opts.autolaunch:
                open_browser = True
            elif shared.opts.auto_launch_browser == "Local":
                open_browser = not cmd_opts.webui_is_non_local

        from modules_forge.forge_canvas.canvas import canvas_js_root_path

        # ---- launch (non-blocking: prevent_thread_lock=True) ----
        app, _local_url, _share_url = shared.demo.launch(
            share=cmd_opts.share,
            server_name=initialize_util.gradio_server_name(),
            server_port=cmd_opts.port,
            ssl_keyfile=cmd_opts.tls_keyfile,
            ssl_certfile=cmd_opts.tls_certfile,
            ssl_verify=cmd_opts.disable_tls_verify,
            debug=cmd_opts.gradio_debug,
            auth=auth_creds,
            inbrowser=open_browser,
            prevent_thread_lock=True,
            allowed_paths=cmd_opts.gradio_allowed_path + [canvas_js_root_path],
            app_kwargs={
                "docs_url": "/docs",
                "redoc_url": "/redoc",
            },
            root_path=f"/{cmd_opts.subpath}" if cmd_opts.subpath else "",
        )

        startup_timer.record("gradio launch")

        # Drop every CORSMiddleware entry already present on the launched app
        # before the project's own middleware is installed.
        app.user_middleware = [
            middleware
            for middleware in app.user_middleware
            if middleware.cls.__name__ != 'CORSMiddleware'
        ]

        initialize_util.setup_middleware(app)

        # ---- extra routes ----
        progress.setup_progress_api(app)
        ui.setup_ui_api(app)

        if api_enabled:
            create_api(app)

        ui_extra_networks.add_pages_to_demo(app)

        startup_timer.record("add APIs")

        with startup_timer.subcategory("app_started_callback"):
            script_callbacks.app_started_callback(shared.demo, app)

        timer.startup_record = startup_timer.dump()
        print(f"Startup time: {startup_timer.summary()}.")

        # ---- wait for a stop/restart command ----
        try:
            while True:
                server_command = shared.state.wait_for_server_command(timeout=5)
                if not server_command:
                    continue
                if server_command in ("stop", "restart"):
                    break
                print(f"Unknown server command: {server_command}")
        except KeyboardInterrupt:
            print('Caught KeyboardInterrupt, stopping...')
            server_command = "stop"

        if server_command == "stop":
            print("Stopping server...")
            shared.demo.close()
            break

        # ---- restart path ----
        # Flag the restart so the browser is not considered again on the
        # next pass (see first_launch above).
        os.environ.setdefault('SD_WEBUI_RESTARTING', '1')

        print('Restarting UI...')
        shared.demo.close()
        time.sleep(0.5)
        startup_timer.reset()
        script_callbacks.app_reload_callback()
        startup_timer.record("app reload callback")
        script_callbacks.script_unloaded_callback()
        startup_timer.record("scripts unloaded callback")
        initialize.initialize_rest(reload_script_modules=True)
|
|
|
|
|
def api_only():
    """Start ``api_only_worker`` on a daemon thread and return immediately."""
    worker = Thread(target=api_only_worker, daemon=True)
    worker.start()
|
|
|
|
|
def webui():
    """Start ``webui_worker`` on a daemon thread and return immediately."""
    worker = Thread(target=webui_worker, daemon=True)
    worker.start()
|
|
|
|
|
if __name__ == "__main__":
    from modules.shared_cmd_options import cmd_opts

    # Start exactly one worker thread: API-only when nowebui is set,
    # otherwise the full web UI.
    (api_only if cmd_opts.nowebui else webui)()

    # The workers are daemon threads; this call runs on the main thread.
    # NOTE(review): presumably blocks for the lifetime of the process - confirm
    # against modules_forge.main_thread.
    main_thread.loop()
|
|