Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import os | |
is_spaces = True if os.environ.get("SPACE_ID") else False | |
if is_spaces: | |
import spaces | |
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" | |
import sys | |
from dotenv import load_dotenv | |
load_dotenv() | |
# Add the current working directory to the Python path | |
sys.path.insert(0, os.getcwd()) | |
import gradio as gr | |
from PIL import Image | |
import torch | |
import uuid | |
import os | |
import shutil | |
import json | |
import yaml | |
from slugify import slugify | |
from transformers import AutoProcessor, AutoModelForCausalLM | |
import subprocess | |
if not is_spaces: | |
from toolkit.job import get_job | |
MAX_IMAGES = 150 | |
def load_captioning(uploaded_images, concept_sentence): | |
updates = [] | |
if len(uploaded_images) <= 1: | |
raise gr.Error( | |
"Please upload at least 2 images to train your model (the ideal number with default settings is between 4-30)" | |
) | |
elif len(uploaded_images) > MAX_IMAGES: | |
raise gr.Error(f"For now, only {MAX_IMAGES} or less images are allowed for training") | |
# Update for the captioning_area | |
# for _ in range(3): | |
updates.append(gr.update(visible=True)) | |
# Update visibility and image for each captioning row and image | |
for i in range(1, MAX_IMAGES + 1): | |
# Determine if the current row and image should be visible | |
visible = i <= len(uploaded_images) | |
# Update visibility of the captioning row | |
updates.append(gr.update(visible=visible)) | |
# Update for image component - display image if available, otherwise hide | |
image_value = uploaded_images[i - 1] if visible else None | |
updates.append(gr.update(value=image_value, visible=visible)) | |
# Update value of captioning area | |
text_value = "[trigger]" if visible and concept_sentence else None | |
updates.append(gr.update(value=text_value, visible=visible)) | |
# Update for the sample caption area | |
updates.append(gr.update(visible=True)) | |
updates.append(gr.update(placeholder=f'A photo of {concept_sentence} holding a sign that reads "Hello friend"')) | |
updates.append(gr.update(placeholder=f"A mountainous landscape in the style of {concept_sentence}")) | |
updates.append(gr.update(placeholder=f"A {concept_sentence} in a mall")) | |
return updates | |
if is_spaces: | |
load_captioning = spaces.GPU()(load_captioning) | |
def create_dataset(*inputs): | |
print("Creating dataset") | |
images = inputs[0] | |
destination_folder = str(f"datasets/{uuid.uuid4()}") | |
if not os.path.exists(destination_folder): | |
os.makedirs(destination_folder) | |
jsonl_file_path = os.path.join(destination_folder, "metadata.jsonl") | |
with open(jsonl_file_path, "a") as jsonl_file: | |
for index, image in enumerate(images): | |
new_image_path = shutil.copy(image, destination_folder) | |
original_caption = inputs[index + 1] | |
file_name = os.path.basename(new_image_path) | |
data = {"file_name": file_name, "prompt": original_caption} | |
jsonl_file.write(json.dumps(data) + "\n") | |
return destination_folder | |
def run_captioning(images, concept_sentence, *captions): | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
torch_dtype = torch.float16 | |
model = AutoModelForCausalLM.from_pretrained( | |
"microsoft/Florence-2-large", torch_dtype=torch_dtype, trust_remote_code=True | |
).to(device) | |
processor = AutoProcessor.from_pretrained("microsoft/Florence-2-large", trust_remote_code=True) | |
captions = list(captions) | |
for i, image_path in enumerate(images): | |
print(captions[i]) | |
if isinstance(image_path, str): # If image is a file path | |
image = Image.open(image_path).convert("RGB") | |
prompt = "<DETAILED_CAPTION>" | |
inputs = processor(text=prompt, images=image, return_tensors="pt").to(device, torch_dtype) | |
generated_ids = model.generate( | |
input_ids=inputs["input_ids"], pixel_values=inputs["pixel_values"], max_new_tokens=1024, num_beams=3 | |
) | |
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0] | |
parsed_answer = processor.post_process_generation( | |
generated_text, task=prompt, image_size=(image.width, image.height) | |
) | |
caption_text = parsed_answer["<DETAILED_CAPTION>"].replace("The image shows ", "") | |
if concept_sentence: | |
caption_text = f"{caption_text} [trigger]" | |
captions[i] = caption_text | |
yield captions | |
model.to("cpu") | |
del model | |
del processor | |
def start_training( | |
profile: gr.OAuthProfile | None, | |
oauth_token: gr.OAuthToken | None, | |
lora_name, | |
concept_sentence, | |
steps, | |
lr, | |
rank, | |
dataset_folder, | |
sample_1, | |
sample_2, | |
sample_3, | |
): | |
if not lora_name: | |
raise gr.Error("You forgot to insert your LoRA name! This name has to be unique.") | |
print("Started training") | |
slugged_lora_name = slugify(lora_name) | |
# Load the default config | |
with open("train_lora_flux_24gb.yaml", "r") as f: | |
config = yaml.safe_load(f) | |
# Update the config with user inputs | |
config["config"]["name"] = slugged_lora_name | |
config["config"]["process"][0]["model"]["low_vram"] = True | |
config["config"]["process"][0]["train"]["skip_first_sample"] = True | |
config["config"]["process"][0]["train"]["steps"] = int(steps) | |
config["config"]["process"][0]["train"]["lr"] = float(lr) | |
config["config"]["process"][0]["network"]["linear"] = int(rank) | |
config["config"]["process"][0]["network"]["linear_alpha"] = int(rank) | |
config["config"]["process"][0]["datasets"][0]["folder_path"] = dataset_folder | |
if concept_sentence: | |
config["config"]["process"][0]["trigger_word"] = concept_sentence | |
if sample_1 or sample_2 or sample_2: | |
config["config"]["process"][0]["train"]["disable_sampling"] = False | |
config["config"]["process"][0]["sample"]["sample_every"] = steps | |
config["config"]["process"][0]["sample"]["prompts"] = [] | |
if sample_1: | |
config["config"]["process"][0]["sample"]["prompts"].append(sample_1) | |
if sample_2: | |
config["config"]["process"][0]["sample"]["prompts"].append(sample_2) | |
if sample_3: | |
config["config"]["process"][0]["sample"]["prompts"].append(sample_3) | |
else: | |
config["config"]["process"][0]["train"]["disable_sampling"] = True | |
# Save the updated config | |
config_path = f"config/{slugged_lora_name}.yaml" | |
with open(config_path, "w") as f: | |
yaml.dump(config, f) | |
if is_spaces: | |
print("Started training with spacerunner...") | |
# copy config to dataset_folder | |
shutil.copy(config_path, dataset_folder) | |
# get location of this script | |
script_location = os.path.dirname(os.path.abspath(__file__)) | |
# copy script.py from current directory to dataset_folder | |
shutil.copy(script_location + "/script.py", dataset_folder) | |
# copy requirements.autotrain to dataset_folder as requirements.txt | |
shutil.copy(script_location + "/requirements.autotrain", dataset_folder + "/requirements.txt") | |
# command to run autotrain spacerunner | |
cmd = f"autotrain spacerunner --project-name {slugged_lora_name} --script-path {dataset_folder}" | |
cmd += f" --username {profile.name} --token {oauth_token} --backend spaces-l4x1" | |
outcome = subprocess.run(cmd) | |
if outcome.returncode == 0: | |
return f"""# Your training has started. | |
## - Training Status: <a href='https://huggingface.co/spaces/{profile.name}/autotrain-{slugged_lora_name}?logs=container'>{profile.name}/autotrain-{slugged_lora_name}</a> <small>(in the logs tab)</small> | |
## - Model page: <a href='https://huggingface.co/{profile.name}/{slugged_lora_name}'>{profile.name}/{slugged_lora_name}</a> <small>(will be available when training finishes)</small>""" | |
else: | |
print("Error: ", outcome.stderr) | |
raise gr.Error("Something went wrong. Make sure the name of your LoRA is unique and try again") | |
else: | |
# run the job locally | |
job = get_job(config_path) | |
job.run() | |
job.cleanup() | |
return f"Training completed successfully. Model saved as {slugged_lora_name}" | |
theme = gr.themes.Monochrome( | |
text_size=gr.themes.Size(lg="18px", md="15px", sm="13px", xl="22px", xs="12px", xxl="24px", xxs="9px"), | |
font=[gr.themes.GoogleFont("Source Sans Pro"), "ui-sans-serif", "system-ui", "sans-serif"], | |
) | |
css = """ | |
#component-1{text-align:center} | |
.main_ui_logged_out{opacity: 0.3; pointer-events: none} | |
.tabitem{border: 0px} | |
""" | |
def swap_visibilty(profile: gr.OAuthProfile | None): | |
print(profile) | |
if is_spaces: | |
if profile is None: | |
return gr.update(elem_classes=["main_ui_logged_out"]) | |
else: | |
print(profile.name) | |
return gr.update(elem_classes=["main_ui_logged_in"]) | |
else: | |
return gr.update(elem_classes=["main_ui_logged_in"]) | |
with gr.Blocks(theme=theme, css=css) as demo: | |
gr.Markdown( | |
"""# LoRA Ease for FLUX 🧞♂️ | |
### Train a high quality FLUX LoRA in a breeze ༄ using [Ostris' AI Toolkit](https://github.com/ostris/ai-toolkit) and [AutoTrain Advanced](https://github.com/huggingface/autotrain-advanced)""" | |
) | |
if is_spaces: | |
gr.LoginButton("Sign in with Hugging Face to train your LoRA on Spaces", visible=is_spaces) | |
with gr.Tab("Train on Spaces" if is_spaces else "Train locally"): | |
with gr.Column() as main_ui: | |
with gr.Row(): | |
lora_name = gr.Textbox( | |
label="The name of your LoRA", | |
info="This has to be a unique name", | |
placeholder="e.g.: Persian Miniature Painting style, Cat Toy", | |
) | |
# training_option = gr.Radio( | |
# label="What are you training?", choices=["object", "style", "character", "face", "custom"] | |
# ) | |
concept_sentence = gr.Textbox( | |
label="Trigger word/sentence", | |
info="Trigger word or sentence to be used", | |
placeholder="uncommon word like p3rs0n or trtcrd, or sentence like 'in the style of CNSTLL'", | |
interactive=True, | |
) | |
with gr.Group(visible=True) as image_upload: | |
with gr.Row(): | |
images = gr.File( | |
file_types=["image"], | |
label="Upload your images", | |
file_count="multiple", | |
interactive=True, | |
visible=True, | |
scale=1, | |
) | |
with gr.Column(scale=3, visible=False) as captioning_area: | |
with gr.Column(): | |
gr.Markdown( | |
"""# Custom captioning | |
You can optionally add a custom caption for each image (or use an AI model for this). [trigger] will represent your concept sentence/trigger word. | |
""" | |
) | |
do_captioning = gr.Button("Add AI captions with Florence-2") | |
output_components = [captioning_area] | |
caption_list = [] | |
for i in range(1, MAX_IMAGES + 1): | |
locals()[f"captioning_row_{i}"] = gr.Row(visible=False) | |
with locals()[f"captioning_row_{i}"]: | |
locals()[f"image_{i}"] = gr.Image( | |
type="filepath", | |
width=111, | |
height=111, | |
min_width=111, | |
interactive=False, | |
scale=2, | |
show_label=False, | |
show_share_button=False, | |
show_download_button=False, | |
) | |
locals()[f"caption_{i}"] = gr.Textbox( | |
label=f"Caption {i}", scale=15, interactive=True | |
) | |
output_components.append(locals()[f"captioning_row_{i}"]) | |
output_components.append(locals()[f"image_{i}"]) | |
output_components.append(locals()[f"caption_{i}"]) | |
caption_list.append(locals()[f"caption_{i}"]) | |
with gr.Accordion("Advanced options", open=False): | |
steps = gr.Number(label="Steps", value=1000, minimum=1, maximum=10000, step=1) | |
lr = gr.Number(label="Learning Rate", value=4e-4, minimum=1e-6, maximum=1e-3, step=1e-6) | |
rank = gr.Number(label="LoRA Rank", value=16, minimum=4, maximum=128, step=4) | |
with gr.Accordion("Sample prompts", visible=False) as sample: | |
gr.Markdown( | |
"Include sample prompts to test out your trained model. Don't forget to include your trigger word/sentence (optional)" | |
) | |
sample_1 = gr.Textbox(label="Test prompt 1") | |
sample_2 = gr.Textbox(label="Test prompt 2") | |
sample_3 = gr.Textbox(label="Test prompt 3") | |
output_components.append(sample) | |
output_components.append(sample_1) | |
output_components.append(sample_2) | |
output_components.append(sample_3) | |
start = gr.Button("Start training") | |
progress_area = gr.Markdown("") | |
with gr.Tab("Train locally" if is_spaces else "Instructions"): | |
gr.Markdown( | |
f"""To use FLUX LoRA Ease locally with this UI, you can clone this repository (yes, HF Spaces are git repos!) | |
```bash | |
git clone https://huggingface.co/spaces/flux-train/flux-lora-trainer | |
cd flux-lora-trainer | |
pip install requirements_local.txt | |
``` | |
Then you can install ai-toolkit | |
```bash | |
git clone https://github.com/ostris/ai-toolkit.git | |
cd ai-toolkit | |
git submodule update --init --recursive | |
python3 -m venv venv | |
source venv/bin/activate | |
# .\venv\Scripts\activate on windows | |
# install torch first | |
pip3 install torch | |
pip3 install -r requirements.txt | |
cd .. | |
``` | |
Login with Hugging Face to access FLUX.1 [dev], choose a token with `write` permissions to push your LoRAs to the HF Hub | |
```bash | |
huggingface-cli login | |
``` | |
Now you can run FLUX LoRA Ease locally by doing a simple | |
```py | |
python app.py | |
``` | |
If you prefer command line, you can run Ostris' [AI Toolkit](https://github.com/ostris/ai-toolkit) yourself directly. | |
""" | |
) | |
dataset_folder = gr.State() | |
images.upload(load_captioning, inputs=[images, concept_sentence], outputs=output_components, queue=False) | |
start.click(fn=create_dataset, inputs=[images] + caption_list, outputs=dataset_folder, queue=False).then( | |
fn=start_training, | |
inputs=[ | |
lora_name, | |
concept_sentence, | |
steps, | |
lr, | |
rank, | |
dataset_folder, | |
sample_1, | |
sample_2, | |
sample_3, | |
], | |
outputs=progress_area, | |
queue=False, | |
) | |
do_captioning.click(fn=run_captioning, inputs=[images, concept_sentence] + caption_list, outputs=caption_list) | |
demo.load(fn=swap_visibilty, outputs=main_ui, queue=False) | |
if __name__ == "__main__": | |
demo.queue() | |
demo.launch(share=True) | |