from crewai import Crew, Process
import streamlit as st
from utils import rnd_id, fix_columns_width
from streamlit import session_state as ss
from datetime import datetime
from llms import llm_providers_and_models, create_llm
import db_utils


class MyCrew:
    """Editable crew definition that can be drawn in Streamlit and built into a ``Crew``.

    The instance stores the crew settings (agents, tasks, process, manager,
    flags) and keeps two per-crew entries in ``st.session_state``:

    * ``edit_<id>``: whether the crew is currently shown as an edit form.
    * ``tasks_order_<id>``: the ids of the crew's tasks, in order.

    Every ``update_*`` method is a widget ``on_change`` callback: it reads the
    widget value from session state, stores it on the instance and persists
    the crew through ``db_utils.save_crew``.
    """

    def __init__(self, id=None, name=None, agents=None, tasks=None, process=None, cache=None, max_rpm=None,
                 verbose=None, manager_llm=None, manager_agent=None, created_at=None, memory=None,
                 planning=None):
        """Initialise the crew, filling in a default for every omitted argument.

        Args:
            id: Crew identifier; a new ``"C_" + rnd_id()`` is generated when falsy.
            name: Display name; defaults to ``"Crew 1"``.
            agents: Agent objects of the crew; defaults to an empty list.
            tasks: Task objects of the crew; defaults to an empty list.
            process: Process value; defaults to ``Process.sequential``.
            cache: Cache flag; defaults to ``True`` when ``None``.
            max_rpm: Max requests per minute; defaults to ``1000`` when falsy.
            verbose: Verbose flag, coerced with ``bool``; defaults to ``True`` when ``None``.
            manager_llm: Manager LLM identifier, or ``None``.
            manager_agent: Manager agent object, or ``None``.
            created_at: Creation timestamp; defaults to ``datetime.now().isoformat()``.
            memory: Memory flag; defaults to ``False`` when ``None``.
            planning: Planning flag; defaults to ``False`` when ``None``.
        """
        self.id = id or "C_" + rnd_id()
        self.name = name or "Crew 1"
        self.agents = agents or []
        self.tasks = tasks or []
        self.process = process or Process.sequential
        self.verbose = bool(verbose) if verbose is not None else True
        self.manager_llm = manager_llm
        self.manager_agent = manager_agent
        self.memory = memory if memory is not None else False
        self.cache = cache if cache is not None else True
        self.max_rpm = max_rpm or 1000
        self.planning = planning if planning is not None else False
        self.created_at = created_at or datetime.now().isoformat()

        # Session-state slots are created only once so that existing UI state
        # is not overwritten when the object is constructed again.
        self.edit_key = f'edit_{self.id}'
        if self.edit_key not in ss:
            ss[self.edit_key] = False
        self.tasks_order_key = f'tasks_order_{self.id}'
        if self.tasks_order_key not in ss:
            ss[self.tasks_order_key] = [task.id for task in self.tasks]

    @property
    def edit(self):
        """Whether the crew is in edit mode (stored in session state)."""
        return ss[self.edit_key]

    @edit.setter
    def edit(self, value):
        ss[self.edit_key] = value

    def _build_crewai_tasks(self):
        """Return the crewai task objects for ``self.tasks``, in the same order.

        Context tasks are resolved recursively so that each task receives the
        already-built objects of the tasks it takes context from.
        """
        built = {}  # task id -> crewai task object
        in_progress = set()  # ids of tasks whose context is currently being resolved

        def create_task(task):
            if task.id in built:
                return built[task.id]

            in_progress.add(task.id)
            context_tasks = []
            context_ids = (task.context_from_async_tasks_ids or []) + (task.context_from_sync_tasks_ids or [])
            for context_task_id in context_ids:
                if context_task_id in built:
                    context_tasks.append(built[context_task_id])
                    continue
                if context_task_id in in_progress:
                    # A self-reference or a cycle of context references would
                    # otherwise recurse until RecursionError.
                    print(f"Warning: Circular context reference to task {context_task_id} ignored for task {task.id}")
                    continue
                context_task = next((t for t in self.tasks if t.id == context_task_id), None)
                if context_task:
                    context_tasks.append(create_task(context_task))
                else:
                    print(f"Warning: Context task with id {context_task_id} not found for task {task.id}")

            # Only pass context if it's an async task or if specific context is defined
            if task.async_execution or context_tasks:
                crewai_task = task.get_crewai_task(context_from_async_tasks=context_tasks)
            else:
                crewai_task = task.get_crewai_task()

            in_progress.discard(task.id)
            built[task.id] = crewai_task
            return crewai_task

        # Create all tasks, resolving dependencies recursively
        for task in self.tasks:
            create_task(task)

        # Collect the final list of tasks in the original order
        return [built[task.id] for task in self.tasks]

    def get_crewai_crew(self, *args, **kwargs) -> Crew:
        """Build a ``Crew`` from the current settings.

        ``manager_llm`` takes precedence over ``manager_agent`` when both are
        set; when neither is set no manager argument is passed.

        Args:
            *args: Extra positional arguments forwarded to ``Crew``.
            **kwargs: Extra keyword arguments forwarded to ``Crew``.

        Returns:
            The constructed ``Crew``.
        """
        crewai_agents = [agent.get_crewai_agent() for agent in self.agents]
        crewai_tasks = self._build_crewai_tasks()

        crew_options = {
            "agents": crewai_agents,
            "tasks": crewai_tasks,
            "cache": self.cache,
            "process": self.process,
            "max_rpm": self.max_rpm,
            "verbose": self.verbose,
            "memory": self.memory,
            "planning": self.planning,
        }
        if self.manager_llm:
            crew_options["manager_llm"] = create_llm(self.manager_llm)
        elif self.manager_agent:
            crew_options["manager_agent"] = self.manager_agent.get_crewai_agent()

        return Crew(*args, **crew_options, **kwargs)

    def delete(self):
        """Remove this crew from ``ss.crews`` and delete it from the database."""
        ss.crews = [crew for crew in ss.crews if crew.id != self.id]
        db_utils.delete_crew(self.id)

    def update_name(self):
        """Store the name widget value and save the crew."""
        self.name = ss[f'name_{self.id}']
        db_utils.save_crew(self)

    def update_process(self):
        """Store the process widget value and save the crew."""
        self.process = ss[f'process_{self.id}']
        db_utils.save_crew(self)

    def update_tasks(self):
        """Store the selected tasks, in selection order, and save the crew.

        Only tasks whose agent belongs to this crew are kept; tasks without
        an agent are skipped.
        """
        selected_tasks_ids = ss[f'tasks_{self.id}']
        crew_agent_ids = {agent.id for agent in self.agents}
        chosen = [
            task for task in ss.tasks
            if task.id in selected_tasks_ids and task.agent and task.agent.id in crew_agent_ids
        ]
        self.tasks = sorted(chosen, key=lambda task: selected_tasks_ids.index(task.id))
        ss[self.tasks_order_key] = selected_tasks_ids
        db_utils.save_crew(self)

    def update_verbose(self):
        """Store the verbose widget value and save the crew."""
        self.verbose = ss[f'verbose_{self.id}']
        db_utils.save_crew(self)

    def update_agents(self):
        """Store the agents whose role is selected in the widget and save the crew."""
        selected_agents = ss[f'agents_{self.id}']
        self.agents = [agent for agent in ss.agents if agent.role in selected_agents]
        db_utils.save_crew(self)

    def update_manager_llm(self):
        """Store the manager LLM selection and save the crew.

        Selecting an LLM clears the manager agent.
        """
        selected_llm = ss[f'manager_llm_{self.id}']
        self.manager_llm = selected_llm if selected_llm != "None" else None
        if self.manager_llm:
            self.manager_agent = None
        db_utils.save_crew(self)

    def update_manager_agent(self):
        """Store the manager agent selection and save the crew.

        Selecting an agent clears the manager LLM.
        """
        selected_agent_role = ss[f'manager_agent_{self.id}']
        if selected_agent_role != "None":
            self.manager_agent = next((agent for agent in ss.agents if agent.role == selected_agent_role), None)
        else:
            self.manager_agent = None
        if self.manager_agent:
            self.manager_llm = None
        db_utils.save_crew(self)

    def update_memory(self):
        """Store the memory widget value and save the crew."""
        self.memory = ss[f'memory_{self.id}']
        db_utils.save_crew(self)

    def update_max_rpm(self):
        """Store the max requests-per-minute widget value and save the crew."""
        self.max_rpm = ss[f'max_rpm_{self.id}']
        db_utils.save_crew(self)

    def update_cache(self):
        """Store the cache widget value and save the crew."""
        self.cache = ss[f'cache_{self.id}']
        db_utils.save_crew(self)

    def update_planning(self):
        """Store the planning widget value and save the crew."""
        self.planning = ss[f'planning_{self.id}']
        db_utils.save_crew(self)

    def is_valid(self, show_warning=False):
        """Return whether the crew has agents, tasks, and a manager when hierarchical.

        Args:
            show_warning: When true, a ``st.warning`` is shown for a failed
                crew-level check, and the flag is forwarded to the agents'
                and tasks' own ``is_valid``.

        Returns:
            ``True`` when every check passes, ``False`` otherwise.
        """
        if len(self.agents) == 0:
            if show_warning:
                st.warning(f"Crew {self.name} has no agents")
            return False
        if len(self.tasks) == 0:
            if show_warning:
                st.warning(f"Crew {self.name} has no tasks")
            return False
        # Lists (not generators) are deliberate: every agent/task is checked,
        # without short-circuiting after the first invalid one.
        if any([not agent.is_valid(show_warning=show_warning) for agent in self.agents]):
            return False
        if any([not task.is_valid(show_warning=show_warning) for task in self.tasks]):
            return False
        if self.process == Process.hierarchical and not (self.manager_llm or self.manager_agent):
            if show_warning:
                st.warning(f"Crew {self.name} has no manager agent or manager llm set for hierarchical process")
            return False
        return True

    def validate_manager_llm(self):
        """Reset ``manager_llm`` to ``None`` when it is not among the available models."""
        available_models = llm_providers_and_models()
        if self.manager_llm and self.manager_llm not in available_models:
            self.manager_llm = None

    def draw(self, expanded=False, buttons=True):
        """Render the crew: an edit form in edit mode, a summary expander otherwise.

        Args:
            expanded: Whether the summary expander starts expanded.
            buttons: Whether the summary shows the Edit/Delete buttons.
        """
        self.validate_manager_llm()
        if self.edit:
            self._draw_edit_form()
        else:
            self._draw_summary(expanded, buttons)

    def _draw_edit_form(self):
        """Render the bordered form with one widget per editable crew setting."""
        crew_agent_ids = [agent.id for agent in self.agents]
        all_agent_roles = [agent.role for agent in ss.agents]
        process_options = [Process.sequential, Process.hierarchical]
        llm_options = llm_providers_and_models()
        manager_disabled = self.process != Process.hierarchical

        # Fall back to the "None" entry (index 0) when the stored manager is
        # not among the current options, instead of raising ValueError.
        if self.manager_llm in llm_options:
            manager_llm_index = llm_options.index(self.manager_llm) + 1
        else:
            manager_llm_index = 0
        if self.manager_agent is not None and self.manager_agent.role in all_agent_roles:
            manager_agent_index = all_agent_roles.index(self.manager_agent.role) + 1
        else:
            manager_agent_index = 0

        with st.container(border=True):
            st.text_input("Name (just id, it doesn't affect anything)", value=self.name,
                          key=f"name_{self.id}", on_change=self.update_name)
            st.selectbox("Process", options=process_options, index=process_options.index(self.process),
                         key=f"process_{self.id}", on_change=self.update_process)
            st.multiselect("Agents", options=all_agent_roles, default=[agent.role for agent in self.agents],
                           key=f"agents_{self.id}", on_change=self.update_agents)

            # Filter tasks by selected agents
            available_task_ids = [
                task.id for task in ss.tasks if task.agent and task.agent.id in crew_agent_ids
            ]
            default_task_ids = [task.id for task in self.tasks if task.id in available_task_ids]
            st.multiselect("Tasks", options=available_task_ids, default=default_task_ids,
                           format_func=lambda x: next(task.description for task in ss.tasks if task.id == x),
                           key=f"tasks_{self.id}", on_change=self.update_tasks)

            st.selectbox("Manager LLM", options=["None"] + llm_options, index=manager_llm_index,
                         key=f"manager_llm_{self.id}", on_change=self.update_manager_llm,
                         disabled=manager_disabled)
            st.selectbox("Manager Agent", options=["None"] + all_agent_roles, index=manager_agent_index,
                         key=f"manager_agent_{self.id}", on_change=self.update_manager_agent,
                         disabled=manager_disabled)
            st.checkbox("Verbose", value=self.verbose, key=f"verbose_{self.id}", on_change=self.update_verbose)
            st.checkbox("Memory", value=self.memory, key=f"memory_{self.id}", on_change=self.update_memory)
            st.checkbox("Cache", value=self.cache, key=f"cache_{self.id}", on_change=self.update_cache)
            st.checkbox("Planning", value=self.planning, key=f"planning_{self.id}",
                        on_change=self.update_planning)
            st.number_input("Max req/min", value=self.max_rpm, key=f"max_rpm_{self.id}",
                            on_change=self.update_max_rpm)
            st.button("Save", on_click=self.set_editable, args=(False,), key=rnd_id())

    def _draw_summary(self, expanded, buttons):
        """Render the read-only expander with settings, tasks and optional buttons."""
        fix_columns_width()
        expander_title = f"Crew: {self.name}" if self.is_valid() else f"❗ Crew: {self.name}"
        crew_agent_ids = [agent.id for agent in self.agents]
        with st.expander(expander_title, expanded=expanded):
            st.markdown(f"**Process:** {self.process}")
            if self.process == Process.hierarchical:
                st.markdown(f"**Manager LLM:** {self.manager_llm}")
                st.markdown(f"**Manager Agent:** {self.manager_agent.role if self.manager_agent else 'None'}")
            st.markdown(f"**Verbose:** {self.verbose}")
            st.markdown(f"**Memory:** {self.memory}")
            st.markdown(f"**Cache:** {self.cache}")
            st.markdown(f"**Planning:** {self.planning}")
            st.markdown(f"**Max req/min:** {self.max_rpm}")
            st.markdown("**Tasks:**")

            # Only tasks whose agent is part of this crew are listed.
            shown_tasks = [task for task in self.tasks if task.agent and task.agent.id in crew_agent_ids]
            for i, task in enumerate(shown_tasks, 1):
                with st.container(border=True):
                    async_tag = "(async)" if task.async_execution else ""
                    st.markdown(f"**{i}.{async_tag} {task.description}**")
                    st.markdown(f"**Agent:** {task.agent.role if task.agent else 'None'}")
                    tools_list = ", ".join([tool.name for tool in task.agent.tools]) if task.agent else "None"
                    st.markdown(f" **Tools:** {tools_list}")
                    st.markdown(f" **LLM:** {task.agent.llm_provider_model}")

            if buttons:
                col1, col2 = st.columns(2)
                with col1:
                    st.button("Edit", on_click=self.set_editable, key=rnd_id(), args=(True,))
                with col2:
                    st.button("Delete", on_click=self.delete, key=rnd_id())
            self.is_valid(show_warning=True)

    def set_editable(self, edit):
        """Switch edit mode on or off and save the crew."""
        self.edit = edit
        db_utils.save_crew(self)