Spaces:
Sleeping
Sleeping
from typing import Annotated | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from fastapi.encoders import jsonable_encoder | |
from fastapi.exceptions import RequestValidationError | |
from starlette.middleware.cors import CORSMiddleware | |
from fastapi import FastAPI, Header, UploadFile, Depends, HTTPException, status | |
import base64 | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
from starlette.responses import JSONResponse | |
from collections import defaultdict | |
from pydantic import BaseModel | |
from threading import Lock | |
from logger import get_now | |
from run import main as run_main | |
# Timestamp taken once at import time; reported by the status endpoint.
START_AT = get_now()

app = FastAPI()

# Job bookkeeping shared between the scheduler and the request handlers.
# ``lock`` guards updates to the three values below.
lock = Lock()
n_run = 0           # number of completed job runs
last_run = None     # timestamp of the most recent completed run, if any
is_running = False  # True while a job run is in progress
def scheduled_job():
    """Run ``run_main`` once, refusing to overlap with a run already in progress.

    Returns:
        False if another run is in progress (nothing is executed), True after
        ``run_main`` has completed and the run counters have been updated.

    Raises:
        Whatever ``run_main`` raises. In that case the counters are left
        untouched, but the ``is_running`` flag is still cleared so that later
        runs are not blocked forever.
    """
    global is_running, n_run, last_run

    # Claim the "running" slot atomically; bail out if somebody else holds it.
    with lock:
        if is_running:
            return False
        is_running = True

    print("Job is running!")
    completed = False
    try:
        # Deliberately executed outside the lock so that a concurrent caller
        # can observe ``is_running`` and return immediately instead of blocking.
        run_main()
        completed = True
    finally:
        # Always release the slot, even when run_main() fails; only count the
        # run when it actually finished.
        with lock:
            try:
                if completed:
                    n_run += 1
                    last_run = get_now()
            finally:
                is_running = False
    return True
# Background scheduler that triggers scheduled_job every 30 minutes.
# It is started immediately, as a side effect of importing this module.
scheduler = BackgroundScheduler()
scheduler.add_job(scheduled_job, trigger="interval", minutes=30)
scheduler.start()
def shutdown_event():
    """Stop the module-level background scheduler.

    Intended to run when the FastAPI application shuts down.
    """
    # NOTE(review): no registration with ``app`` is visible in this view --
    # confirm this function is actually hooked to the shutdown event.
    scheduler.shutdown()
class BaseResponse(BaseModel):
    """Response envelope used by the endpoints and error handlers in this module."""

    # 1 by default; the error handlers in this module set it to 0.
    status: int = 1
    # Human-readable description of the outcome; empty when there is none.
    message: str = ""
    # Arbitrary payload; None when there is nothing to return.
    result: object = None
async def http_exception_handler(request, exc: HTTPException):
    """Render an ``HTTPException`` as a ``BaseResponse`` error envelope.

    Args:
        request: The incoming request (unused).
        exc: The raised exception; its ``detail`` becomes the envelope message.

    Returns:
        A ``JSONResponse`` with HTTP status 400 (regardless of
        ``exc.status_code``) whose body has ``status`` set to 0.
    """
    # The module-level name ``status`` is rebound by the ``status()`` function
    # defined further down in this file, so ``status.HTTP_400_BAD_REQUEST``
    # would raise AttributeError here. Use the stdlib constant instead.
    from http import HTTPStatus

    envelope = BaseResponse(status=0, message=exc.detail)
    return JSONResponse(
        status_code=int(HTTPStatus.BAD_REQUEST),
        content=jsonable_encoder(envelope),
    )
def validation_exception_handler(request, exc: RequestValidationError) -> JSONResponse:
    """Render request-validation errors as a ``BaseResponse`` error envelope.

    Error messages are grouped by field. The field key is the dotted error
    location with a leading ``"body"``, ``"query"`` or ``"path"`` marker
    removed.

    Args:
        request: The incoming request (unused).
        exc: The validation error raised while parsing the request.

    Returns:
        A ``JSONResponse`` with HTTP status 400, ``status`` 0, message
        ``"Invalid request"`` and the grouped messages as ``result``.
    """
    # The module-level name ``status`` is rebound by the ``status()`` function
    # defined further down in this file, so ``status.HTTP_400_BAD_REQUEST``
    # would raise AttributeError here. Use the stdlib constant instead.
    from http import HTTPStatus

    messages_by_field = defaultdict(list)
    for error in exc.errors():
        location = tuple(error["loc"])
        # Guard against an empty location before peeking at its first element.
        if location and location[0] in ("body", "query", "path"):
            location = location[1:]
        # Locations may contain integers (list indices); str.join() only
        # accepts strings, so convert every part explicitly.
        field_key = ".".join(str(part) for part in location)
        messages_by_field[field_key].append(error["msg"])

    envelope = BaseResponse(
        status=0,
        message="Invalid request",
        result=messages_by_field,
    )
    return JSONResponse(
        status_code=int(HTTPStatus.BAD_REQUEST),
        content=jsonable_encoder(envelope),
    )
def status():
    """Report the start time, the current time and the job-run counters.

    Returns:
        A ``BaseResponse`` whose ``result`` holds ``start_at``, ``current``,
        ``n_runs`` and ``last_run``.
    """
    snapshot = {
        "start_at": START_AT,
        "current": get_now(),
        "n_runs": n_run,
        "last_run": last_run,
    }
    return BaseResponse(result=snapshot)
def run_once():
    """Trigger the job manually and report whether it was executed.

    The job runs inline via ``scheduled_job``, so this call returns only
    after the job has finished (or immediately if a run is already in
    progress).
    """
    print("Running the job once.")
    if scheduled_job():
        return BaseResponse(message="Job executed once.")
    # scheduled_job() declined because another run currently holds the slot.
    return BaseResponse(message="Job is running, not start a new job")
# CORS: accept requests from any origin, with any method and any header,
# and allow credentialed requests.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)