Load

Functions

ETL Helper provides two functions for loading data.

  • load(): Inserts data from an iterable of dictionaries or namedtuples into an existing target table.

"""ETL Helper script to demonstrate load."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

rows = [
    {"name": "basalt", "grain_size": "fine"},
    {"name": "granite", "grain_size": "coarse"}
]

with sqlite3.connect(db_file) as conn:
    # Note that table must already exist
    processed, failed = etl.load("igneous_rock", conn, rows)

NOTE: the load function uses the first row of data to generate the list of columns for the insert query. If later items in the data contain extra columns, those columns will not be inserted and no error will be raised.
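
For example, continuing the script above, the hypothetical "density" value in the second row would be silently dropped because "density" does not appear in the first row:

rows = [
    {"name": "basalt", "grain_size": "fine"},
    # "density" is absent from the first row, so this value is never inserted
    {"name": "granite", "grain_size": "coarse", "density": 2.7},
]

with sqlite3.connect(db_file) as conn:
    # Only the name and grain_size columns are populated
    processed, failed = etl.load("igneous_rock", conn, rows)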

  • executemany(): Applies an SQL query with parameters supplied by an iterable of data. Customising the SQL query allows fine control.

"""ETL Helper script to demonstrate using executemany with a named placeholder query."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

# Insert query converts grain size to upper case and adds an updated_at column
insert_sql = """
    INSERT INTO igneous_rocks (name, grain_size, updated_at)
    VALUES (:name, UPPER(:grain_size), datetime('now'))
"""

rows = [
    {"name": "basalt", "grain_size": "fine"},
    {"name": "granite", "grain_size": "coarse"}
]

with sqlite3.connect(db_file) as conn:
    # Note that table must already exist
    processed, failed = etl.executemany(insert_sql, conn, rows)

The INSERT query must contain placeholders in a format appropriate to the input data, e.g. positional for tuples, named for dictionaries.

"""ETL Helper script to demonstrate using executemany with a positional placeholder query."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

# Positional placeholders for data in tuple format
insert_sql = """
    INSERT INTO igneous_rocks (name, grain_size, updated_at)
    VALUES (?, UPPER(?), datetime('now'))
"""

rows = [("basalt", "fine"), ("granite", "coarse")]

with sqlite3.connect(db_file) as conn:
    # Note that table must already exist
    processed, failed = etl.executemany(insert_sql, conn, rows)

executemany() can also be used with UPDATE commands.
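
For example, the following sketch updates existing rows in the igneous_rocks table from the examples above:

"""ETL Helper script to demonstrate using executemany with an UPDATE query."""
import sqlite3
import etlhelper as etl

db_file = "igneous_rocks.db"

update_sql = """
    UPDATE igneous_rocks
       SET grain_size = :grain_size
     WHERE name = :name
"""

rows = [{"name": "basalt", "grain_size": "very fine"}]

with sqlite3.connect(db_file) as conn:
    # Note that the rows to update must already exist
    processed, failed = etl.executemany(update_sql, conn, rows)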

Keyword arguments

chunk_size

load() uses executemany() behind the scenes. Large datasets are broken into chunks and inserted in batches to reduce the number of queries, and only one chunk of data is held in memory at any time. Within a data processing pipeline, the next step can begin as soon as the first chunk has been processed. The database connection must remain open until all data have been processed.

The chunk_size default is 5,000 and it can be set with a keyword argument.
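
For example, reusing the connection and rows from the load example above:

with sqlite3.connect(db_file) as conn:
    # Send the data in batches of 1,000 rows per query
    processed, failed = etl.load("igneous_rock", conn, rows, chunk_size=1000)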

commit_chunks

The commit_chunks flag defaults to True. This means that each chunk is committed as soon as it has been inserted, so an error part-way through a large data transfer doesn't require all the records to be sent again. Some work may be required to determine which records remain to be sent. Setting commit_chunks to False will roll back the entire transfer in case of an error.
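
For example, reusing the insert query and rows from the executemany example above:

with sqlite3.connect(db_file) as conn:
    # Roll back the whole transfer if any chunk fails
    processed, failed = etl.executemany(insert_sql, conn, rows, commit_chunks=False)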

transform

The transform parameter takes a callable (e.g. function) that transforms the data before returning it. See the Transform section for details.
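
As a brief illustration (a minimal sketch, assuming the callable is passed a chunk of rows and returns the modified chunk, as described in the Transform section), reusing the rows and connection from the load example above:

def uppercase_grain_size(chunk):
    # Return a new list of rows with grain_size converted to upper case
    return [dict(row, grain_size=row["grain_size"].upper()) for row in chunk]

with sqlite3.connect(db_file) as conn:
    processed, failed = etl.load("igneous_rock", conn, rows, transform=uppercase_grain_size)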

on_error

Accepts a Python function that will be applied to failed rows. See the on_error section for details.
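
A minimal sketch, assuming the function receives the list of failed rows (each paired with its exception) from each chunk, as described in the on_error section:

import logging

logger = logging.getLogger("etlhelper")

def log_failures(failed_rows):
    # Report each row that could not be inserted, along with its error
    for row, exception in failed_rows:
        logger.error("Insert failed for %s: %s", row, exception)

with sqlite3.connect(db_file) as conn:
    processed, failed = etl.load("igneous_rock", conn, rows, on_error=log_failures)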

Return values

The number of rows that were processed and the number that failed are returned as a tuple.
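
For example:

with sqlite3.connect(db_file) as conn:
    processed, failed = etl.load("igneous_rock", conn, rows)

print(f"{processed} rows processed, {failed} rows failed")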