Welcome to ETL Helper’s documentation!


ETL Helper is a Python ETL (Extract, Transform, Load) library to simplify data transfer into and out of databases.

ETL Helper makes it easy to run SQL queries via Python and return the results. It takes care of cursor management, importing drivers and formatting connection strings, while providing memory-efficient functions to read, write and transform data. This reduces the boilerplate code needed to manipulate data in relational databases from Python.

Features

  • fetchall, iter_rows, fetchone functions for querying databases

  • Data transfer uses memory-efficient generators (iter_chunks and iter_rows)

  • executemany and load functions to insert or update data

  • copy_rows and copy_table_rows to transfer data between databases

  • User-defined transform functions to modify data in-flight

  • execute function for one-off commands

  • Helpful error messages display the failed query SQL

  • on_error function to process rows that fail to insert (see the sketch after this list)

  • DbParams objects provide a consistent way to connect to different database types (currently Oracle, PostgreSQL, SQLite and MS SQL Server)

  • Timestamped log messages for tracking long-running data transfers

  • Built upon the DBAPI2 specification for database drivers in Python

These tools can create easy-to-understand, lightweight, versionable and testable Extract-Transform-Load (ETL) workflows.
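As a minimal sketch of the on_error hook mentioned above: failed rows are assumed here to be delivered to the callback as (row, exception) pairs, and the table is the igneous_rock table from the Quickstart below.

"""Sketch: handle rows that fail to insert (names here are illustrative)."""
import logging
import sqlite3
import etlhelper as etl

logger = logging.getLogger("load_rocks")


def log_bad_rows(failed_rows):
    # Called with the rows from a chunk that could not be inserted
    for row, exception in failed_rows:
        logger.error("Insert failed for %s: %s", row, exception)


rows = [{"name": "basalt"}, {"name": "basalt"}]  # duplicate violates UNIQUE

with sqlite3.connect("igneous_rocks.db") as conn:
    etl.executemany(
        "INSERT INTO igneous_rock (name) VALUES (:name)",
        conn,
        rows,
        on_error=log_bad_rows,
    )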

ETL Helper components

ETL Helper has three components.

The ETL functions are used to extract, transform and load rows of data from relational databases. They can be used with any DB API 2.0-compliant database connection. Logging and helpful error messages are provided out-of-the-box.
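For example, a parameterised query against a SQLite database takes only a few lines; this sketch assumes the Quickstart table below has already been created.

"""Sketch: run a parameterised query via ETL Helper."""
import sqlite3
import etlhelper as etl

with sqlite3.connect("igneous_rocks.db") as conn:
    # Query parameters are passed through to the underlying driver
    row = etl.fetchone(
        "SELECT * FROM igneous_rock WHERE name = :name",
        conn,
        parameters={"name": "basalt"},
    )
    print(row)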

A DbParams class provides a convenient way to define database connections. For any given database system, it identifies the correct driver and the required parameters, and defines connection strings. It provides convenience methods for checking that databases are reachable over a network and for connecting to them.
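As a sketch, a PostgreSQL connection could be defined as follows; the host, port and credentials are placeholders, and the password is read from an environment variable whose name is passed to connect.

"""Sketch: define and use a DbParams object (placeholder credentials)."""
from etlhelper import DbParams

POSTGRESDB = DbParams(
    dbtype="PG",  # identifies the PostgreSQL driver
    host="localhost",
    port=5432,
    dbname="etlhelper",
    user="etlhelper_user",
)

# Check that the database is reachable over the network
if POSTGRESDB.is_reachable():
    # Read the password from the ETLHELPER_DB_PASSWORD environment variable
    conn = POSTGRESDB.connect("ETLHELPER_DB_PASSWORD")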

The DbHelper classes work behind the scenes to smooth out inconsistencies between different database systems. They also apply database-specific optimisations, e.g. using the faster execute_batch function instead of executemany for PostgreSQL connections. In normal use, users do not interact with the DbHelper classes.

Quickstart examples

Loading data

The following script uses the execute, load and fetchall functions to create a database table and populate it with data.

"""ETL Helper script to load records to a database table."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

create_sql = """
    CREATE TABLE IF NOT EXISTS igneous_rock (
        id INTEGER PRIMARY KEY,
        name TEXT UNIQUE,
        grain_size TEXT
    )
"""

igneous_rocks = [
    {"name": "basalt", "grain_size": "fine"},
    {"name": "granite", "grain_size": "coarse"}
]


with sqlite3.connect(db_file) as conn:
    # Create table
    etl.execute(create_sql, conn)

    # Insert rows
    etl.load("igneous_rock", conn, igneous_rocks)

    # Confirm selection
    for row in etl.fetchall("SELECT * FROM igneous_rock", conn):
        print(row)

The output is:

{'id': 1, 'name': 'basalt', 'grain_size': 'fine'}
{'id': 2, 'name': 'granite', 'grain_size': 'coarse'}

Copying data

This script copies data to another database, with transformation and logging.

"""ETL Helper script to copy records between databases."""
import sqlite3
import datetime as dt
from typing import Generator
import etlhelper as etl

igneous_db_file = "igneous_rocks.db"
rock_db_file = "rocks.db"

create_sql = """
    CREATE TABLE IF NOT EXISTS rock (
        id INTEGER PRIMARY KEY,
        name TEXT UNIQUE,
        category TEXT,
        last_update DATETIME
    )
"""
select_sql = "SELECT name FROM igneous_rock"


def transform(chunk: list[dict]) -> Generator[dict, None, None]:
    for row in chunk:
        row["category"] = "igneous"
        row["last_update"] = dt.datetime.now()
        yield row


etl.log_to_console()

with sqlite3.connect(igneous_db_file) as src:
    with sqlite3.connect(rock_db_file) as dest:
        # Create target table
        etl.execute(create_sql, dest)

        # Copy data
        rows = etl.iter_rows(select_sql, src, transform=transform)
        etl.load("rock", dest, rows)

        # Confirm transfer
        for row in etl.fetchall("SELECT * FROM rock", dest):
            print(row)

The output is:

# 2024-05-08 14:57:42,046 execute: Executing query
# 2024-05-08 14:57:42,053 iter_chunks: Fetching rows (chunk_size=5000)
# 2024-05-08 14:57:42,054 executemany: Executing many (chunk_size=5000)
# 2024-05-08 14:57:42,054 iter_chunks: All rows returned
# 2024-05-08 14:57:42,055 executemany: 2 rows processed (0 failed)
# 2024-05-08 14:57:42,057 executemany: 2 rows processed in total

{'id': 1, 'name': 'basalt', 'category': 'igneous', 'last_update': '2024-05-08 14:59:54.878726'}
{'id': 2, 'name': 'granite', 'category': 'igneous', 'last_update': '2024-05-08 14:59:54.879034'}
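For comparison, the iter_rows and load steps in the script above could be replaced by a single copy_rows call, given an explicit INSERT statement. This sketch reuses select_sql and transform from the script.

insert_sql = """
    INSERT INTO rock (name, category, last_update)
    VALUES (:name, :category, :last_update)
"""

with sqlite3.connect(igneous_db_file) as src:
    with sqlite3.connect(rock_db_file) as dest:
        etl.copy_rows(select_sql, src, insert_sql, dest, transform=transform)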

The Recipes section has more example code.