radames's picture
controlnet
ff9325e
raw
history blame
727 Bytes
from typing import Dict, Union
from uuid import UUID
import asyncio
from PIL import Image
from typing import Dict, Union
from PIL import Image
# Identifier used to key per-user state: a standard-library UUID.
UserId = UUID

# Plain ``dict`` alias; the expected keys/values are not specified here
# (presumably the input parameters of one request — confirm against callers).
InputParams = dict

# Payload shape carried by an event: string keys mapping to InputParams dicts.
EventDataContent = Dict[str, InputParams]
class UserDataEvent:
    """Couple an ``asyncio.Event`` with the most recently stored payload.

    ``update_data`` stores a payload and sets the event; ``wait_for_data``
    suspends until the event is set, clears it, and returns the stored
    payload. Only the latest payload is kept: each update replaces the
    previous one rather than queueing behind it.
    """

    def __init__(self):
        # Start with an empty payload and an unset event, so the first
        # ``wait_for_data`` call suspends until ``update_data`` is called.
        self.data_content: EventDataContent = {}
        self.data_event = asyncio.Event()

    def update_data(self, new_data: EventDataContent):
        """Replace the stored payload with ``new_data`` and set the event."""
        # Store before signalling so a woken waiter reads the new payload.
        self.data_content = new_data
        self.data_event.set()

    async def wait_for_data(self) -> EventDataContent:
        """Wait until the event is set, clear it, and return the payload.

        Returns:
            The payload currently held in ``data_content`` at wake-up time.
        """
        await self.data_event.wait()
        # Clear the flag so the next call suspends until the next update.
        self.data_event.clear()
        latest = self.data_content
        return latest
# Mapping type: one UserDataEvent per user, keyed by that user's UserId (UUID).
UserDataEventMap = Dict[UserId, UserDataEvent]
# Module-level registry, created empty. The visible code never adds or removes
# entries — presumably callers elsewhere manage them; confirm against usage.
user_data_events: UserDataEventMap = {}