# CrewAI-Studio / pg_export_crew.py
import streamlit as st
from streamlit import session_state as ss
import zipfile
import os
import re
import json
import shutil
import db_utils
from utils import escape_quotes
from my_tools import TOOL_CLASSES
from crewai import Process
from my_crew import MyCrew
from my_agent import MyAgent
from my_task import MyTask
from datetime import datetime
class PageExportCrew:
def __init__(self):
self.name = "Import/export"
def extract_placeholders(self, text):
return re.findall(r'\{(.*?)\}', text)
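    # Illustrative example: extract_placeholders("Write about {topic} for {audience}")
    # returns ['topic', 'audience']. The method below collects the union of such
    # names across every task description and expected output in the crew.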
def get_placeholders_from_crew(self, crew):
placeholders = set()
for task in crew.tasks:
placeholders.update(self.extract_placeholders(task.description))
placeholders.update(self.extract_placeholders(task.expected_output))
return list(placeholders)
def generate_streamlit_app(self, crew, output_dir):
agents = crew.agents
tasks = crew.tasks
# Check if any custom tools are used
custom_tools_used = any(tool.name in ["CustomApiTool", "CustomFileWriteTool", "CustomCodeInterpreterTool"]
for agent in agents for tool in agent.tools)
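        # Literal values are embedded directly into the generated source code, so
        # booleans must stay Python-style: json.dumps(True) would emit 'true', which
        # is not valid Python, hence the special case below.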
def json_dumps_python(obj):
if isinstance(obj, bool):
return str(obj)
return json.dumps(obj)
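        # The helper below renders a tool instantiation line for the generated app.
        # Illustrative example (tool and parameter names are hypothetical): a tool
        # named "FileReadTool" with parameters {"file_path": "data.txt"} renders as
        # FileReadTool(file_path="data.txt"); parameters set to None are omitted.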
def format_tool_instance(tool):
tool_class = TOOL_CLASSES.get(tool.name)
if tool_class:
params = ', '.join([f'{key}={json_dumps_python(value)}' for key, value in tool.parameters.items() if value is not None])
return f'{tool.name}({params})' if params else f'{tool.name}()'
return None
agent_definitions = ",\n ".join([
f"""
Agent(
role={json_dumps_python(agent.role)},
backstory={json_dumps_python(agent.backstory)},
goal={json_dumps_python(agent.goal)},
allow_delegation={json_dumps_python(agent.allow_delegation)},
verbose={json_dumps_python(agent.verbose)},
tools=[{', '.join([format_tool_instance(tool) for tool in agent.tools])}],
llm=create_llm({json_dumps_python(agent.llm_provider_model)}, {json_dumps_python(agent.temperature)})
)
"""
for agent in agents
])
task_definitions = ",\n ".join([
f"""
Task(
description={json_dumps_python(task.description)},
expected_output={json_dumps_python(task.expected_output)},
agent=next(agent for agent in agents if agent.role == {json_dumps_python(task.agent.role)}),
async_execution={json_dumps_python(task.async_execution)}
)
"""
for task in tasks
])
placeholders = self.get_placeholders_from_crew(crew)
placeholder_inputs = "\n ".join([
f'{placeholder} = st.text_input({json_dumps_python(placeholder.capitalize())})'
for placeholder in placeholders
])
placeholders_dict = ", ".join([f'{json_dumps_python(placeholder)}: {placeholder}' for placeholder in placeholders])
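        # Illustrative result of the two expressions above: a placeholder named
        # "topic" becomes
        #   topic = st.text_input("Topic")
        # in the generated app and is later passed to crew.kickoff() as {"topic": topic}.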
manager_llm_definition = ""
if crew.process == Process.hierarchical and crew.manager_llm:
manager_llm_definition = f'manager_llm=create_llm({json_dumps_python(crew.manager_llm)})'
elif crew.process == Process.hierarchical and crew.manager_agent:
manager_llm_definition = f'manager_agent=next(agent for agent in agents if agent.role == {json_dumps_python(crew.manager_agent.role)})'
app_content = f"""
import streamlit as st
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
from langchain_groq import ChatGroq
from langchain_anthropic import ChatAnthropic
from dotenv import load_dotenv
import os
from crewai_tools import *
{'''from custom_tools import CustomApiTool, CustomFileWriteTool, CustomCodeInterpreterTool''' if custom_tools_used else ''}
load_dotenv()
def create_lmstudio_llm(model, temperature):
    api_base = os.getenv('LMSTUDIO_API_BASE')
    if api_base:
        # Point the OpenAI-compatible client at the local LM Studio server
        os.environ["OPENAI_API_KEY"] = "lm-studio"
        os.environ["OPENAI_API_BASE"] = api_base
        return ChatOpenAI(openai_api_key='lm-studio', openai_api_base=api_base, temperature=temperature)
    else:
        raise ValueError("LM Studio API base not set in .env file")
def create_openai_llm(model, temperature):
safe_pop_env_var('OPENAI_API_KEY')
safe_pop_env_var('OPENAI_API_BASE')
load_dotenv(override=True)
api_key = os.getenv('OPENAI_API_KEY')
api_base = os.getenv('OPENAI_API_BASE', 'https://api.openai.com/v1/')
if api_key:
return ChatOpenAI(openai_api_key=api_key, openai_api_base=api_base, model_name=model, temperature=temperature)
else:
raise ValueError("OpenAI API key not set in .env file")
def create_groq_llm(model, temperature):
api_key = os.getenv('GROQ_API_KEY')
if api_key:
return ChatGroq(groq_api_key=api_key, model_name=model, temperature=temperature)
else:
raise ValueError("Groq API key not set in .env file")
def create_anthropic_llm(model, temperature):
api_key = os.getenv('ANTHROPIC_API_KEY')
if api_key:
return ChatAnthropic(anthropic_api_key=api_key, model_name=model, temperature=temperature)
else:
raise ValueError("Anthropic API key not set in .env file")
def safe_pop_env_var(key):
try:
os.environ.pop(key)
except KeyError:
pass
LLM_CONFIG = {{
"OpenAI": {{
"create_llm": create_openai_llm
}},
"Groq": {{
"create_llm": create_groq_llm
}},
"LM Studio": {{
"create_llm": create_lmstudio_llm
}},
"Anthropic": {{
"create_llm": create_anthropic_llm
}}
}}
def create_llm(provider_and_model, temperature=0.1):
provider, model = provider_and_model.split(": ")
create_llm_func = LLM_CONFIG.get(provider, {{}}).get("create_llm")
if create_llm_func:
return create_llm_func(model, temperature)
else:
raise ValueError(f"LLM provider {{provider}} is not recognized or not supported")
def load_agents():
agents = [
{agent_definitions}
]
return agents
def load_tasks(agents):
tasks = [
{task_definitions}
]
return tasks
def main():
st.title({json_dumps_python(crew.name)})
agents = load_agents()
tasks = load_tasks(agents)
crew = Crew(
agents=agents,
tasks=tasks,
process={json_dumps_python(crew.process)},
verbose={json_dumps_python(crew.verbose)},
memory={json_dumps_python(crew.memory)},
cache={json_dumps_python(crew.cache)},
max_rpm={json_dumps_python(crew.max_rpm)},
{manager_llm_definition}
)
{placeholder_inputs}
placeholders = {{
{placeholders_dict}
}}
if st.button("Run Crew"):
with st.spinner("Running crew..."):
try:
result = crew.kickoff(inputs=placeholders)
if isinstance(result, dict):
with st.expander("Final output", expanded=True):
st.write(result.get('final_output', 'No final output available'))
with st.expander("Full output", expanded=False):
st.write(result)
else:
st.write("Result:")
st.write(result)
except Exception as e:
st.error(f"An error occurred: {{str(e)}}")
if __name__ == '__main__':
main()
"""
with open(os.path.join(output_dir, 'app.py'), 'w') as f:
f.write(app_content)
# If custom tools are used, copy the custom_tools.py file
if custom_tools_used:
source_path = os.path.join(os.path.dirname(__file__), 'custom_tools.py')
dest_path = os.path.join(output_dir, 'custom_tools.py')
shutil.copy2(source_path, dest_path)
def create_env_file(self, output_dir):
env_content = """
# OPENAI_API_KEY="FILL-IN-YOUR-OPENAI-API-KEY"
# OPENAI_API_BASE="OPTIONAL-FILL-IN-YOUR-OPENAI-API-BASE"
# GROQ_API_KEY="FILL-IN-YOUR-GROQ-API-KEY"
# ANTHROPIC_API_KEY="FILL-IN-YOUR-ANTHROPIC-API-KEY"
# LMSTUDIO_API_BASE="http://localhost:1234/v1"
"""
with open(os.path.join(output_dir, '.env'), 'w') as f:
f.write(env_content)
def create_shell_scripts(self, output_dir):
install_sh_content = """
#!/bin/bash
# Create a virtual environment
python -m venv venv || { echo "Failed to create venv"; exit 1; }
# Activate the virtual environment
source venv/bin/activate || { echo "Failed to activate venv"; exit 1; }
# Install requirements
pip install -r requirements.txt || { echo "Failed to install requirements"; exit 1; }
echo "Installation completed successfully."
"""
with open(os.path.join(output_dir, 'install.sh'), 'w') as f:
f.write(install_sh_content)
os.chmod(os.path.join(output_dir, 'install.sh'), 0o755)
run_sh_content = """
#!/bin/bash
# Get the directory where the script is located
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
# Activate the virtual environment
source "$SCRIPT_DIR/venv/bin/activate" || { echo "Failed to activate venv"; exit 1; }
cd "$SCRIPT_DIR"
streamlit run app.py --server.headless True
"""
with open(os.path.join(output_dir, 'run.sh'), 'w') as f:
f.write(run_sh_content)
os.chmod(os.path.join(output_dir, 'run.sh'), 0o755)
install_bat_content = """
@echo off
:: Create a virtual environment
python -m venv venv || (
echo Failed to create venv
exit /b 1
)
:: Activate the virtual environment
call venv\\Scripts\\activate || (
echo Failed to activate venv
exit /b 1
)
:: Install requirements
pip install -r requirements.txt || (
echo Failed to install requirements
exit /b 1
)
echo Installation completed successfully.
"""
with open(os.path.join(output_dir, 'install.bat'), 'w') as f:
f.write(install_bat_content)
run_bat_content = """
@echo off
:: Activate the virtual environment
call venv\\Scripts\\activate || (
echo Failed to activate venv
exit /b 1
)
:: Run the Streamlit app
streamlit run app.py --server.headless true
"""
with open(os.path.join(output_dir, 'run.bat'), 'w') as f:
f.write(run_bat_content)
# Copy the main project's requirements.txt
source_requirements = os.path.join(os.path.dirname(__file__), '..', 'requirements.txt')
dest_requirements = os.path.join(output_dir, 'requirements.txt')
shutil.copy2(source_requirements, dest_requirements)
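    # Zips the exported app folder. Paths are stored relative to folder_path, so
    # the archive unpacks without the parent directory prefix.
    # Illustrative call: self.zip_directory("My Crew_app", "My Crew_app.zip")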
def zip_directory(self, folder_path, output_path):
with zipfile.ZipFile(output_path, 'w') as zip_file:
for foldername, subfolders, filenames in os.walk(folder_path):
for filename in filenames:
file_path = os.path.join(foldername, filename)
arcname = os.path.relpath(file_path, folder_path)
zip_file.write(file_path, arcname)
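    # Assembles a self-contained bundle for the selected crew: the generated
    # app.py, a template .env, install/run scripts for Linux/macOS and Windows,
    # and a copy of requirements.txt, then zips it as "<crew_name>_app.zip".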
def create_export(self, crew_name):
output_dir = f"{crew_name}_app"
        os.makedirs(output_dir, exist_ok=True)
selected_crew = next((crew for crew in ss.crews if crew.name == crew_name), None)
if selected_crew:
self.generate_streamlit_app(selected_crew, output_dir)
self.create_env_file(output_dir)
self.create_shell_scripts(output_dir)
zip_path = f"{crew_name}_app.zip"
self.zip_directory(output_dir, zip_path)
return zip_path
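    # Serializes a single crew to JSON. Illustrative shape of the result:
    #   {"id": ..., "name": ..., "process": ..., "agents": [...], "tasks": [...],
    #    "tools": [...]}
    # Tools are deduplicated by tool_id across all agents before export.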
def export_crew_to_json(self, crew):
crew_data = {
'id': crew.id,
'name': crew.name,
'process': crew.process,
'verbose': crew.verbose,
'memory': crew.memory,
'cache': crew.cache,
'max_rpm': crew.max_rpm,
'manager_llm': crew.manager_llm,
'manager_agent': crew.manager_agent.id if crew.manager_agent else None,
'created_at': crew.created_at,
'agents': [],
'tasks': [],
'tools': []
}
tool_ids = set()
for agent in crew.agents:
agent_data = {
'id': agent.id,
'role': agent.role,
'backstory': agent.backstory,
'goal': agent.goal,
'allow_delegation': agent.allow_delegation,
'verbose': agent.verbose,
'cache': agent.cache,
'llm_provider_model': agent.llm_provider_model,
'temperature': agent.temperature,
'max_iter': agent.max_iter,
'tool_ids': [tool.tool_id for tool in agent.tools]
}
crew_data['agents'].append(agent_data)
tool_ids.update(agent_data['tool_ids'])
for task in crew.tasks:
task_data = {
'id': task.id,
'description': task.description,
'expected_output': task.expected_output,
'async_execution': task.async_execution,
'agent_id': task.agent.id if task.agent else None,
'context_from_async_tasks_ids': task.context_from_async_tasks_ids,
'created_at': task.created_at
}
crew_data['tasks'].append(task_data)
for tool_id in tool_ids:
tool = next((t for t in ss.tools if t.tool_id == tool_id), None)
if tool:
tool_data = {
'tool_id': tool.tool_id,
'name': tool.name,
'description': tool.description,
'parameters': tool.get_parameters()
}
crew_data['tools'].append(tool_data)
return json.dumps(crew_data, indent=2)
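    # Rebuilds tools, agents, tasks and the crew from a dict with the structure
    # produced by export_crew_to_json, persisting each object via db_utils.
    # Illustrative call (file name is hypothetical):
    #   crew = self.import_crew_from_json(json.load(open("My Crew_export.json")))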
def import_crew_from_json(self, crew_data):
# Create tools
for tool_data in crew_data['tools']:
tool_class = TOOL_CLASSES[tool_data['name']]
tool = tool_class(tool_id=tool_data['tool_id'])
tool.set_parameters(**tool_data['parameters'])
if tool not in ss.tools:
ss.tools.append(tool)
db_utils.save_tool(tool)
# Create agents
agents = []
for agent_data in crew_data['agents']:
agent = MyAgent(
id=agent_data['id'],
role=agent_data['role'],
backstory=agent_data['backstory'],
goal=agent_data['goal'],
allow_delegation=agent_data['allow_delegation'],
verbose=agent_data['verbose'],
cache=agent_data.get('cache', True),
llm_provider_model=agent_data['llm_provider_model'],
temperature=agent_data['temperature'],
max_iter=agent_data['max_iter'],
created_at=agent_data.get('created_at')
)
agent.tools = [next(tool for tool in ss.tools if tool.tool_id == tool_id) for tool_id in agent_data['tool_ids']]
agents.append(agent)
db_utils.save_agent(agent)
# Create tasks
tasks = []
for task_data in crew_data['tasks']:
task = MyTask(
id=task_data['id'],
description=task_data['description'],
expected_output=task_data['expected_output'],
async_execution=task_data['async_execution'],
agent=next((agent for agent in agents if agent.id == task_data['agent_id']), None),
context_from_async_tasks_ids=task_data['context_from_async_tasks_ids'],
created_at=task_data['created_at']
)
tasks.append(task)
db_utils.save_task(task)
# Create crew
crew = MyCrew(
id=crew_data['id'],
name=crew_data['name'],
process=crew_data['process'],
verbose=crew_data['verbose'],
memory=crew_data['memory'],
cache=crew_data['cache'],
max_rpm=crew_data['max_rpm'],
manager_llm=crew_data['manager_llm'],
manager_agent=next((agent for agent in agents if agent.id == crew_data['manager_agent']), None),
created_at=crew_data['created_at']
)
crew.agents = agents
crew.tasks = tasks
db_utils.save_crew(crew)
if crew not in ss.crews:
ss.crews.append(crew)
return crew
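    # Renders the Import/export page: a button to export every crew to JSON,
    # a file uploader that accepts either a full-database export (a JSON list)
    # or a single-crew export (a JSON object), and per-crew buttons to export
    # a standalone Streamlit app or a crew JSON file.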
def draw(self):
st.subheader(self.name)
# Full JSON Export Button
if st.button("Export everything to json"):
current_datetime = datetime.now().strftime("%Y%m%d_%H%M%S")
file_path = f"all_crews_{current_datetime}.json"
db_utils.export_to_json(file_path)
with open(file_path, "rb") as fp:
st.download_button(
label="Download All Crews JSON",
data=fp,
file_name=file_path,
mime="application/json"
)
# JSON Import Button
uploaded_file = st.file_uploader("Import JSON file", type="json")
if uploaded_file is not None:
json_data = json.load(uploaded_file)
if isinstance(json_data, list): # Full database export
with open("uploaded_file.json", "w") as f:
json.dump(json_data, f)
db_utils.import_from_json("uploaded_file.json")
st.success("Full database JSON file imported successfully!")
elif isinstance(json_data, dict) and 'id' in json_data: # Single crew export
imported_crew = self.import_crew_from_json(json_data)
st.success(f"Crew '{imported_crew.name}' imported successfully!")
else:
st.error("Invalid JSON format. Please upload a valid crew or full database export file.")
if 'crews' not in ss or len(ss.crews) == 0:
st.write("No crews defined yet.")
else:
crew_names = [crew.name for crew in ss.crews]
selected_crew_name = st.selectbox("Select crew to export", crew_names)
if st.button("Export singlepage app"):
zip_path = self.create_export(selected_crew_name)
with open(zip_path, "rb") as fp:
st.download_button(
label="Download Exported App",
data=fp,
file_name=f"{selected_crew_name}_app.zip",
mime="application/zip"
)
if st.button("Export crew to JSON"):
selected_crew = next((crew for crew in ss.crews if crew.name == selected_crew_name), None)
if selected_crew:
crew_json = self.export_crew_to_json(selected_crew)
st.download_button(
label="Download Crew JSON",
data=crew_json,
file_name=f"{selected_crew_name}_export.json",
mime="application/json"
)
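
# Illustrative usage (a sketch; the actual CrewAI-Studio entry point wires its
# pages differently, and the names below are assumptions, not the app's API):
#
#   page = PageExportCrew()
#   page.draw()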