derek-thomas's picture
derek-thomas HF staff
Re-ordering logging
91a4dd8
raw
history blame
5.53 kB
import os
import time
from datetime import datetime, timedelta
import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login
from my_logger import setup_logger
from utilities.pushshift_data import scrape_submissions_by_day, submissions_to_dataframe
# Set dataset name, path to README.md, and existing dataset details
# SUBREDDIT and USERNAME are read with os.environ[...], so a missing variable
# raises KeyError as soon as this module is imported.
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
# Hub repository id; main() passes it to load_dataset() and push_to_hub().
dataset_name = f"{username}/dataset-creator-{subreddit}"
# NOTE(review): not referenced anywhere in the visible part of this file.
dataset_readme_path = "README.md"
# Authenticate with Hugging Face using an auth token
# The same token is reused later as the `token` argument of push_to_hub().
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)
# Module-level logger shared by main() and run_main_continuously().
logger = setup_logger(__name__)
def update_readme(dataset_name, subreddit, date_to_fetch):
    """
    Render the README text describing the dataset and its latest update.

    Nothing is written to disk or uploaded here; the rendered text is only
    returned to the caller.

    Args:
        dataset_name (str): Name shown as the README title.
        subreddit (str): Subreddit whose submissions the dataset contains.
        date_to_fetch (str): Day that was just added, quoted in the change log.

    Returns:
        str: The rendered README text.

    Raises:
        KeyError: If the "START_DATE" environment variable is not set.
    """
    # Resolve the dynamic pieces up front so the template below only
    # interpolates plain local names.
    first_day = os.environ["START_DATE"]
    generated_on = datetime.now().strftime('%Y-%m-%d')
    return f"""
# {dataset_name}
## Dataset Overview
The goal is to have an open dataset of `{subreddit}` submissions. This has been taken from the Pushshift API.
## Data Collection
This has been collected with sequential calls that follow the pagination of the pushshift request.
## Data Structure
- `all_days`: All the data after `{first_day}`
## Update Frequency
The dataset is updated daily and covers the period from `{first_day}` to two days ago.
## Attribution
Data sourced from the Pushshift API.
## Change Log
<details>
<summary>Click to expand</summary>
- **{generated_on}:** Added data for {date_to_fetch} to the 'all_days' split and saved as CSV
</details>
"""
def main(date_to_fetch):
    """
    Fetch one day of subreddit submissions, merge them into the hub dataset and push the result.

    Args:
        date_to_fetch (str): The date to fetch subreddit data for, in the format "YYYY-MM-DD".

    Returns:
        most_recent_date (datetime.date): Most recent date considered covered by the
            dataset. This is at least `date_to_fetch`; when the merged data already
            reaches further, it is the day before the newest `created_utc` value.
    """
    # Load the existing dataset from the Hugging Face hub or create a new one
    try:
        dataset = load_dataset(dataset_name)
    except FileNotFoundError:
        logger.info("Creating new dataset")
        dataset = DatasetDict()
    else:
        logger.info("Loading existing dataset")
        # Earlier pushes may have stored the pandas index as a column; strip it.
        # The split check avoids a KeyError on a repo that has no 'all_days' yet.
        if "all_days" in dataset and "__index_level_0__" in dataset["all_days"].column_names:
            dataset = dataset.remove_columns(["__index_level_0__"])

    # Call scrape_submissions_by_day for the requested date
    logger.info(f"Fetching data for {date_to_fetch}")
    submissions = scrape_submissions_by_day(subreddit, date_to_fetch)
    df = submissions_to_dataframe(submissions)
    logger.info(f"Data fetched for {date_to_fetch}")
    most_recent_date = datetime.strptime(date_to_fetch, '%Y-%m-%d').date()

    # Append DataFrame to split 'all_days' or create new split
    if "all_days" in dataset:
        logger.info("Appending data to split 'all_days'")
        # Merge the new submissions
        old_data = dataset['all_days'].to_pandas()
        new_data = pd.concat([old_data, df], ignore_index=True)
        # Drop duplicates just in case
        new_data = new_data.drop_duplicates(subset=['id'], keep="first")

        # Skip the date bump when there is no usable timestamp (empty merge or
        # only missing values) instead of crashing on `.split`.
        created = new_data['created_utc'].dropna()
        if not created.empty:
            # Only the "YYYY-MM-DD" part before the first space is parsed.
            newest_raw = str(created.max())
            newest_day = datetime.strptime(newest_raw.split(' ')[0], '%Y-%m-%d').date()
            # Step back one day in case the newest day is only partially
            # covered, but never go behind the day that was just fetched.
            most_recent_date = max(newest_day - timedelta(days=1), most_recent_date)

        # Convert back to dataset. After drop_duplicates the index is no longer
        # a plain range, so without preserve_index=False it would be stored as
        # an extra '__index_level_0__' column on every push.
        dataset["all_days"] = Dataset.from_pandas(new_data, preserve_index=False)
    else:
        logger.info("Creating new split 'all_days'")
        dataset["all_days"] = Dataset.from_pandas(df, preserve_index=False)

    # Log appending or creating split 'all_days'
    logger.info("Appended or created split 'all_days'")

    # Push the augmented dataset to the Hugging Face hub
    logger.info(f"Pushing data for {date_to_fetch} to the Hugging Face hub")
    readme_text = update_readme(dataset_name, subreddit, date_to_fetch)
    # NOTE(review): this only sets an attribute on the DatasetDict object;
    # confirm that push_to_hub actually publishes it as the README.
    dataset.description = readme_text
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date_to_fetch} to the Hugging Face Hub")
    return most_recent_date
def run_main_continuously():
    """
    Run `main` day by day, forever.

    Starting from the date in the "START_DATE" environment variable, `main` is
    called for each date up to and including two days ago. `main` returns the
    most recent date covered, and the next call resumes on the day after that.
    Once caught up, the loop sleeps until tomorrow at the time of day at which
    this function was first entered, then checks again.

    This function never returns.

    Raises:
        KeyError: If the "START_DATE" environment variable is not set.
        ValueError: If "START_DATE" is not in the format "YYYY-MM-DD".
    """
    # Indexing (rather than .get) matches the other environment reads in this
    # file and fails with a clear KeyError when the variable is missing.
    start_date = datetime.strptime(os.environ["START_DATE"], "%Y-%m-%d").date()

    # Time of day at which the daily run resumes after a wait.
    start_time = datetime.now().time()

    while True:
        # Sample the clock once so the date and the time used below agree even
        # if this iteration straddles midnight; the wait is then always positive.
        now = datetime.now()
        today = now.date()
        two_days_ago = today - timedelta(days=2)

        if start_date <= two_days_ago:
            logger.info(f"Running main function for date: {start_date}")
            most_recent_date = main(str(start_date))
            start_date = most_recent_date + timedelta(days=1)
            continue

        # Caught up: sleep until tomorrow at the original start time.
        start_of_tomorrow = datetime.combine(today + timedelta(days=1), start_time)
        wait_until_tomorrow = (start_of_tomorrow - now).total_seconds()
        logger.info(f"Waiting until tomorrow: {wait_until_tomorrow} seconds")
        time.sleep(wait_until_tomorrow)
# Start the endless scrape-and-push loop only when run as a script, not on import.
if __name__ == "__main__":
    run_main_continuously()