Spaces:
Sleeping
Sleeping
from google_sheet import run_stt_repo as _run_stt_repo, Converter | |
from logger import get_now | |
from dateutil import parser | |
import datetime as dt | |
class RunSttService:
    """Read and update the single "run STT" control row stored in a repo.

    The service works with one row of ``run_stt_repo`` (row 2 unless another
    index is given) whose fields, as used by the methods below, are
    ``is_running``, ``require_new_run``, ``last_run`` and
    ``auto_run_after_last_run``.
    """

    # Class-level fallback so the default row is available even on instances
    # created without running __init__.
    row_index = 2

    def __init__(self, run_stt_repo, row_index=2):
        """Store the repository and the index of the control row.

        Args:
            run_stt_repo: Object exposing ``read(index)`` and
                ``update(row_index=..., data=...)``.
            row_index: Index of the control row; defaults to 2, the value the
                service has always used.
        """
        self.run_stt_repo = run_stt_repo
        self.row_index = row_index

    def get_row(self):
        """Return the control row exactly as the repository reads it."""
        return self.run_stt_repo.read(self.row_index)

    def get_obj(self):
        """Return the control row passed through ``Converter.convert_to_obj``."""
        return Converter.convert_to_obj(self.get_row())

    def is_need_to_run(self, obj):
        """Decide whether a new run should be started.

        Rules, checked in order:

        1. a run is already in progress            -> ``False``
        2. a new run was explicitly requested      -> ``True``
        3. there is no recorded last run           -> ``True``
        4. no auto-run interval is configured      -> ``False``
        5. otherwise ``True`` only when more than the configured interval
           has elapsed since the last run.

        Args:
            obj: Mapping with the keys ``is_running``, ``require_new_run``,
                ``last_run`` (a date string) and ``auto_run_after_last_run``
                (keyword arguments for ``datetime.timedelta``).

        Returns:
            bool: whether a run should start now.
        """
        if obj['is_running']:
            return False
        if obj['require_new_run']:
            return True

        # The emptiness check must happen on the raw value: parsing an empty
        # or missing timestamp raises instead of returning something falsy.
        raw_last_run = obj['last_run']
        if not raw_last_run:
            return True
        last_run = parser.parse(raw_last_run)

        duration = obj['auto_run_after_last_run']
        if not duration:
            return False

        # NOTE(review): assumes get_now() and the parsed timestamp are both
        # naive or both timezone-aware; mixing them raises TypeError - confirm.
        return get_now() - last_run > dt.timedelta(**duration)

    def get_const_cfg(self, const_keys=('require_new_run', 'auto_run_after_last_run')):
        """Return the subset of the stored row limited to ``const_keys``.

        Args:
            const_keys: Container of row keys to keep. Keys that are absent
                from the row are simply not included in the result.

        Returns:
            dict: ``{key: stored value}`` for every row key in ``const_keys``.
        """
        row = self.get_row()
        return {key: row[key] for key in row if key in const_keys}

    def set_is_running(self, obj):
        """Mark the run as started and write the row back to the repository.

        ``obj`` is updated in place with ``last_run`` (current time as a
        string) and ``is_running = True``. The values currently stored for
        the default ``get_const_cfg`` keys then override whatever ``obj``
        carried for them, so they are written back unchanged.

        Args:
            obj: Mutable mapping describing the row to write.
        """
        obj['last_run'] = str(get_now())
        obj['is_running'] = True
        merged = {**obj, **self.get_const_cfg()}
        row = Converter.convert_to_row(merged)
        print(row)
        self.run_stt_repo.update(row_index=self.row_index, data=row)

    def set_run_done(self):
        """Mark the run as finished and clear any pending run request.

        The stored ``auto_run_after_last_run`` and ``last_run`` values are
        re-read and written back as they are.
        """
        obj = {
            'is_running': False,
            "require_new_run": False,
            **self.get_const_cfg(['auto_run_after_last_run', 'last_run']),
        }
        row = Converter.convert_to_row(obj)
        self.run_stt_repo.update(row_index=self.row_index, data=row)
# Shared module-level service instance, backed by the repo imported at the top of the file.
run_stt_service = RunSttService(_run_stt_repo)
# if __name__ == '__main__':
#     run_stt_service = RunSttService(_run_stt_repo)
#     obj = run_stt_service.get_obj()
#     print(run_stt_service.is_need_to_run(obj))
#     run_stt_service.set_is_running({})
#     import time
#     time.sleep(10)
#     run_stt_service.set_run_done()