CSV load script template

The following script is an example of using the load function to import data from a CSV file into a database. It shows how a transform function can perform common parsing tasks such as renaming columns and converting timestamps into datetime objects. The database table has a CHECK constraint that rejects any rows with an ID divisible by 1000, and an example on_error function prints the IDs of rows that fail to insert.

"""
Script to create database and load observations data from csv file. It also
demonstrates how an `on_error` function can handle failed rows.

Generate observations.csv with:
curl 'https://sensors.bgs.ac.uk/FROST-Server/v1.1/Observations?$select=@iot.id,result,phenomenonTime&$top=20000&$resultFormat=csv' -o observations.csv  # noqa
"""
import csv
import sqlite3
import datetime as dt
from typing import Iterable

import etlhelper as etl


def load_observations(csv_file: str, conn: sqlite3.Connection) -> None:
    """Load observations from csv_file to db_file."""
    # Drop table (helps with repeated test runs!)
    drop_table_sql = """
        DROP TABLE IF EXISTS observations
        """
    etl.execute(drop_table_sql, conn)

    # Create table (the CHECK constraint rejects ids divisible by 1000)
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS observations (
            id INTEGER PRIMARY KEY CHECK (id % 1000),
            time TIMESTAMP,
            result FLOAT
            )"""
    etl.execute(create_table_sql, conn)

    # Load data
    with open(csv_file, "rt", newline="") as f:  # newline="" as recommended by the csv module
        reader = csv.DictReader(f)
        etl.load("observations", conn, reader, transform=transform, on_error=on_error)


# A transform function that takes an iterable of rows and returns an iterable
# of rows (a filtering, generator-based variant is sketched after this script).
def transform(rows: Iterable[dict]) -> Iterable[dict]:
    """Rename time column and convert to Python datetime."""
    for row in rows:
        # Dictionaries are mutable, so rows can be modified in place.
        time_value = row.pop("phenomenonTime")
        row["time"] = dt.datetime.strptime(time_value, "%Y-%m-%dT%H:%M:%S.%fZ")

    return rows


# The on_error function is called after each chunk with all the failed rows
def on_error(failed_rows: list[tuple[dict, Exception]]) -> None:
    """Print the IDs of failed rows"""
    rows, exceptions = zip(*failed_rows)
    failed_ids = [row["id"] for row in rows]
    print(f"Failed IDs: {failed_ids}")


if __name__ == "__main__":
    etl.log_to_console()

    db = etl.DbParams(dbtype="SQLITE", filename="observations.sqlite")
    with db.connect() as conn:
        load_observations("observations.csv", conn)

Export data to CSV

The Pandas library can connect to databases via SQLAlchemy and has powerful tools for manipulating tabular data. ETL Helper makes it easy to prepare the SQLAlchemy connection.

"""ETL Helper script to demonstrate compatibility when creating an SQLAlchemy connection."""
import pandas as pd
from sqlalchemy import create_engine

# ORACLEDB is an etlhelper DbParams object defined in a separate my_databases.py
from my_databases import ORACLEDB

engine = create_engine(ORACLEDB.get_sqlalchemy_connection_string("ORACLE_PASSWORD"))

sql = "SELECT * FROM my_table"
df = pd.read_sql(sql, engine)
df.to_csv("my_data.csv", header=True, index=False, float_format="%.3f")
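
For tables too large to hold in memory, the same export can be streamed: passing chunksize to pd.read_sql returns an iterator of DataFrames rather than a single frame. A minimal sketch, reusing sql and engine from the script above, appends each chunk to the CSV and writes the header only once:

# Stream the query results in chunks of 10,000 rows to limit memory use.
first_chunk = True
for chunk in pd.read_sql(sql, engine, chunksize=10_000):
    chunk.to_csv(
        "my_data.csv",
        mode="w" if first_chunk else "a",  # create the file, then append
        header=first_chunk,                # header row only on the first chunk
        index=False,
        float_format="%.3f",
    )
    first_chunk = False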