Working with the data#

Using Preprocessing Results stored in Parquet Files#

Preprocessing extracts the main text and metadata from WARC content using resiliparse. The resulting parquet files can then be used for a number of downstream tasks. Below we provide some simple scripts for working with the data. Note that you could also use an S3 file system package such as fsspec (with s3fs) to work with MinIO buckets directly.
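
For instance, a minimal sketch of reading a single parquet file directly from a MinIO bucket via pandas and s3fs could look as follows; the endpoint URL, credentials, bucket name, and object path are placeholders for your own setup:

import pandas as pd

# Placeholder credentials and endpoint for a hypothetical local MinIO deployment
storage_options = {
    "key": "YOUR_ACCESS_KEY",
    "secret": "YOUR_SECRET_KEY",
    "client_kwargs": {"endpoint_url": "http://localhost:9000"},
}

# Read one parquet object straight from the bucket (requires the s3fs package)
df = pd.read_parquet(
    "s3://preprocessed/2023-10/part-00000.parquet",  # hypothetical object path
    storage_options=storage_options,
)
print(df[["url_domain", "language"]].head())

The following script reads local parquet files and aggregates per-file statistics into TSV summaries: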

import os
import pandas as pd
import glob
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Define the directory path
dir_path = "./parquet/"

# Get all parquet files from the directory
files = glob.glob(os.path.join(dir_path, "*.parquet"))

# Initialize the counter and unique-value sets
total_unique_url = 0
total_unique_content_type = set()
total_unique_http_server = set()
total_unique_language = set()

# Initialize frequency distributions (built up as pandas Series)
df_content_type_freq = None
df_http_server_freq = None
df_language_freq = None
df_url_freq = None
df_tld_freq = None

# 1. Read parquet files one by one, generate statistics, and aggregate them
for file in files:
    logging.info(f"Processing file: {file}")

    # Read parquet file
    data = pd.read_parquet(file)

    # Update URL combination statistics: build the full hostname
    # (subdomain.domain.suffix) once and reuse it
    data["unique_urls"] = data['url_subdomain'] + "." + data['url_domain'] + "." + data['url_suffix']
    # Note: summing per-file nunique() counts a host again for every file it
    # appears in; the aggregated df_url_freq below gives exact global counts
    total_unique_url += data["unique_urls"].nunique()

    # Update 'http_content_type', 'http_server', and 'language' statistics
    total_unique_content_type.update(data['http_content_type'].unique())
    total_unique_http_server.update(data['http_server'].unique())
    total_unique_language.update(data['language'].unique())

    # Update frequency distributions: value_counts() yields a Series, and
    # add(..., fill_value=0) accumulates counts across files, treating
    # categories unseen so far as zero
    df_tld_freq = df_tld_freq.add(data['url_suffix'].value_counts(), fill_value=0) if df_tld_freq is not None else data['url_suffix'].value_counts()
    df_url_freq = df_url_freq.add(data['unique_urls'].value_counts(), fill_value=0) if df_url_freq is not None else data['unique_urls'].value_counts()
    df_content_type_freq = df_content_type_freq.add(data['http_content_type'].value_counts(), fill_value=0) if df_content_type_freq is not None else data['http_content_type'].value_counts()
    df_http_server_freq = df_http_server_freq.add(data['http_server'].value_counts(), fill_value=0) if df_http_server_freq is not None else data['http_server'].value_counts()
    df_language_freq = df_language_freq.add(data['language'].value_counts(), fill_value=0) if df_language_freq is not None else data['language'].value_counts()

# 2. Convert set lengths to DataFrames for easy writing to TSV
df_url_counts = pd.DataFrame({"unique_url_combination_counts": [total_unique_url]})
df_content_type_counts = pd.DataFrame({"unique_http_content_type_counts": [len(total_unique_content_type)]})
df_http_server_counts = pd.DataFrame({"unique_http_server_counts": [len(total_unique_http_server)]})
df_language_counts = pd.DataFrame({"unique_language_counts": [len(total_unique_language)]})

# The frequency counts are pandas Series (from value_counts), so they can be
# written to TSV directly; no transposing or reshaping is needed

# 3. Write one TSV file per statistic into the same directory with a proper name
df_url_counts.to_csv(os.path.join(dir_path, "unique_url_counts.tsv"), sep="\t", index=False)
df_content_type_counts.to_csv(os.path.join(dir_path, "unique_content_type_counts.tsv"), sep="\t", index=False)
df_http_server_counts.to_csv(os.path.join(dir_path, "unique_http_server_counts.tsv"), sep="\t", index=False)
df_language_counts.to_csv(os.path.join(dir_path, "unique_language_counts.tsv"), sep="\t", index=False)

# Save frequency distributions to TSV files
df_url_freq.to_csv(os.path.join(dir_path, "url_frequency.tsv"), sep="\t")
df_tld_freq.to_csv(os.path.join(dir_path, "tld_frequency.tsv"), sep="\t")
df_content_type_freq.to_csv(os.path.join(dir_path, "content_type_frequency.tsv"), sep="\t")
df_http_server_freq.to_csv(os.path.join(dir_path, "http_server_frequency.tsv"), sep="\t")
df_language_freq.to_csv(os.path.join(dir_path, "language_frequency.tsv"), sep="\t")

logging.info("Processing completed. Results are saved as TSV files.")
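
Once the script has run, the aggregated statistics can be inspected with a few lines; for example, to list the ten most frequent languages (the file name and path match those written by the script above):

import pandas as pd

# Load the aggregated language frequencies written by the script above
lang_freq = pd.read_csv("./parquet/language_frequency.tsv", sep="\t", index_col=0)

# Print the ten most frequent languages across all processed files
print(lang_freq.iloc[:, 0].sort_values(ascending=False).head(10))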

Another option for working with the parquet files is to issue S3 Select commands. Note that MinIO does not implement a full query engine; it typically iterates row by row through the parquet files and applies the specified query filter, so S3 Select commands can consume significant server resources.

For example, when using the MinIO client mc (if S3 Select is enabled by the server):

mc sql --recursive --query "select count(*) from S3Object" <alias>/path/preprocessed/2023-10
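
The same kind of query can also be issued programmatically. Below is a minimal sketch using boto3, assuming S3 Select is enabled on the server; the endpoint, credentials, bucket, and object key are placeholders:

import boto3

# Placeholder connection details for a hypothetical local MinIO deployment
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="YOUR_ACCESS_KEY",
    aws_secret_access_key="YOUR_SECRET_KEY",
)

# Run the same row count against a single parquet object
response = s3.select_object_content(
    Bucket="preprocessed",                      # hypothetical bucket
    Key="2023-10/part-00000.parquet",           # hypothetical object key
    ExpressionType="SQL",
    Expression="select count(*) from S3Object",
    InputSerialization={"Parquet": {}},
    OutputSerialization={"CSV": {}},
)

# The response payload is an event stream; Records events carry the result rows
for event in response["Payload"]:
    if "Records" in event:
        print(event["Records"]["Payload"].decode())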