from typing import Annotated from apscheduler.schedulers.background import BackgroundScheduler from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError from starlette.middleware.cors import CORSMiddleware from fastapi import FastAPI, Header, UploadFile, Depends, HTTPException, status import base64 from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from starlette.responses import JSONResponse from collections import defaultdict from pydantic import BaseModel from threading import Lock from logger import get_now from run import main as run_main START_AT = get_now() app = FastAPI() lock = Lock() n_run = 0 last_run = None is_running=False def scheduled_job(): with lock: global is_running if is_running: return False is_running = True print("Job is running!") run_main() with lock: global n_run n_run = n_run + 1 global last_run last_run = get_now() is_running = False return True # Create a scheduler scheduler = BackgroundScheduler() # Add the scheduled job to the scheduler scheduler.add_job(scheduled_job, 'interval', minutes=30) # Start the scheduler scheduler.start() # You can also stop the scheduler when the FastAPI application shuts down @app.on_event("shutdown") def shutdown_event(): scheduler.shutdown() class BaseResponse(BaseModel): status: int = 1 message: str = "" result: object = None @app.exception_handler(HTTPException) async def http_exception_handler(request, exc: HTTPException): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content=jsonable_encoder(BaseResponse(status=0, message=exc.detail)) ) @app.exception_handler(RequestValidationError) def validation_exception_handler(request, exc: RequestValidationError) -> JSONResponse: reformatted_message = defaultdict(list) for pydantic_error in exc.errors(): loc, msg = pydantic_error["loc"], pydantic_error["msg"] filtered_loc = loc[1:] if loc[0] in ("body", "query", "path") else loc field_string = ".".join(filtered_loc) reformatted_message[field_string].append(msg) return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content=jsonable_encoder(BaseResponse(status=0, message="Invalid request", result=reformatted_message)) ) @app.get("/status", response_model=BaseResponse) def status(): return BaseResponse(result={ "start_at": START_AT, "current": get_now(), "n_runs": n_run, "last_run": last_run, }) @app.get("/run") def run_once(): print("Running the job once.") success = scheduled_job() # Manually trigger the job if not success: return BaseResponse(message="Job is running, not start a new job") return BaseResponse(message="Job executed once.") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], )