import os
import time
from datetime import datetime, timedelta

import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login

from my_logger import setup_logger
from utilities.pushshift_data import scrape_submissions_by_day, submissions_to_dataframe
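# Configuration comes from environment variables: SUBREDDIT, USERNAME and
# HUGGINGFACE_AUTH_TOKEN are required (a missing one raises KeyError at startup),
# while START_DATE is read later by update_readme and run_main_continuously.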
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-{subreddit}"
dataset_readme_path = "README.md"
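# Authenticate with the Hugging Face Hub up front so push_to_hub can write to
# the dataset repo; add_to_git_credential=True also stores the token for git.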
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)


def update_readme(dataset_name, subreddit, date_to_fetch):
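    """Build the README text for the dataset, including a change-log entry for `date_to_fetch`."""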
    readme_text = f"""
# {dataset_name}

## Dataset Overview
The goal is to have an open dataset of `{subreddit}` submissions. The data is taken from the Pushshift API.

## Data Collection
The data is collected with sequential calls that follow the pagination of the Pushshift requests.

## Data Structure
- `all_days`: All the data after `{os.environ["START_DATE"]}`

## Update Frequency
The dataset is updated daily and covers the period from `{os.environ["START_DATE"]}` to two days ago.

## Attribution
Data sourced from the Pushshift API.

## Change Log
<details>
<summary>Click to expand</summary>

- **{datetime.now().strftime('%Y-%m-%d')}:** Added data for {date_to_fetch} to the 'all_days' split and saved as CSV

</details>
"""
    return readme_text


def main(date_to_fetch):
    """
    Fetches and processes subreddit data for the specified date and pushes the updated dataset to the Hugging Face Hub.

    Args:
        date_to_fetch (str): The date to fetch subreddit data for, in the format "YYYY-MM-DD".

    Returns:
        most_recent_date (datetime.date): The most recent date present in the dataset.
    """
    try:
        dataset = load_dataset(dataset_name)
        logger.info("Loading existing dataset")
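        # Dataset.from_pandas can carry the DataFrame index along as an
        # "__index_level_0__" column; drop it so the schema stays consistent.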
        if "__index_level_0__" in dataset["all_days"].column_names:
            dataset = dataset.remove_columns(["__index_level_0__"])
    except FileNotFoundError:
        logger.info("Creating new dataset")
        dataset = DatasetDict()
    logger.info(f"Fetching data for {date_to_fetch}")
    submissions = scrape_submissions_by_day(subreddit, date_to_fetch)
    df = submissions_to_dataframe(submissions)
    logger.info(f"Data fetched for {date_to_fetch}")
    most_recent_date = datetime.strptime(date_to_fetch, '%Y-%m-%d').date()

    if "all_days" in dataset:
        logger.info("Appending data to split 'all_days'")
        old_data = dataset['all_days'].to_pandas()
        new_data = pd.concat([old_data, df], ignore_index=True)
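        # Deduplicate on submission id; keep="first" keeps the row that was
        # already in the dataset, since old_data comes first in the concat.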
        new_data = new_data.drop_duplicates(subset=['id'], keep="first")
        new_data_most_recent_date_raw = new_data['created_utc'].max()
        new_data_most_recent_date_dt = datetime.strptime(new_data_most_recent_date_raw.split(' ')[0], '%Y-%m-%d').date()
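        # Back off one day from the newest record so the caller re-fetches the
        # most recent (possibly still incomplete) day on its next iteration.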
        most_recent_date = max(new_data_most_recent_date_dt - timedelta(days=1), most_recent_date)

        dataset["all_days"] = Dataset.from_pandas(new_data)
    else:
        logger.info("Creating new split 'all_days'")
        dataset["all_days"] = Dataset.from_pandas(df)

    logger.info("Appended or created split 'all_days'")

    logger.info(f"Pushing data for {date_to_fetch} to the Hugging Face Hub")
    readme_text = update_readme(dataset_name, subreddit, date_to_fetch)
    dataset.description = readme_text
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date_to_fetch} to the Hugging Face Hub")
    return most_recent_date


def run_main_continuously():
    """
    Runs `main` repeatedly, starting from the date specified in the environment
    variable "START_DATE" and working forward until two days ago. Once it has
    caught up, it waits until tomorrow and resumes at the same time of day it
    started today.
    """
    start_date_str = os.environ.get("START_DATE")
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()

    start_time = datetime.now().time()

    while True:
        today = datetime.now().date()
        two_days_ago = today - timedelta(days=2)

        if start_date <= two_days_ago:
            logger.info(f"Running main function for date: {start_date}")
            most_recent_date = main(str(start_date))
            start_date = most_recent_date + timedelta(days=1)
        else:
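            # Caught up to two days ago: sleep until the same start time
            # tomorrow, then check again for a newly available day of data.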
            tomorrow = today + timedelta(days=1)
            now = datetime.now()
            start_of_tomorrow = datetime.combine(tomorrow, start_time)
            wait_until_tomorrow = (start_of_tomorrow - now).total_seconds()
            logger.info(f"Waiting until tomorrow: {wait_until_tomorrow} seconds")
            time.sleep(wait_until_tomorrow)


if __name__ == '__main__':
    run_main_continuously()