|
import os |
|
import time |
|
from datetime import datetime |
|
|
|
import pandas as pd |
|
import schedule |
|
from datasets import Dataset |
|
|
|
from utilities.user_defined_functions import get_latest_data, merge_data, load_or_create_dataset, remove_filtered_rows |
|
from utilities.my_logger import setup_logger |
|
from utilities.readme_update import update_dataset_readme |
|
|
|
|
|
# --- Runtime configuration, read once from the environment at import time ---

# SUBREDDIT and USERNAME are mandatory: a missing variable raises KeyError.
# Together they form the dataset repository id "<username>/dataset-creator-reddit-<subreddit>".
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"
dataset_readme_path = "README.md"

# FREQUENCY is compared case-insensitively; an unset variable becomes "" and is rejected.
frequency = os.getenv("FREQUENCY", "").lower()
if frequency != "daily" and frequency != "hourly":
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# HF_TOKEN is mandatory as well; it is handed to push_to_hub as the auth token.
auth_token = os.environ["HF_TOKEN"]

logger = setup_logger(__name__)
|
|
|
|
|
def main():
    """Fetch the latest data, fold it into the dataset, and publish the result.

    Steps performed on every run:

    1. Load the dataset via ``load_or_create_dataset`` and fetch fresh rows
       via ``get_latest_data``.
    2. If the dataset already has a ``'train'`` split, merge the fresh rows
       into it with ``merge_data``; otherwise start from the fresh rows alone,
       flagging every row as ``new=True`` / ``updated=False``.
    3. Pass the frame through ``remove_filtered_rows``, store it back as the
       ``'train'`` split, push the dataset to the hub under ``dataset_name``,
       and refresh the dataset README with the number of added rows.
    """
    date = datetime.now().strftime('%Y-%m-%d')
    logger.warning(f"Running main function for date: {date}")
    dataset = load_or_create_dataset()

    new_df = get_latest_data()

    if 'train' in dataset:
        # Existing dataset: merge the fresh rows into the stored split.
        # (The membership test above already guarantees 'train' exists, so no
        # empty-DataFrame fallback is needed here.)
        old_df = dataset['train'].to_pandas()
        df = merge_data(old_df=old_df, new_df=new_df)
        new_rows = len(df) - len(old_df)
    else:
        # Brand-new dataset: every fetched row is new and none has been updated.
        # Note that this adds the columns to new_df in place (df is the same object).
        df = new_df
        df['new'] = True
        df['updated'] = False
        new_rows = len(new_df)

    # NOTE(review): new_rows is computed before remove_filtered_rows runs, so it
    # may over-count if that helper drops freshly added rows -- confirm intent.
    df = remove_filtered_rows(df)
    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)

    logger.info(f"Adding {new_rows} rows for {date}.")

    logger.debug(f"Pushing data for {date} to the Hugging Face hub")
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to the Hugging Face Hub")

    # The README is refreshed only after a successful push.
    update_dataset_readme(dataset_name=dataset_name, subreddit=subreddit, new_rows=new_rows)
    logger.info("Updated README.")
|
|
|
|
|
def schedule_periodic_task():
    """
    Register ``main`` with the scheduler at the configured frequency, then
    poll the scheduler once per second forever (this function never returns).

    ``frequency`` is the module-level setting: 'hourly' runs at minute :00 of
    every hour, 'daily' runs once a day at 05:00.
    """
    if frequency == 'daily':
        start_time = '05:00'
        # NOTE(review): the message says UTC+00, but no timezone is passed to
        # .at(); presumably the host clock is expected to be UTC -- confirm.
        logger.info(f'Scheduling tasks to run every day at: {start_time} UTC+00')
        daily_job = schedule.every().day.at(start_time)
        daily_job.do(main)
    elif frequency == 'hourly':
        logger.info('Scheduling tasks to run every hour at the top of the hour')
        hourly_job = schedule.every().hour.at(":00")
        hourly_job.do(main)

    # Poll loop: run whatever is due, then sleep briefly to avoid busy-waiting.
    while True:
        schedule.run_pending()
        time.sleep(1)
|
|
|
|
|
# Script entry point: start the scheduler loop only when executed directly,
# not when this module is imported.
if __name__ == '__main__':
    schedule_periodic_task()
|
|