Calling ETL Helper scripts from Apache Airflow

The following is an Apache Airflow DAG that uses the `copy_readings` function defined in the Database to database script (`database_to_database.py`). The Airflow scheduler creates one task run for each day since 1 August 2019 and calls `copy_readings` with start and end times taken from that run's scheduling context.

"""ETL Helper script to demonstrate using Apache Airflow to schedule tasks."""
import datetime as dt
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from database_to_database import copy_readings


def copy_readings_with_args(**kwargs) -> None:
    """Copy the readings belonging to the data interval of this DAG run.

    Airflow passes the task's template context as keyword arguments.  A
    scheduled run labelled ``execution_date`` X is only triggered once the
    interval [X, X + schedule_interval) has finished, so that is the interval
    whose readings are copied.  (Using ``prev_execution_date`` ..
    ``execution_date`` would copy the interval *before* it: one day behind,
    starting the day before ``start_date``.)

    :param kwargs: Airflow task context.
    :raises KeyError: if the context provides neither the data-interval keys
        (Airflow >= 2.2) nor ``execution_date``/``next_execution_date``.
    """
    # Airflow >= 2.2 exposes the run's interval directly.
    start = kwargs.get("data_interval_start")
    end = kwargs.get("data_interval_end")
    if start is None or end is None:
        # Older Airflow: the run covers execution_date up to the next
        # scheduled date.  Index (not .get) so a missing key fails loudly
        # instead of passing None through to copy_readings.
        start = kwargs["execution_date"]
        end = kwargs["next_execution_date"]
    copy_readings(start, end)


# Daily "readings" DAG.  catchup=True makes the scheduler back-fill a run for
# every interval between start_date and now, not just the latest one.
dag = DAG(
    dag_id="readings",
    start_date=dt.datetime(2019, 8, 1),
    schedule_interval=dt.timedelta(days=1),
    catchup=True,
)

# The DAG's only task: run copy_readings_with_args for each scheduled run.
t1 = PythonOperator(
    dag=dag,
    task_id="copy_readings",
    # provide_context=True makes Airflow 1.x pass the task context to the
    # callable as keyword arguments (on Airflow 2 it is deprecated/ignored).
    provide_context=True,
    python_callable=copy_readings_with_args,
)