derek-thomas's picture
derek-thomas HF staff
Initializing cols
1d46c26
raw
history blame
6.58 kB
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
import pandas as pd
import requests
from my_logger import setup_logger
logger = setup_logger(__name__)
def get_pushshift_data(subreddit: str, before: Optional[int] = None,
after: Optional[int] = None, aggs: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""
Fetch data from the Pushshift API for the specified subreddit.
:param subreddit: The name of the subreddit to scrape.
:param before: The upper limit for the created_utc attribute of the submissions.
:param after: The lower limit for the created_utc attribute of the submissions.
:param aggs: The aggregation summary option to use.
:return: A dictionary containing the fetched data and aggregations if available.
"""
url = "https://api.pushshift.io/reddit/search/submission/"
params = {
"subreddit": subreddit,
"size": 1000,
"sort": "created_utc",
"sort_type": "desc",
}
if before is not None:
params["before"] = before
if after is not None:
params["after"] = after
if aggs is not None:
params["aggs"] = aggs
response = requests.get(url, params=params)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Error fetching data: {response.status_code}")
return None
def get_post_count_for_day(subreddit: str, day_to_scrape: str) -> int:
"""
Get the total number of posts for a specific day in the specified subreddit using the Pushshift API.
:param subreddit: The name of the subreddit to get the post count for.
:param day_to_scrape: The date for which to get the post count (format: "YYYY-MM-DD").
:return: The total number of posts for the specified day.
"""
date_obj = datetime.strptime(day_to_scrape, "%Y-%m-%d")
after = int(date_obj.timestamp())
before = int((date_obj + timedelta(days=1)).timestamp())
response = get_pushshift_data(subreddit, before=before, after=after, aggs="created_utc")
if response is not None:
aggs = response.get("aggs", {}).get("created_utc", [])
if aggs:
return aggs[0]["doc_count"]
return 0
def fetch_data(subreddit: str, before: int, after: int) -> Optional[Dict[str, Any]]:
url = "https://api.pushshift.io/reddit/search/submission/"
params = {
"subreddit": subreddit,
"size": 1000,
"sort": "created_utc",
"sort_type": "desc",
"before": before,
"after": after,
}
response = requests.get(url, params=params)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Error fetching data: {response.status_code}")
return None
def convert_timestamp_to_datetime(timestamp: int) -> str:
# Convert the timestamp to a datetime object
datetime_obj = datetime.utcfromtimestamp(timestamp)
# Add timezone information
datetime_obj_utc = datetime_obj.replace(tzinfo=timezone.utc)
# Convert the datetime object to a formatted string
datetime_str = datetime_obj_utc.strftime('%Y-%m-%d %H:%M:%S')
return datetime_str
def scrape_submissions_by_day(subreddit_to_scrape: str, day_to_scrape: str) -> List[Dict[str, Any]]:
start_time = time.time()
scraped_submissions = []
date_obj = datetime.strptime(day_to_scrape, "%Y-%m-%d")
if date_obj > datetime.now() - timedelta(days=7):
logger.error("The specified date might not be available in the Pushshift API yet. "
"Please try an earlier date or wait for the API to be updated.")
return scraped_submissions
after = int(date_obj.timestamp())
before = int((date_obj + timedelta(days=1)).timestamp())
# todo get_post_count_for_day didnt seem to work
# post_count = get_post_count_for_day(subreddit_to_scrape, day_to_scrape)
# total_requests = (post_count + 99) // 100 # Estimate the total number of requests
actual_requests = 0
while after < before:
after_str, before_str = convert_timestamp_to_datetime(after), convert_timestamp_to_datetime(before)
logger.debug(f"Fetching data between timestamps {after_str} and {before_str}")
data = get_pushshift_data(subreddit_to_scrape, before=before, after=after)
if data is None or len(data["data"]) == 0:
break
scraped_submissions.extend(data["data"])
before = data["data"][-1]["created_utc"]
actual_requests += 1
time.sleep(1)
elapsed_time = time.time() - start_time
if actual_requests:
logger.info(
f"{actual_requests}it [{elapsed_time // 60:02}:{elapsed_time % 60:.2f} {elapsed_time / actual_requests:.2f}s/it]")
logger.info(
f"Finished scraping {len(scraped_submissions)} submissions in {elapsed_time:.2f} seconds in {actual_requests} requests")
return scraped_submissions
def submissions_to_dataframe(submissions: List[Dict[str, Any]]) -> pd.DataFrame:
"""
Parse a list of submissions into a pandas DataFrame.
:param submissions: A list of dictionaries containing the scraped submission data.
:return: A pandas DataFrame containing the submission data.
"""
cols = ['score', 'num_comments', 'title', 'permalink', 'selftext', 'url', 'created_utc', 'author', 'id',
'downs', 'ups']
df = pd.DataFrame(submissions)
df = df.convert_dtypes()
# As of Jan 2017 Im getting an error:
# KeyError: "['downs', 'ups'] not in index"
# To maintain backwards compatibility I will initialize these cols
for col in cols:
if col not in df.columns:
df[col] = None
# Take the subset of columns
df = df[cols]
# Convert the "created_utc" column to a datetime column with timezone information
df['created_utc'] = pd.to_datetime(df['created_utc'], unit='s').dt.tz_localize('UTC')
# Using native type date and time had some incompatibility with the datasets visualization widget
df['date'] = df['created_utc'].dt.date.astype(str)
df['time'] = df['created_utc'].dt.time.astype(str)
return df
if __name__ == '__main__':
subreddit_to_scrape = "askreddit"
day_to_scrape = "2013-03-01"
submissions = scrape_submissions_by_day(subreddit_to_scrape, day_to_scrape)
df = submissions_to_dataframe(submissions)
print(df.head().to_string())
logger.info(f"Scraped {len(submissions)} submissions from r/{subreddit_to_scrape} on {day_to_scrape}")