We will now create a Jupyter notebook, `notebook.ipynb`, which we will use to read a CSV file and export it to Postgres.
Install Jupyter:

```bash
uv add --dev jupyter
```

Let's create a Jupyter notebook to explore the data:

```bash
uv run jupyter notebook
```

We will use data from the NYC TLC Trip Record Data website.
Specifically, we will use the Yellow taxi trip records CSV file for January 2021.
This data used to be distributed as CSV, but the TLC later switched to Parquet. We want to keep using the CSV version because it requires a bit of extra pre-processing, which is useful for learning.
A dictionary to understand each field is available here.
Note: The CSV data is stored as gzipped files. Pandas can read them directly.
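To see how this works, here is a minimal sketch: pandas infers `compression='gzip'` from the `.gz` suffix, so no explicit decompression step is needed (`sample.csv.gz` is a hypothetical local file, not the taxi data):

```python
import gzip

import pandas as pd

# Write a tiny gzipped CSV to disk; the file name is hypothetical
path = "sample.csv.gz"
with gzip.open(path, "wt") as f:
    f.write("a,b\n1,2\n3,4\n")

# pandas detects the .gz suffix and decompresses on the fly
df = pd.read_csv(path)
print(df.shape)  # (2, 2)
```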
Create a new notebook and run:

```python
import pandas as pd

# Read a sample of the data
prefix = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/'
df = pd.read_csv(prefix + 'yellow_tripdata_2021-01.csv.gz', nrows=100)

# Display first rows
df.head()

# Check data types
df.dtypes

# Check data shape
df.shape
```

- When using `nrows=100` to sample the first 100 rows, all columns have consistent data types in this subset, so the `DtypeWarning` below will NOT appear.
- To reproduce the type warning shown below, remove the `nrows=100` parameter and read the full dataset.
We get a warning (it may only pop up later for some users, so it's best to follow the instructions below anyway):

```text
/tmp/ipykernel_25483/2933316018.py:1: DtypeWarning: Columns (6) have mixed types. Specify dtype option on import or set low_memory=False.
```
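The capitalized `Int64` used below is pandas' nullable integer dtype: the taxi data has missing values in its integer columns, and plain `int64` cannot represent them. A minimal sketch with made-up data (not the taxi file):

```python
import io

import pandas as pd

# Made-up miniature CSV: passenger_count is missing in one row
csv_data = "VendorID,passenger_count\n1,1\n2,\n1,3\n"

# Default inference: a missing value forces the integer column to float64
df_default = pd.read_csv(io.StringIO(csv_data))
print(df_default["passenger_count"].dtype)  # float64

# pandas' nullable Int64 keeps integers and stores the gap as <NA>
df_nullable = pd.read_csv(io.StringIO(csv_data), dtype={"passenger_count": "Int64"})
print(df_nullable["passenger_count"].dtype)  # Int64
```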
So we need to specify the types:
```python
dtype = {
    "VendorID": "Int64",
    "passenger_count": "Int64",
    "trip_distance": "float64",
    "RatecodeID": "Int64",
    "store_and_fwd_flag": "string",
    "PULocationID": "Int64",
    "DOLocationID": "Int64",
    "payment_type": "Int64",
    "fare_amount": "float64",
    "extra": "float64",
    "mta_tax": "float64",
    "tip_amount": "float64",
    "tolls_amount": "float64",
    "improvement_surcharge": "float64",
    "total_amount": "float64",
    "congestion_surcharge": "float64"
}

parse_dates = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime"
]

df = pd.read_csv(
    prefix + 'yellow_tripdata_2021-01.csv.gz',
    nrows=100,
    dtype=dtype,
    parse_dates=parse_dates
)
```

In the Jupyter notebook, we create code to:
- Download the CSV file
- Read it in chunks with pandas
- Convert datetime columns
- Insert data into PostgreSQL using SQLAlchemy
```bash
uv add sqlalchemy "psycopg[binary,pool]"
```

```python
from sqlalchemy import create_engine

engine = create_engine('postgresql+psycopg://root:root@localhost:5432/ny_taxi')

print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))
```

Output:
```sql
CREATE TABLE yellow_taxi_data (
    "VendorID" BIGINT,
    tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE,
    tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE,
    passenger_count BIGINT,
    trip_distance FLOAT(53),
    "RatecodeID" BIGINT,
    store_and_fwd_flag TEXT,
    "PULocationID" BIGINT,
    "DOLocationID" BIGINT,
    payment_type BIGINT,
    fare_amount FLOAT(53),
    extra FLOAT(53),
    mta_tax FLOAT(53),
    tip_amount FLOAT(53),
    tolls_amount FLOAT(53),
    improvement_surcharge FLOAT(53),
    total_amount FLOAT(53),
    congestion_surcharge FLOAT(53)
)
```

```python
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')
```

`head(n=0)` makes sure we only create the table; we don't add any data yet.
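If you want to experiment with `get_schema` and `to_sql` before Postgres is up, an in-memory SQLite engine works the same way. This is just a sketch with made-up rows; the SQL dialect it emits differs from Postgres:

```python
import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite engine: no database server required
engine = create_engine("sqlite://")

df = pd.DataFrame({"VendorID": [1, 2], "fare_amount": [10.5, 7.0]})

# The CREATE TABLE statement pandas would emit for this engine/dialect
print(pd.io.sql.get_schema(df, name="yellow_taxi_data", con=engine))

# Create the empty table, then append the rows
df.head(0).to_sql(name="yellow_taxi_data", con=engine, if_exists="replace")
df.to_sql(name="yellow_taxi_data", con=engine, if_exists="append")

n = pd.read_sql("SELECT COUNT(*) AS n FROM yellow_taxi_data", con=engine)["n"][0]
print(n)  # 2
```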
We don't want to insert all the data at once. Let's do it in batches and use an iterator for that:
```python
df_iter = pd.read_csv(
    prefix + 'yellow_tripdata_2021-01.csv.gz',
    dtype=dtype,
    parse_dates=parse_dates,
    iterator=True,
    chunksize=100000
)

for df_chunk in df_iter:
    print(len(df_chunk))
    df_chunk.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
```

One way to make sure the table is (re)created before the first insert is to use a flag for the first chunk:

```python
first = True

for df_chunk in df_iter:
    if first:
        # Create table schema (no data)
        df_chunk.head(0).to_sql(
            name="yellow_taxi_data",
            con=engine,
            if_exists="replace"
        )
        first = False
        print("Table created")

    # Insert chunk
    df_chunk.to_sql(
        name="yellow_taxi_data",
        con=engine,
        if_exists="append"
    )
    print("Inserted:", len(df_chunk))
```

Alternatively, pull the first chunk out explicitly with `next()` and then loop over the rest:

```python
first_chunk = next(df_iter)

first_chunk.head(0).to_sql(
    name="yellow_taxi_data",
    con=engine,
    if_exists="replace"
)
print("Table created")

first_chunk.to_sql(
    name="yellow_taxi_data",
    con=engine,
    if_exists="append"
)
print("Inserted first chunk:", len(first_chunk))

for df_chunk in df_iter:
    df_chunk.to_sql(
        name="yellow_taxi_data",
        con=engine,
        if_exists="append"
    )
    print("Inserted chunk:", len(df_chunk))
```

Add tqdm to see progress:
```bash
uv add tqdm
```

Put it around the iterable:

```python
from tqdm.auto import tqdm

for df_chunk in tqdm(df_iter):
    ...
```

To see progress in terms of total chunks, you would have to add the `total` argument to `tqdm(df_iter)`. In our scenario, the pragmatic way is to hardcode a value based on the number of entries in the table.
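A sketch of that idea, using a stand-in generator instead of the real CSV iterator and hypothetical numbers (substitute your file's row count and chunk size):

```python
import math

from tqdm.auto import tqdm

# Hypothetical numbers, for illustration only
n_rows = 1_369_765
chunksize = 100_000

def fake_chunks(n_rows, chunksize):
    """Stand-in for pd.read_csv(..., iterator=True, chunksize=...)."""
    for start in range(0, n_rows, chunksize):
        yield min(chunksize, n_rows - start)

# tqdm cannot know the length of a lazy iterator, so we pass `total` ourselves
total = math.ceil(n_rows / chunksize)
sizes = [size for size in tqdm(fake_chunks(n_rows, chunksize), total=total)]
print(len(sizes), sum(sizes))  # 14 1369765
```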
Connect to it using pgcli:

```bash
uv run pgcli -h localhost -p 5432 -u root -d ny_taxi
```

And explore the data.
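Once connected, a few sanity-check queries you might run (column and table names as created above):

```sql
-- Row count should match the number of rows inserted
SELECT COUNT(1) FROM yellow_taxi_data;

-- Peek at a few rows
SELECT * FROM yellow_taxi_data LIMIT 10;

-- Date range of the trips (should be January 2021, plus a few outliers)
SELECT MIN(tpep_pickup_datetime), MAX(tpep_pickup_datetime) FROM yellow_taxi_data;
```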