Bloom 560M Finetuned on Instructions
Credit
Code 99.99% copied from
https://github.com/bofenghuang/vigogne
Inference Code
from peft import PeftModel
from transformers import PreTrainedTokenizer, PreTrainedModel, AutoTokenizer, AutoModelForCausalLM
from peft import PeftModelForCausalLM, LoraConfig
from typing import Optional
from transformers import GenerationConfig
import torch
# Prompt templates, filled in by generate_prompt() through str.format_map().
#   "prompt_input"    - used when the example has a non-empty "input" field;
#                       expects the placeholders {instruction} and {input}.
#   "prompt_no_input" - used otherwise; expects only {instruction}.
# NOTE(review): "a^n" in "prompt_input" looks like a typo for "an". It is left
# untouched on purpose: the training script uses the identical text, so the
# adapter was presumably trained on this exact prompt - confirm before changing.
PROMPT_DICT = {
    "prompt_input": (
        "Below is a^n instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Input:\n{input}\n\n### Response:\n"
    ),
    "prompt_no_input": (
        "Below is an instruction that describes a task. Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Response:\n"
    ),
}
def get_model(model_name_or_path: str, load_in_8bit: bool = True, device_map="auto",
              cpu: bool = False) -> PreTrainedModel:
    """Load the base causal language model.

    Args:
        model_name_or_path: Identifier or path handed to ``from_pretrained``.
        load_in_8bit: Forwarded as ``load_in_8bit`` on the non-CPU path only;
            it is ignored when ``cpu`` is true.
        device_map: Forwarded as ``device_map`` on both paths.
        cpu: When true, load with ``low_cpu_mem_usage=True`` and without the
            8-bit / float16 options.

    Returns:
        The model returned by ``AutoModelForCausalLM.from_pretrained``.
    """
    # Only the precision / memory options differ between the two paths.
    if cpu:
        load_options = {"low_cpu_mem_usage": True}
    else:
        load_options = {"load_in_8bit": load_in_8bit, "torch_dtype": torch.float16}
    return AutoModelForCausalLM.from_pretrained(model_name_or_path, device_map=device_map, **load_options)
def get_peft_model(model: PreTrainedModel, lora_model_name_or_path: Optional[str] = None) -> PeftModelForCausalLM:
    """Wrap ``model`` with the LoRA adapter stored at ``lora_model_name_or_path``.

    Args:
        model: The already-loaded base model.
        lora_model_name_or_path: Adapter identifier or path, forwarded
            unchanged to ``PeftModel.from_pretrained``.
            NOTE(review): the default ``None`` is forwarded as-is; presumably
            a real adapter id is required - confirm against the peft docs.

    Returns:
        The object returned by ``PeftModel.from_pretrained`` (requested with
        ``torch_dtype=torch.float16``).
    """
    return PeftModel.from_pretrained(
        model,
        lora_model_name_or_path,
        torch_dtype=torch.float16,
    )
def get_tokenizer(model_name_or_path: str, max_input_len: int) -> PreTrainedTokenizer:
    """Load the tokenizer that belongs to the base model.

    Args:
        model_name_or_path: Identifier or path handed to ``from_pretrained``.
        max_input_len: Passed on as the tokenizer's ``model_max_length``.

    Returns:
        The tokenizer returned by ``AutoTokenizer.from_pretrained``, requested
        with right-side padding and ``use_fast=False``.
    """
    return AutoTokenizer.from_pretrained(
        model_name_or_path,
        model_max_length=max_input_len,
        padding_side="right",
        use_fast=False,
    )
def get_llm_inference_model(base_model_name_or_path: str, lora_model_name_or_path: str, load_in_8bit: bool,
                            device_map) -> PeftModel:
    """Build the inference-ready model: base model plus LoRA adapter.

    Args:
        base_model_name_or_path: Base checkpoint, passed to ``get_model``.
        lora_model_name_or_path: LoRA adapter, passed to ``get_peft_model``.
        load_in_8bit: Forwarded to ``get_model``. When false, the wrapped
            model is additionally converted with ``model.half()``.
        device_map: Forwarded to ``get_model``.

    Returns:
        The adapter-wrapped model in eval mode. On torch >= 2 (except on
        Windows) this is the object returned by ``torch.compile``.
    """
    # Local import: the inference script's import list does not include `sys`.
    import sys

    # Take the CPU loading path whenever CUDA is unavailable.
    cpu = not torch.cuda.is_available()
    model = get_model(base_model_name_or_path, load_in_8bit, device_map, cpu=cpu)
    model = get_peft_model(model, lora_model_name_or_path=lora_model_name_or_path)
    if not load_in_8bit:
        model.half()
    model.eval()
    # Same guard as the training script: skip torch.compile on Windows so that
    # inference does not depend on compile support for that platform.
    if torch.__version__ >= "2" and sys.platform != "win32":
        model = torch.compile(model)
    return model
def generate_prompt(example):
    """Render the prompt for one example.

    ``example`` must provide an ``"input"`` entry. A truthy value selects the
    ``"prompt_input"`` template, a falsy one (``None`` or an empty string, for
    instance) selects ``"prompt_no_input"``. The chosen template is filled
    with ``str.format_map(example)``.
    """
    template_key = "prompt_input" if example["input"] else "prompt_no_input"
    return PROMPT_DICT[template_key].format_map(example)
def infer(instruction: str, input_text: Optional[str] = None, temperature: float = 0.1, top_p: float = 0.95,
          max_new_tokens: int = 512, early_stopping: bool = True, do_sample: bool = True,
          repetition_penalty: float = 2.5) -> str:
    """Generate a response for ``instruction`` with the module-level model.

    Reads the module-level globals ``model`` and ``tokenizer``; both must be
    assigned before this function is called.

    Args:
        instruction: Task description inserted into the prompt.
        input_text: Optional extra context. A falsy value selects the prompt
            template without an input section.
        temperature, top_p, do_sample, repetition_penalty, early_stopping:
            Forwarded to ``GenerationConfig``.
        max_new_tokens: Forwarded to ``model.generate``.

    Returns:
        The generated text with surrounding whitespace stripped, cut at the
        first "### Response:" marker the model emits, if any.
    """
    prompt = generate_prompt({"instruction": instruction, "input": input_text})
    tokenized_inputs = tokenizer(prompt, return_tensors="pt")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    input_ids = tokenized_inputs["input_ids"].to(device)
    generation_config = GenerationConfig(temperature=temperature, top_p=top_p, do_sample=do_sample,
                                         repetition_penalty=repetition_penalty, early_stopping=early_stopping)
    with torch.inference_mode():
        generation_output = model.generate(input_ids=input_ids, generation_config=generation_config,
                                           return_dict_in_generate=True, max_new_tokens=max_new_tokens)
    # The returned sequence starts with the prompt tokens. Decode only what was
    # generated after them, so that a "### Response:" marker inside the
    # instruction or input cannot be mistaken for the start of the answer
    # (splitting the full decoded text on the marker returned a piece of the
    # prompt in that case).
    prompt_length = input_ids.shape[-1]
    generated_tokens = generation_output.sequences[0][prompt_length:]
    completion = tokenizer.decode(generated_tokens, skip_special_tokens=True)
    # As before, drop anything after a marker repeated by the model itself.
    return completion.split("### Response:")[0].strip()
# Checkpoints: the base model and the LoRA adapter applied on top of it.
base_model_name_or_path = "bigscience/bloom-560m"
lora_model_name_or_path = "crayon-coe/alpaca-bloom-560m-en"

# `model` and `tokenizer` are read by infer() as module-level globals, so
# they must keep these names.
model = get_llm_inference_model(
    base_model_name_or_path,
    lora_model_name_or_path,
    load_in_8bit=True,
    device_map="auto",
)
tokenizer = get_tokenizer(base_model_name_or_path, max_input_len=512)

# Example request (no additional input text).
context = "Write a letter expressing your love for computers"
output = infer(instruction=context)
print(output)
# Output
# I am so grateful to have been able access this wonderful computer system and its amazing features, which I can now use daily with ease.
#
# My heartfelt thanks go out in advance of all my friends who are using it as well.
# Thank you again!
Training Parameters
{
"max_input_len": 512,
"load_in_8bit": True,
"model_name_or_path": "bigscience/bloom-560m",
"device_map": "auto",
"bias": "none",
"lora_dropout": 0.05,
"lora_alpha": 32,
"target_modules": ["query_key_value"],
"task_type": "CAUSAL_LM",
"lora_r": 16,
"pad_to_multiple_of": 8,
"num_train_epochs": 3,
"learning_rate": 0.0003,
"gradient_accumulation_steps": 16,
"per_device_train_batch_size": 8,
"val_set_size": 500,
"save_steps": 200,
"eval_steps": 200,
"evaluation_strategy": "steps",
"save_strategy": "steps"
}
Note: If loading fails, you may need to pass offload_folder="some folder name" to PeftModel.from_pretrained when creating the PeftModel.
Training Code
# coding=utf-8
# Code 99.99% copied and adapted from:
# https://github.com/bofenghuang/vigogne
# https://colab.research.google.com/drive/1jCkpikz0J2o20FBQmYmAGdiKmJGOMo-o?usp=sharing#scrollTo=DpYr24pR8T_0
import os
import sys
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence
import bitsandbytes as bnb
import fire
import torch
import transformers
from datasets import load_dataset
from peft import LoraConfig, TaskType, get_peft_model, get_peft_model_state_dict, prepare_model_for_int8_training
from transformers import AutoModelForCausalLM, AutoTokenizer, LlamaTokenizer
# Label value written for prompt tokens and for padding positions, so those
# positions carry no target token.
IGNORE_INDEX = -100
# NOTE(review): not referenced anywhere in the visible training code.
DEFAULT_PAD_TOKEN = "[PAD]"
# Prompt templates, filled in by generate_prompt() through str.format_map().
#   "prompt_input"    - used when the example has a non-empty "input" field;
#                       expects the placeholders {instruction} and {input}.
#   "prompt_no_input" - used otherwise; expects only {instruction}.
# NOTE(review): "a^n" in "prompt_input" looks like a typo for "an". Changing it
# alters the text the model is trained on, and the inference prompt would have
# to be changed in step - confirm before touching it.
PROMPT_DICT = {
    "prompt_input": (
        "Below is a^n instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Input:\n{input}\n\n### Response:\n"
    ),
    "prompt_no_input": (
        "Below is an instruction that describes a task. Write a response that appropriately completes the request.\n\n"
        "### Instruction:\n{instruction}\n\n### Response:\n"
    ),
}
def generate_prompt(example):
    """Render the prompt for one example.

    ``example`` must provide an ``"input"`` entry. A truthy value selects the
    ``"prompt_input"`` template, a falsy one (``None`` or an empty string, for
    instance) selects ``"prompt_no_input"``. The chosen template is filled
    with ``str.format_map(example)``.
    """
    template_key = "prompt_input" if example["input"] else "prompt_no_input"
    return PROMPT_DICT[template_key].format_map(example)
# Modified from: https://github.com/bofenghuang/stanford_alpaca/blob/eb5b171d9b103a12a8e14e0edca9cbc45fe1d512/train.py#L166-L182
# Almost the same as transformers.DataCollatorForSeq2Seq
@dataclass
class DataCollatorForSupervisedDataset(object):
    """Collate examples for supervised fine-tuning.

    Each instance must provide ``"input_ids"`` and ``"labels"`` as sequences
    of ints. The batch is right-padded to the length of its longest example,
    using ``tokenizer.pad_token_id`` for ``input_ids`` and ``IGNORE_INDEX``
    for ``labels``. When ``pad_to_multiple_of`` is set, the padded length is
    rounded up to the next multiple of it; a length that is already a
    multiple is left unchanged.

    The instances passed in are never modified.
    """

    # Tokenizer that supplies the pad token id.
    tokenizer: transformers.PreTrainedTokenizer
    # If set, the padded sequence length is rounded up to a multiple of this.
    pad_to_multiple_of: Optional[int] = None

    def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
        """Return a dict with ``input_ids``, ``labels`` and ``attention_mask``."""
        pad_token_id = self.tokenizer.pad_token_id

        # Build fresh tensors instead of extending the caller's lists in place:
        # mutating the instances would lengthen the stored examples every time
        # the same objects are collated again.
        input_ids = [torch.LongTensor(instance["input_ids"]) for instance in instances]
        labels = [torch.LongTensor(instance["labels"]) for instance in instances]

        input_ids = torch.nn.utils.rnn.pad_sequence(input_ids, batch_first=True, padding_value=pad_token_id)
        labels = torch.nn.utils.rnn.pad_sequence(labels, batch_first=True, padding_value=IGNORE_INDEX)

        if self.pad_to_multiple_of is not None:
            # Distance to the next multiple; 0 when already aligned. The
            # previous "(len // m + 1) * m - len" added a full extra block of
            # padding in the aligned case.
            n_padding = -input_ids.shape[1] % self.pad_to_multiple_of
            if n_padding:
                input_ids = torch.nn.functional.pad(input_ids, (0, n_padding), value=pad_token_id)
                labels = torch.nn.functional.pad(labels, (0, n_padding), value=IGNORE_INDEX)

        # Every position holding the pad id is masked out, including any real
        # token that happens to share that id.
        return dict(input_ids=input_ids, labels=labels, attention_mask=input_ids.ne(pad_token_id))
def train(model_name_or_path: str, output_dir: str, data_path: str, val_set_size: int = 500,
          model_max_length: int = 512, lora_r: int = 16, lora_alpha: int = 32, lora_dropout: float = 0.05,
          target_modules: List[str] = ["query_key_value"], num_train_epochs: int = 3, learning_rate: float = 0.0001,
          per_device_train_batch_size: int = 8, gradient_accumulation_steps: int = 16, **kwargs):
    """Fine-tune a causal language model with LoRA on an instruction dataset.

    The base model is loaded in 8-bit, wrapped with a LoRA configuration and
    trained with ``transformers.Trainer`` on a JSON dataset whose records
    provide "instruction", "input" and "output" fields.

    Args:
        model_name_or_path: Base checkpoint for the model and the tokenizer.
        output_dir: Passed to ``TrainingArguments`` and to ``save_pretrained``.
        data_path: Passed to ``load_dataset("json", data_files=...)``.
        val_set_size: Size of the held-out split; ``0`` (or less) disables
            evaluation and ``load_best_model_at_end``.
        model_max_length: Tokenizer ``model_max_length``; tokenization uses
            ``truncation=True``.
        lora_r, lora_alpha, lora_dropout, target_modules: LoRA settings
            forwarded to ``LoraConfig``.
        num_train_epochs, learning_rate, per_device_train_batch_size,
        gradient_accumulation_steps: Forwarded to ``TrainingArguments``.
        **kwargs: Any further keyword arguments for ``TrainingArguments``.

    NOTE(review): ``target_modules`` has a mutable list default. It is not
    mutated in this function, so it is harmless as written.
    """
    device_map = "auto"
    model = AutoModelForCausalLM.from_pretrained(model_name_or_path, load_in_8bit=True, device_map=device_map)
    tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, model_max_length=model_max_length,
                                              padding_side="right", use_fast=False)
    model = prepare_model_for_int8_training(model)
    lora_config = LoraConfig(r=lora_r, lora_alpha=lora_alpha, target_modules=target_modules, lora_dropout=lora_dropout,
                             bias="none", task_type=TaskType.CAUSAL_LM)
    model = get_peft_model(model, lora_config)
    model.print_trainable_parameters()
    # Load data
    data = load_dataset("json", data_files=data_path)

    def preprocess_function(example):
        """Tokenize one record and mask the prompt part of its labels."""
        # Format prompt
        user_prompt = generate_prompt(example)
        # Get prompt length for masking
        len_user_prompt_tokens = len(tokenizer(user_prompt, truncation=True)["input_ids"])
        # Full training text: prompt + expected output + end-of-sequence token.
        input_ids = tokenizer(user_prompt + example["output"] + tokenizer.eos_token, truncation=True)["input_ids"]
        # Prompt positions get IGNORE_INDEX; the remaining positions keep
        # their token ids as targets.
        labels = [IGNORE_INDEX] * len_user_prompt_tokens + input_ids[len_user_prompt_tokens:]
        return {"input_ids": input_ids, "labels": labels}

    if val_set_size > 0:
        # The train/validation split is seeded; the extra .shuffle() of the
        # training split below is called without a seed.
        train_val = data["train"].train_test_split(test_size=val_set_size, shuffle=True, seed=42)
        train_data = train_val["train"].shuffle().map(preprocess_function, remove_columns=data["train"].column_names)
        val_data = train_val["test"].map(preprocess_function, remove_columns=data["train"].column_names)
    else:
        train_data = data["train"].shuffle().map(preprocess_function, remove_columns=data["train"].column_names)
        val_data = None
    trainer = transformers.Trainer(
        model=model,
        train_dataset=train_data,
        eval_dataset=val_data,
        args=transformers.TrainingArguments(
            per_device_train_batch_size=per_device_train_batch_size,
            gradient_accumulation_steps=gradient_accumulation_steps,
            num_train_epochs=num_train_epochs,
            learning_rate=learning_rate,
            fp16=True,
            output_dir=output_dir,
            load_best_model_at_end=True if val_set_size > 0 else False,
            **kwargs,
        ),
        # The padding multiple is fixed at 8 here, not taken from a parameter.
        data_collator=DataCollatorForSupervisedDataset(tokenizer=tokenizer, pad_to_multiple_of=8),
    )
    print(trainer.args)
    # Silence the warnings. Please re-enable for inference!
    model.config.use_cache = False
    # Replace model.state_dict with a bound function that returns
    # get_peft_model_state_dict(model, <original state dict>).
    old_state_dict = model.state_dict
    model.state_dict = (lambda self, *_, **__: get_peft_model_state_dict(self, old_state_dict())).__get__(model,
                                                                                                         type(model))
    # NOTE(review): the Trainer above was built with the un-compiled model
    # object; the compiled one is only used by save_pretrained() below -
    # confirm this is intended.
    if torch.__version__ >= "2" and sys.platform != "win32":
        model = torch.compile(model)
    trainer.train()
    model.save_pretrained(output_dir)
# Command-line entry point: python-fire exposes the parameters of train().
if __name__ == "__main__":
    fire.Fire(train)