Spaces:
Sleeping
Sleeping
File size: 11,612 Bytes
58e0d8e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
import os
from typing import Optional, Dict, Any, List, Type
from crewai_tools import BaseTool
import requests
import importlib.util
from pydantic.v1 import BaseModel, Field,root_validator, ValidationError
import docker
import base64
class FixedCustomFileWriteToolInputSchema(BaseModel):
content: str = Field(..., description="The content to write or append to the file")
mode: str = Field(..., description="Mode to open the file in, either 'w' or 'a'")
class CustomFileWriteToolInputSchema(FixedCustomFileWriteToolInputSchema):
content: str = Field(..., description="The content to write or append to the file")
mode: str = Field(..., description="Mode to open the file in, either 'w' or 'a'")
filename: str = Field(..., description="The name of the file to write to or append")
class CustomFileWriteTool(BaseTool):
name: str = "Write File"
description: str = "Tool to write or append to files"
args_schema = CustomFileWriteToolInputSchema
filename: Optional[str] = None
def __init__(self, base_folder: str, filename: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
if filename is not None and len(filename) > 0:
self.args_schema = FixedCustomFileWriteToolInputSchema
self._base_folder = base_folder
self.filename = filename or None
self._ensure_base_folder_exists()
self._generate_description()
def _ensure_base_folder_exists(self):
os.makedirs(self._base_folder, exist_ok=True)
def _get_full_path(self, filename: Optional[str]) -> str:
if filename is None and self.filename is None:
raise ValueError("No filename specified and no default file set.")
chosen_file = filename or self.filename
full_path = os.path.abspath(os.path.join(self._base_folder, chosen_file))
if not full_path.startswith(os.path.abspath(self._base_folder)):
raise ValueError("Access outside the base directory is not allowed.") #TODO: add validations for path traversal
return full_path
def _run(self, content: str, mode: str, filename: Optional[str] = None) -> Dict[str, Any]:
full_path = self._get_full_path(filename)
try:
with open(full_path, 'a' if mode == 'a' else 'w') as file:
file.write(content)
return {
"status": "success",
"message": f"Content successfully {'appended to' if mode == 'a' else 'written to'} {full_path}"
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
def run(self, input_data: CustomFileWriteToolInputSchema) -> Any:
response_data = self._run(
content=input_data.content,
mode=input_data.mode,
filename=input_data.filename
)
return response_data
class CustomApiToolInputSchema(BaseModel):
endpoint: str = Field(..., description="The specific endpoint for the API call")
method: str = Field(..., description="HTTP method to use (GET, POST, PUT, DELETE)")
headers: Optional[Dict[str, str]] = Field(None, description="HTTP headers to include in the request")
query_params: Optional[Dict[str, Any]] = Field(None, description="Query parameters for the request")
body: Optional[Dict[str, Any]] = Field(None, description="Body of the request for POST/PUT methods")
class CustomApiTool(BaseTool):
name: str = "Call Api"
description: str = "Tool to make API calls with customizable parameters"
args_schema = CustomApiToolInputSchema
base_url: Optional[str] = None
default_headers: Optional[Dict[str, str]] = None
default_query_params: Optional[Dict[str, Any]] = None
def __init__(self, base_url: Optional[str] = None, headers: Optional[Dict[str, str]] = None, query_params: Optional[Dict[str, Any]] = None, **kwargs):
super().__init__(**kwargs)
self.base_url = base_url
self.default_headers = headers or {}
self.default_query_params = query_params or {}
self._generate_description()
def _run(self, endpoint: str, method: str, headers: Optional[Dict[str, str]] = None, query_params: Optional[Dict[str, Any]] = None, body: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
url = f"{self.base_url}/{endpoint}".rstrip("/")
headers = {**self.default_headers, **(headers or {})}
query_params = {**self.default_query_params, **(query_params or {})}
try:
response = requests.request(
method=method.upper(),
url=url,
headers=headers,
params=query_params,
json=body,
verify=False #TODO: add option to disable SSL verification
)
return {
"status_code": response.status_code,
"response": response.json() if response.headers.get("Content-Type") == "application/json" else response.text
}
except Exception as e:
return {
"status_code": 500,
"response": str(e)
}
def run(self, input_data: CustomApiToolInputSchema) -> Any:
response_data = self._run(
endpoint=input_data.endpoint,
method=input_data.method,
headers=input_data.headers,
query_params=input_data.query_params,
body=input_data.body
)
return response_data
class CustomCodeInterpreterSchema(BaseModel):
"""Input for CustomCodeInterpreterTool."""
code: Optional[str] = Field(
None,
description="Python3 code used to be interpreted in the Docker container. ALWAYS PRINT the final result and the output of the code",
)
run_script: Optional[str] = Field(
None,
description="Relative path to the script to run in the Docker container. The script should contain the code to be executed.",
)
libraries_used: str = Field(
...,
description="List of libraries used in the code with proper installing names separated by commas. Example: numpy,pandas,beautifulsoup4",
)
@root_validator
def check_code_or_run_script(cls, values):
code = values.get('code')
run_script = values.get('run_script')
if not code and not run_script:
raise ValueError('Either code or run_script must be provided')
if code and run_script:
raise ValueError('Only one of code or run_script should be provided')
return values
class CustomCodeInterpreterTool(BaseTool):
name: str = "Code Interpreter"
description: str = "Interprets Python3 code strings with a final print statement. Requires eighter code or run_script to be provided."
args_schema: Type[BaseModel] = CustomCodeInterpreterSchema
code: Optional[str] = None
run_script: Optional[str] = None
workspace_dir: Optional[str] = None
def __init__(self, workspace_dir: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
if workspace_dir is not None and len(workspace_dir) > 0:
self.workspace_dir = os.path.abspath(workspace_dir)
os.makedirs(self.workspace_dir, exist_ok=True)
self._generate_description()
@staticmethod
def _get_installed_package_path():
spec = importlib.util.find_spec('crewai_tools')
return os.path.dirname(spec.origin)
def _verify_docker_image(self) -> None:
"""
Verify if the Docker image is available
"""
image_tag = "code-interpreter:latest"
client = docker.from_env()
try:
client.images.get(image_tag)
except docker.errors.ImageNotFound:
package_path = self._get_installed_package_path()
dockerfile_path = os.path.join(package_path, "tools/code_interpreter_tool")
if not os.path.exists(dockerfile_path):
raise FileNotFoundError(f"Dockerfile not found in {dockerfile_path}")
client.images.build(
path=dockerfile_path,
tag=image_tag,
rm=True,
)
def _install_libraries(
self, container: docker.models.containers.Container, libraries: str
) -> None:
"""
Install missing libraries in the Docker container
"""
if libraries and len(libraries) > 0:
for library in libraries.split(","):
print(f"Installing library: {library}")
install_result = container.exec_run(f"pip install {library}")
if install_result.exit_code != 0:
print(f"Something went wrong while installing the library: {library}")
print(install_result.output.decode("utf-8"))
def _get_existing_container(self, container_name: str) -> Optional[docker.models.containers.Container]:
client = docker.from_env()
try:
existing_container = client.containers.get(container_name)
if existing_container.status == 'running':
return existing_container
if existing_container.status == 'exited':
existing_container.remove()
except docker.errors.NotFound:
pass
return None
def _init_docker_container(self) -> docker.models.containers.Container:
client = docker.from_env()
volumes = {}
if self.workspace_dir:
volumes[self.workspace_dir] = {"bind": "/workspace", "mode": "rw"}
container_name = "custom-code-interpreter"
existing_container = self._get_existing_container(container_name)
if existing_container:
return existing_container
return client.containers.run(
"code-interpreter", detach=True, tty=True, working_dir="/workspace", name=container_name, volumes=volumes
)
def run_code_in_docker(self, code: str, libraries_used: str) -> str:
self._verify_docker_image()
container = self._init_docker_container()
self._install_libraries(container, libraries_used)
# Encode the code to base64
encoded_code = base64.b64encode(code.encode('utf-8')).decode('utf-8')
# Create a command to decode the base64 string and run the Python code
cmd_to_run = f'python3 -c "import base64; exec(base64.b64decode(\'{encoded_code}\').decode(\'utf-8\'))"'
print(f"Running code in container: \n{code}")
exec_result = container.exec_run(cmd_to_run)
if exec_result.exit_code != 0:
print(f"Something went wrong while running the code: \n{exec_result.output.decode('utf-8')}")
return f"Something went wrong while running the code: \n{exec_result.output.decode('utf-8')}"
print(f"Code run output: \n{exec_result.output.decode('utf-8')}")
return exec_result.output.decode("utf-8")
def _run_script(self, run_script: str,libraries_used: str) -> str:
with open(f"{self.workspace_dir}/{run_script}", "r") as file:
code = file.read()
return self.run_code_in_docker(code, libraries_used)
def _run(self, **kwargs) -> str:
code = kwargs.get("code", self.code)
run_script = kwargs.get("run_script", self.run_script)
libraries_used = kwargs.get("libraries_used", [])
if run_script:
return self._run_script(run_script, libraries_used)
return self.run_code_in_docker(code, libraries_used)
|