Error Handling

This section describes ETL Helper's exception classes, on_error functions, and error handling via SQL.

ETLHelperError

ETL Helper has a variety of Exception classes, all of which are subclasses of the ETLHelperError base class.

To aid debugging, the ETLHelperQueryError, ETLHelperExtractError and ETLHelperInsertError classes print the SQL query and the required paramstyle as well as the error message returned by the database.

"""ETL Helper script to demonstrate an extract error."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"
select_sql = "SELECT * FROM bad_table"

with sqlite3.connect(db_file) as conn:
    rows = etl.fetchall(select_sql, conn)

The output is:

etlhelper.exceptions.ETLHelperExtractError: SQL query raised an error.

SELECT * FROM bad_table

Required paramstyle: qmark

no such table: bad_table
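Because every exception is a subclass of ETLHelperError, a single except clause can catch any of them. A minimal sketch, reusing the query above:

"""ETL Helper script to demonstrate catching ETLHelperError."""
import sqlite3
import etlhelper as etl
from etlhelper.exceptions import ETLHelperError

db_file = "igneous_rocks.db"
select_sql = "SELECT * FROM bad_table"

with sqlite3.connect(db_file) as conn:
    try:
        rows = etl.fetchall(select_sql, conn)
    except ETLHelperError as exc:
        # Catches any ETL Helper exception, e.g. ETLHelperExtractError
        print(f"Transfer failed: {exc}")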

Errors can also be handled within the SQL query itself, e.g. with an ON CONFLICT clause, as described in Error handling via SQL below.

on_error

The default behaviour of etlhelper is to raise an exception on the first error and abort the transfer. Sometimes it is desirable to ignore the errors and to do something else with the failed rows. The on_error parameter allows a function to be passed that is applied to the failed rows of each chunk. The input is a list of (row, exception) tuples.

The examples below demonstrate different approaches. The simplest is to collect all the errors into a list and process them at the end.

errors = []
etl.executemany(sql, conn, rows, on_error=errors.extend)

# Process the collected (row, exception) tuples when the transfer is complete
if errors:
    do_something()

Errors can be logged to the etlhelper logger.

"""ETL Helper script to demonstrate logging errors."""
import logging
import sqlite3
import etlhelper as etl

etl.log_to_console()
logger = logging.getLogger("etlhelper")

db_file = "igneous_rocks.db"

rows = [
    {"name": "basalt", "grain_size": "fine"},
    {"name": "granite", "grain_size": "coarse"}
]


def log_errors(failed_rows: list[tuple[dict, Exception]]) -> None:
    for row, exception in failed_rows:
        logger.error(exception)


with sqlite3.connect(db_file) as conn:
    etl.load("igneous_rock", conn, rows, on_error=log_errors)

The IDs of failed rows can be written to a file. This example assumes that each row is an object, e.g. a named tuple, with an id attribute.

def write_bad_ids(failed_rows):
    with open("bad_ids.txt", "at") as out_file:
        for row, exception in failed_rows:
            out_file.write(f"{row.id}\n")

etl.executemany(sql, conn, rows, on_error=write_bad_ids)

executemany, load, copy_rows and copy_table_rows can all take an on_error parameter. They each return a tuple containing the number of rows processed and the number of rows that failed.
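The returned counts can be used to report the outcome of a transfer. A minimal sketch, with sql, conn and rows defined as before:

errors = []
processed, failed = etl.executemany(sql, conn, rows, on_error=errors.extend)
print(f"{processed} rows processed, {failed} rows failed")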

Error handling via SQL

The on_error functions allow individual failed rows to be processed; however, this flexibility can come at the expense of speed. Each chunk of data that contains a bad row will be retried on a row-by-row basis.

Databases also provide ways to handle errors, e.g. duplicate primary keys, within the SQL itself. By customising an INSERT query (which can be programmatically generated with generate_insert_query()), the database can be instructed how to process such rows.

The following example for SQLite will ignore duplicate rows. Different databases have different syntax and capabilities, including upsert and merge.

"""ETL Helper script to load records to a database table."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

create_sql = """
    CREATE TABLE IF NOT EXISTS igneous_rock (
        id INTEGER PRIMARY KEY,
        name TEXT UNIQUE,
        grain_size TEXT
    )"""
insert_sql = """
    INSERT INTO igneous_rock (id, name, grain_size)
    VALUES (:id, :name, :grain_size)
    ON CONFLICT DO NOTHING
"""

igneous_rocks = [
    {"id": 1, "name": "basalt", "grain_size": "fine"},
    {"id": 1, "name": "basalt", "grain_size": "fine"}  # duplicate row
]


with sqlite3.connect(db_file) as conn:
    # Create table
    etl.execute(create_sql, conn)

    # Insert rows
    etl.executemany(insert_sql, conn, rows=igneous_rocks)

    # Confirm selection
    for row in etl.fetchall("SELECT * FROM igneous_rock", conn):
        print(row)

The output shows that the duplicate row was ignored:

{'id': 1, 'name': 'basalt', 'grain_size': 'fine'}