Transform
ETL Helper functions accept a function as the transform keyword argument, which enables transformation of data in flight. This is any Python callable (e.g. function or class) that takes an iterator and returns another iterator (e.g. a list, or a generator via the yield statement).
Transform functions are applied to data as they are read from the database (in the case of data fetching functions and copy_rows), or before they are passed as query parameters (to executemany or load). When used with copy_rows or executemany, the INSERT query must contain the correct parameter placeholders for the transformed result.
The simplest transform functions modify, in place, data returned by mutable row factories, e.g. dict_row_factory. The yield keyword makes my_transform a generator function that returns an Iterator that loops over the rows.
"""ETL Helper script to demonstrate using a transform function which yields individual rows."""
import sqlite3
from typing import Iterator
import etlhelper as etl
from etlhelper.row_factories import dict_row_factory
db_file = "igneous_rocks.db"
select_sql = "SELECT * FROM igneous_rock"
def my_transform(chunk: Iterator[dict]) -> Iterator[dict]:
# Add prefix to id, remove newlines, set lower case names
for row in chunk: # each row is a dictionary (mutable)
row["id"] += 1000
row["description"] = row["description"].replace("\n", " ")
row["name"] = row["name"].lower()
yield row
with sqlite3.connect(db_file) as conn:
rows = etl.fetchall(
select_sql,
conn,
row_factory=dict_row_factory,
transform=my_transform,
)
It is also possible to assemble the complete transformed chunk and return it. This code demonstrates that the returned chunk can have a different number of rows than the input, and that its rows can have a different number of columns. Because namedtuples are immutable, a new_row must be created from each input row.
"""ETL Helper script to demonstrate using a transform function which returns a list of rows."""
import random
import sqlite3
from typing import Iterator
import etlhelper as etl
from etlhelper.row_factories import namedtuple_row_factory
db_file = "igneous_rocks.db"
select_sql = "SELECT * FROM igneous_rock"
def my_transform(chunk: Iterator[tuple]) -> list[tuple]:
# Append random integer (1-10), filter if <5.
new_chunk = []
for row in chunk: # each row is a namedtuple (immutable)
extra_value = random.randrange(10)
if extra_value >= 5: # some rows are dropped
new_row = (*row, extra_value) # new rows have extra column
new_chunk.append(new_row)
return new_chunk
with sqlite3.connect(db_file) as conn:
rows = etl.fetchall(
select_sql,
conn,
row_factory=namedtuple_row_factory,
transform=my_transform,
)
Any Python code can be used within the function, and extra data can result from a calculation, a call to a web service, or a query against another database. As standalone functions with known inputs and outputs, transform functions are also easy to test.
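For instance, the dict-based my_transform from the first example can be exercised without any database connection by feeding it a hand-built chunk (the sample row values here are invented for the test):

```python
from typing import Iterator


def my_transform(chunk: Iterator[dict]) -> Iterator[dict]:
    # Same transform as the first example above
    for row in chunk:
        row["id"] += 1000
        row["description"] = row["description"].replace("\n", " ")
        row["name"] = row["name"].lower()
        yield row


# No database required: feed in a hand-built chunk and check the output
fake_chunk = [{"id": 1, "name": "Basalt", "description": "Fine-grained.\nDark."}]
result = list(my_transform(fake_chunk))

assert result == [
    {"id": 1001, "name": "basalt", "description": "Fine-grained. Dark."}
]
```

Assertions like this fit naturally into a pytest suite, letting the transformation logic be verified independently of any extract or load step.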
The iter_chunks and iter_rows functions return generators. Each chunk or row of data is only accessed when it is required. This allows data transformation to be performed via memory-efficient iterator chains.
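The laziness of such chains can be sketched with plain generator functions. Here source() stands in for etl.iter_rows (the function names and row shape are invented for the illustration); although the pipeline nominally covers a million rows, nothing is produced until a row is requested:

```python
def source():
    # Stand-in for etl.iter_rows: yields rows one at a time, on demand
    for i in range(1_000_000):
        yield {"id": i}


def add_flag(rows):
    # First transform in the chain: annotate each row
    for row in rows:
        row["even"] = row["id"] % 2 == 0
        yield row


def keep_even(rows):
    # Second transform in the chain: filter rows lazily
    return (row for row in rows if row["even"])


# Chaining generators: no row is produced until next() is called,
# so the full million-row sequence is never held in memory.
pipeline = keep_even(add_flag(source()))
first = next(pipeline)
print(first)  # {'id': 0, 'even': True}
```

Each next() call pulls exactly one row through the whole chain, which is why arbitrarily long pipelines of transforms stay memory-efficient.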