Welcome to ETL Helper’s documentation!¶
ETL Helper is a Python ETL (Extract, Transform, Load) library to simplify data transfer into and out of databases.
ETL Helper makes it easy to run SQL queries via Python and return the results. It takes care of cursor management, importing drivers and formatting connection strings, while providing memory-efficient functions to read, write and transform data. This reduces the boilerplate code required to manipulate data in relational databases from Python.
Features¶
- fetchall, iter_rows and fetchone functions for querying databases
- Data transfer uses memory-efficient generators (iter_chunks and iter_rows)
- executemany and load functions to insert or update data
- copy_rows and copy_table_rows to transfer data between databases (see the sketch after this list)
- User-defined transform functions transform data in-flight
- execute function for one-off commands
- Helpful error messages display the failed query SQL
- on_error function to process rows that fail to insert
- DbParams objects provide a consistent way to connect to different database types (currently Oracle, PostgreSQL, SQLite and MS SQL Server)
- Timestamped log messages for tracking long-running data transfers
- Built upon the DBAPI2 specification for database drivers in Python
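For example, copy_table_rows can be combined with a user-defined on_error function so that failed rows are reported without halting a transfer. The following is a minimal sketch, not a definitive recipe: the file names and the report_errors function are hypothetical, it assumes the tables from the Quickstart examples below, and it assumes copy_table_rows accepts an on_error argument as the insert functions do.

import sqlite3

import etlhelper as etl


def report_errors(failed_rows):
    """Hypothetical on_error handler: receives (row, exception) pairs."""
    for row, exception in failed_rows:
        print(f"{row} failed with: {exception}")


with sqlite3.connect("igneous_rocks.db") as src:
    with sqlite3.connect("rocks.db") as dest:
        # Copy all rows of igneous_rock into a same-named table in dest
        etl.copy_table_rows("igneous_rock", src, dest, on_error=report_errors)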
These tools can create easy-to-understand, lightweight, versionable and testable Extract-Transform-Load (ETL) workflows.
ETL Helper components¶
ETL Helper has three components.
The ETL functions are used to extract, transform and load rows of data from relational databases. They can be used with any DB API 2.0-compliant database connections. Logging and helpful error messages are provided out-of-the-box.
A DbParams class provides a convenient way to define database connections. For any given database system, it identifies the correct driver, the required parameters and defines connection strings. It provides convenience methods for checking databases are reachable over a network and for connecting to them.
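For illustration, a DbParams object for a PostgreSQL database might look like the following minimal sketch; the host, port, database name and credentials are hypothetical placeholders.

import etlhelper as etl

# Hypothetical connection details for a PostgreSQL database
POSTGRES_DB = etl.DbParams(
    dbtype="PG",
    host="localhost",
    port=5432,
    dbname="etlhelper",
    user="etlhelper_user",
)

# Check that the database is reachable over the network
POSTGRES_DB.is_reachable()

# connect() reads the password from the named environment variable
conn = POSTGRES_DB.connect("ETLHELPER_DB_PASSWORD")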
The DbHelper classes work behind the scenes to smooth out inconsistencies between different database systems. They also apply database-specific optimisations, e.g., using the faster execute_batch function for PostgreSQL connections instead of executemany. In normal use, users do not interact with the DbHelper classes.
Quickstart examples¶
Loading data¶
The following script uses the execute, load and fetchall functions to create a database table and populate it with data.
"""ETL Helper script to load records to a database table."""
import sqlite3
import etlhelper as etl
db_file = "igneous_rocks.db"
create_sql = """
CREATE TABLE IF NOT EXISTS igneous_rock (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE,
grain_size TEXT
)
"""
igneous_rocks = [
{"name": "basalt", "grain_size": "fine"},
{"name": "granite", "grain_size": "coarse"}
]
with sqlite3.connect(db_file) as conn:
# Create table
etl.execute(create_sql, conn)
# Insert rows
etl.load("igneous_rock", conn, igneous_rocks)
# Confirm selection
for row in etl.fetchall("SELECT * FROM igneous_rock", conn):
print(row)
The output is:
{'id': 1, 'name': 'basalt', 'grain_size': 'fine'}
{'id': 2, 'name': 'granite', 'grain_size': 'coarse'}
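fetchall returns all rows at once. For result sets too large to hold in memory, the same query can be streamed with iter_rows, which is used with the same arguments; a minimal sketch reusing db_file from the script above:

with sqlite3.connect(db_file) as conn:
    # Rows are fetched lazily in chunks rather than loaded all at once
    for row in etl.iter_rows("SELECT * FROM igneous_rock", conn):
        print(row)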
Copying data¶
This script copies data to another database, with transformation and logging.
"""ETL Helper script to copy records between databases."""
import sqlite3
import datetime as dt
from typing import Generator
import etlhelper as etl
igneous_db_file = "igneous_rocks.db"
rock_db_file = "rocks.db"
create_sql = """
CREATE TABLE IF NOT EXISTS rock (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE,
category TEXT,
last_update DATETIME
)
"""
select_sql = "SELECT name FROM igneous_rock"
def transform(chunk: list[dict]) -> Generator[dict, None, None]:
for row in chunk:
row["category"] = "igneous"
row["last_update"] = dt.datetime.now()
yield row
etl.log_to_console()
with sqlite3.connect(igneous_db_file) as src:
with sqlite3.connect(rock_db_file) as dest:
# Create target table
etl.execute(create_sql, dest)
# Copy data
rows = etl.iter_rows(select_sql, src, transform=transform)
etl.load("rock", dest, rows)
# Confirm transfer
for row in etl.fetchall("SELECT * FROM rock", dest):
print(row)
The output is:
# 2024-05-08 14:57:42,046 execute: Executing query
# 2024-05-08 14:57:42,053 iter_chunks: Fetching rows (chunk_size=5000)
# 2024-05-08 14:57:42,054 executemany: Executing many (chunk_size=5000)
# 2024-05-08 14:57:42,054 iter_chunks: All rows returned
# 2024-05-08 14:57:42,055 executemany: 2 rows processed (0 failed)
# 2024-05-08 14:57:42,057 executemany: 2 rows processed in total
{'id': 1, 'name': 'basalt', 'category': 'igneous', 'last_update': '2024-05-08 14:59:54.878726'}
{'id': 2, 'name': 'granite', 'category': 'igneous', 'last_update': '2024-05-08 14:59:54.879034'}
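The iter_rows and load steps above can also be written as a single copy_rows call, which pairs a SELECT query with an INSERT statement. A minimal sketch, placed inside the with blocks of the script above so it can reuse src, dest, select_sql and transform; the INSERT statement here is an assumption for illustration, not taken from the original example:

insert_sql = """
    INSERT INTO rock (name, category, last_update)
    VALUES (:name, :category, :last_update)
"""

etl.copy_rows(select_sql, src, insert_sql, dest, transform=transform)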
The Recipes section has more example code.