Copy
ETL Helper provides three ways to copy data from one database to another. They are presented in order of increasing control and customisability.
copy_table_rows

copy_table_rows() provides a simple way to copy all the data from one table to another. It can take a transform function in case some modification of the data, e.g. change of case of column names, is required.

"""ETL Helper script to demonstrate copy_table_rows."""
import etlhelper as etl
from my_databases import POSTGRESDB, ORACLEDB

with ORACLEDB.connect("ORA_PASSWORD") as src_conn:
    with POSTGRESDB.connect("PG_PASSWORD") as dest_conn:
        etl.copy_table_rows("my_table", src_conn, dest_conn)
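To illustrate the transform option mentioned above, the sketch below lower-cases column names. It assumes that rows arrive as dictionaries and that the transform function receives a chunk (an iterable of rows) and returns an iterable of rows. The name lowercase_keys and the sample data are hypothetical, and the actual row format depends on the row factory in use.

```python
"""Sketch of a transform function that lower-cases column names."""
from typing import Iterable, Iterator


def lowercase_keys(chunk: Iterable[dict]) -> Iterator[dict]:
    """Yield each row with its column names converted to lower case."""
    for row in chunk:
        yield {key.lower(): value for key, value in row.items()}


# Stand-in for a chunk of rows with upper-case column names.
sample_chunk = [{"ID": 1, "NAME": "basalt"}, {"ID": 2, "NAME": "chalk"}]
transformed = list(lowercase_keys(sample_chunk))
```

Under those assumptions, the function would be supplied to copy_table_rows through its transform argument, e.g. transform=lowercase_keys.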
The chunk_size, commit_chunks and on_error parameters can all be set. A tuple with counts of rows processed and failed is returned.
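As a rough illustration of the on_error option, the sketch below defines a handler that logs and collects failed rows. It assumes that on_error is called with a list of (row, exception) tuples for the rows that could not be inserted. The name log_failed_rows and the simulated failure are hypothetical and stand in for what a real copy would pass.

```python
"""Sketch of an on_error handler that records rows that failed to load."""
import logging

logger = logging.getLogger("copy_example")
failed_rows = []


def log_failed_rows(failures):
    """Log each failure and keep it for later inspection.

    Assumes `failures` is a list of (row, exception) tuples.
    """
    for row, exception in failures:
        logger.warning("Failed to insert %s: %s", row, exception)
    failed_rows.extend(failures)


# Simulated call, standing in for what a copy function would pass on failure.
log_failed_rows([({"id": 3, "name": None}, ValueError("name must not be null"))])
```

If that assumption holds, the handler would be passed as on_error=log_failed_rows, and the returned tuple could be unpacked into two counts, e.g. processed, failed = etl.copy_table_rows(...).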
Note that the target table must already exist. If it doesn’t, you can use execute with a CREATE TABLE IF NOT EXISTS ... statement to create it first. See the recipes for examples.
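The create-if-missing step can be sketched with the standard-library sqlite3 module, so that it runs without a database server. The table definition here is hypothetical. With ETL Helper, the same kind of statement would be passed to execute together with the destination connection.

```python
"""Sketch: create the target table only if it does not already exist."""
import sqlite3

create_sql = """
    CREATE TABLE IF NOT EXISTS my_table (
        id INTEGER PRIMARY KEY,
        name TEXT,
        value REAL
    )
"""

conn = sqlite3.connect(":memory:")
# Running the statement twice is safe: the second call is a no-op.
conn.execute(create_sql)
conn.execute(create_sql)

table_names = [
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
]
conn.close()
```

Because the statement is idempotent, it can be run at the start of every ETL script without first checking whether the table is present.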
Combining iter_rows with load
For extra control over which data are transferred, iter_rows can be combined with load.

"""ETL Helper script to demonstrate copying records between databases with iter_rows and load."""
import etlhelper as etl
from my_databases import POSTGRESDB, ORACLEDB

select_sql = """
    SELECT id, name, value FROM my_table
    WHERE value > :min_value
"""

with ORACLEDB.connect("ORA_PASSWORD") as src_conn:
    with POSTGRESDB.connect("PG_PASSWORD") as dest_conn:
        rows = etl.iter_rows(select_sql, src_conn, parameters={"min_value": 99})
        etl.load("my_table", dest_conn, rows)
copy_rows
Customising both queries gives the greatest control over data selection and loading. copy_rows takes the results from a SELECT query and applies them as parameters to an INSERT query. The source and destination tables must already exist. For example, here we use GROUP BY and WHERE in the SELECT query and insert extra auto-generated values via the INSERT query.

"""ETL Helper script to demonstrate copy_rows."""
import etlhelper as etl
from etlhelper.row_factories import namedtuple_row_factory
from my_databases import POSTGRESDB, ORACLEDB

select_sql = """
    SELECT
        customer_id,
        SUM(amount) AS total_amount
    FROM payment
    WHERE id > 1000
    GROUP BY customer_id
"""

# This insert query uses positional parameters, so a namedtuple_row_factory
# is used.
insert_sql = """
    INSERT INTO dest (
        customer_id,
        total_amount,
        loaded_by,
        load_time
    )
    VALUES (
        %s,
        %s,
        current_user,
        now()
    )
"""

with ORACLEDB.connect("ORA_PASSWORD") as src_conn:
    with POSTGRESDB.connect("PG_PASSWORD") as dest_conn:
        etl.copy_rows(
            select_sql,
            src_conn,
            insert_sql,
            dest_conn,
            row_factory=namedtuple_row_factory,
        )
As before, parameters can be passed to the SELECT query, and the commit_chunks, chunk_size and on_error options can be set. A tuple with counts of rows processed and failed is returned.
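The idea behind copy_rows, feeding each SELECT result row to an INSERT query as its parameters, can be sketched with the standard-library sqlite3 module. This is an illustration of the pattern only, not ETL Helper's implementation. The table contents are made up, and both tables live in a single in-memory database for brevity.

```python
"""Sketch of the SELECT-to-INSERT pattern using sqlite3 only."""
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payment (id INTEGER, customer_id INTEGER, amount REAL)")
conn.execute(
    "CREATE TABLE dest (customer_id INTEGER, total_amount REAL, load_time TEXT)"
)
conn.executemany(
    "INSERT INTO payment VALUES (?, ?, ?)",
    [(1001, 1, 10.0), (1002, 1, 5.0), (1003, 2, 7.5), (999, 2, 100.0)],
)

select_sql = """
    SELECT customer_id, SUM(amount) AS total_amount
    FROM payment
    WHERE id > 1000
    GROUP BY customer_id
    ORDER BY customer_id
"""
# Each selected row supplies the two positional parameters; load_time is
# generated by the database at insert time.
insert_sql = """
    INSERT INTO dest (customer_id, total_amount, load_time)
    VALUES (?, ?, datetime('now'))
"""

rows = conn.execute(select_sql).fetchall()
conn.executemany(insert_sql, rows)
conn.commit()

totals = conn.execute(
    "SELECT customer_id, total_amount FROM dest ORDER BY customer_id"
).fetchall()
conn.close()
```

The order of the selected columns must match the order of the placeholders in the INSERT query, which is why positional parameters pair naturally with tuple-like rows.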