Airflow Reference

What is Airflow

A workflow manager. Other workflow managers include Oozie, Luigi, Azkaban, and Taskflow.

Airflow Architecture

At its core, Airflow is simply a queuing system built on top of a metadata database. The database stores the state of queued tasks, and a scheduler uses these states to prioritize how other tasks are added to the queue. This functionality is orchestrated by four primary components.

  1. Metadata Database : stores state of tasks.
  2. Scheduler : Process that uses DAG definitions with state from the metadata database to decide which tasks to execute, along with their priority. Generally run as a service.
  3. Executor : A message-queuing mechanism tightly bound to the Scheduler that determines the worker processes which actually execute each scheduled task. For example, the LocalExecutor executes tasks with parallel processes that run on the same machine as the Scheduler process (see the configuration sketch after this list).
  4. Workers : These are the processes that actually execute the logic of tasks, and are determined by the Executor being used.
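
How these components are wired together is controlled by airflow.cfg. A minimal sketch of the relevant [core] settings (the paths and connection string below are placeholders) -

[core]
# Folder the Scheduler scans for DAG definitions
dags_folder = /home/airflow/dags

# Metadata database that stores task state (placeholder connection string)
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

# Executor the Scheduler uses to run tasks
executor = LocalExecutor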

Basic Airflow Concepts

DAGs
Directed Acyclic Graphs written in Python
Operators
Operators are objects that become actual nodes in the DAG and determine what gets done. Various types include action operators, sensors, and transfer operators. Action and transfer operators are simply Python classes with an execute() method. Sensor operators are classes with a poke() method, which gets called repeatedly until it returns True. Another type is the branch operator, which decides which downstream path a DagRun follows.
XCom
Cross-Communication used to send metadata between operators in a DAG. This is just a record in a central database (saved as a pickled object) that operators can write to and read from.
Task
A task is a representation of an operator with a particular set of input arguments. For example, we have one BashOperator but can have three different “bash tasks” in a DAG.
TaskInstance
When a DAG is run, each Task spawns a TaskInstance, an instance of a task tied to a particular time of execution.
DagRun
All task instances in a DAG are grouped into a DagRun.

Summary: A DAG consists of tasks, which are parameterized representations of operators. Each time a DAG is executed, a DagRun is created, which holds all TaskInstances made from tasks for this run.
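
To make the Task/Operator distinction concrete, here is a minimal sketch (assuming the Airflow 1.x BashOperator import path and a dag object like the one in the Example DAG section below) in which a single operator class yields three different tasks -

from airflow.operators.bash_operator import BashOperator

with dag:
    # One operator class (BashOperator), three distinct tasks
    extract = BashOperator(task_id='extract', bash_command='echo extract')
    transform = BashOperator(task_id='transform', bash_command='echo transform')
    load = BashOperator(task_id='load', bash_command='echo load')

    extract >> transform >> load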

General Operator Guidelines

  • Idempotence : running a task multiple times with the same inputs should produce the same end state (see the sketch after this list).
  • Atomicity : a task should do one thing, and either succeed completely or fail completely.
  • Metadata Exchange : pass only small pieces of metadata between tasks (via XCom), not large datasets.
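
A minimal sketch of what idempotence looks like in practice, assuming a dag object like the one in the Example DAG below; the write_report callable and the output path are made up for illustration. The callable is keyed on the templated execution date, so re-running the same DagRun overwrites the same output rather than appending to it -

from airflow.operators.python_operator import PythonOperator

def write_report(ds, **kwargs):
    # 'ds' is the execution date as YYYY-MM-DD; deriving the output path from it
    # and overwriting on each run keeps the task idempotent.
    output_path = '/tmp/report_{}.csv'.format(ds)
    with open(output_path, 'w') as f:  # 'w' overwrites rather than appends
        f.write('report for {}\n'.format(ds))

with dag:
    daily_report = PythonOperator(
        task_id='daily_report',
        python_callable=write_report,
        provide_context=True)  # passes 'ds' and the rest of the context as kwargs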

Example DAG

import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
    schedule_interval='0 12 * * *',
    start_date=datetime.datetime(2017, 7, 13), catchup=False)


with dag:
    dummy_task = DummyOperator(task_id='dummy', retries=3)
    hello_task = PythonOperator(task_id='snake', python_callable=print_hello)

    dummy_task >> hello_task

Example Operator

An example action operator is as follows -

import logging

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)


class MyFirstOperator(BaseOperator):

    @apply_defaults
    def __init__(self, my_param, *args, **kwargs):
        self.task_param = my_param
        super(MyFirstOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        log.info('Hello World')
        log.info('my_param: %s', self.task_param)

Using this operator is as follows -

with dag:
    my_first_task = MyFirstOperator(my_param='This is a test.', task_id='my_task')

An example Sensor is as follows

from datetime import datetime

from airflow.operators.sensors import BaseSensorOperator

# log is the module-level logger set up in the operator example above

class MyFirstSensor(BaseSensorOperator):

    def poke(self, context):
        current_minute = datetime.now().minute
        if current_minute % 3 != 0:
            log.info('Current minute (%s) not divisible by 3, sensor will retry.', current_minute)
            return False
        log.info('Current minute (%s) is divisible by 3, sensor finishing.', current_minute)
        task_instance = context['task_instance']
        task_instance.xcom_push('sensors_minute', current_minute)
        return True

An example branch operator is as follows -

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator


def choose():
    # Return the task_id of the branch to follow; the other branch is skipped
    return 'first'

with dag:
    branching = BranchPythonOperator(task_id='branching', python_callable=choose)
    branching >> DummyOperator(task_id='first')
    branching >> DummyOperator(task_id='second')

Dealing with XCom

An XCom Push looks like this -

task_instance = context['task_instance']
task_instance.xcom_push('sensors_minute', current_minute)

An XCom Pull looks like this -

task_instance = context['task_instance']
sensors_minute = task_instance.xcom_pull('sensor_task_id', key='sensors_minute')
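
The same pull can also be done from a PythonOperator callable via the context it receives. A minimal sketch, where 'sensor_task_id' is the placeholder task id from the pull above, log is the module-level logger from the operator example, and provide_context=True is what passes the context into the callable -

def report_sensor_minute(**context):
    # Pull the value the sensor pushed under key 'sensors_minute'
    task_instance = context['task_instance']
    sensors_minute = task_instance.xcom_pull(task_ids='sensor_task_id', key='sensors_minute')
    log.info('Sensor ran at minute %s', sensors_minute)

with dag:
    report_task = PythonOperator(
        task_id='report_sensor_minute',
        python_callable=report_sensor_minute,
        provide_context=True)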

You can also scan upstream tasks for information -

def execute(self, context):
    log.info('XCom: Scanning upstream tasks for Database IDs')
    task_instance = context['task_instance']

    upstream_tasks = self.get_flat_relatives(upstream=True)
    upstream_task_ids = [task.task_id for task in upstream_tasks]
    upstream_database_ids = task_instance.xcom_pull(task_ids=upstream_task_ids, key='db_id')

    log.info('XCom: Found the following database IDs: %s', upstream_database_ids)

Skipping Task Execution

Raising an AirflowSkipException skips execution of the current task. Any other exception causes retries and, ultimately, the task to fail.

from airflow.exceptions import AirflowSkipException

def execute(self, context):
    ...
    if not conditions_met:
        log.info('Conditions not met, skipping.')
        raise AirflowSkipException()

Trigger Rules

Normal workflow behavior is to trigger a task when all of its directly upstream tasks have succeeded, but more complex dependency settings are possible. All operators have a trigger_rule argument that defines the rule by which the generated task gets triggered. The default value is all_success. The possible rules are -

  • all_success (default): All parents have succeeded.
  • all_failed : All parents are in failed or upstream_failed state.
  • all_done : All parents are done with their execution.
  • one_failed : Fires as soon as at least one parent has failed, it does not wait for all parents to be done.
  • one_success : Fires as soon as at least one parent has succeeded, it does not wait for all parents to be done.
  • none_failed : No parent is in a failed or upstream_failed state, i.e., all parents have succeeded or been skipped.
  • none_skipped : No parent is in a skipped state, i.e., all parents are in a success, failed, or upstream_failed state.
  • dummy : Dependencies are just for show, trigger at will.

These trigger rules can be combined with depends_on_past (boolean), which, when set to True, keeps a task from getting triggered if the previous schedule for that task hasn't succeeded.
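
As a small sketch (reusing the dummy_task and hello_task from the Example DAG above), a cleanup task that should run no matter how its parents finished can be declared like this -

from airflow.operators.dummy_operator import DummyOperator

with dag:
    cleanup = DummyOperator(
        task_id='cleanup',
        trigger_rule='all_done')  # fire once all parents are done, whatever their state

    dummy_task >> cleanup
    hello_task >> cleanup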

Jinja templates and Macros

Look at the documentation to understand which parameters are templated. Templated parameters are processed by a Jinja preprocessor. For example, the env parameter of the BashOperator is templated, allowing you to do something like this -

from airflow.operators.bash_operator import BashOperator

# The execution date as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id='test_env',
    bash_command='/tmp/test.sh',
    dag=dag,
    env={'EXECUTION_DATE': date})

Airflow Plugins

These are typically subclasses of AirflowPlugin. The file is usually placed in AIRFLOW_HOME/plugins.

Here’s the skeleton of an Airflow Plugin

from airflow.plugins_manager import AirflowPlugin


class MyPlugin(AirflowPlugin):
    name = "my_plugin"

    # A list of classes derived from BaseOperator
    operators = []

    # A list of menu links (flask_admin.base.MenuLink)
    menu_links = []

    # A list of objects created from a class derived from flask_admin.BaseView
    admin_views = []

    # A list of Blueprint objects created from flask.Blueprint
    flask_blueprints = []

    # A list of classes derived from BaseHook (connection clients)
    hooks = []

    # A list of classes derived from BaseExecutor (e.g MesosExecutor)
    executors = []
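
Once Airflow picks up the plugin, the objects it registers are folded into Airflow's own namespaces. As a rough sketch (Airflow 1.x behavior, assuming the MyFirstOperator class from earlier is listed in operators above), a DAG file could then import it like this -

from airflow.operators.my_plugin import MyFirstOperator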

Airflow CLI

Full-featured CLI. Example commands include -

  • airflow test DAG_ID TASK_ID EXECUTION_DATE : allows a user to run a task in isolation without affecting the metadata database.

  • airflow backfill DAG_ID -s START_DATE -e END_DATE : Performs a backfill of historical data between START_DATE and END_DATE without the need to run the scheduler.

  • airflow clear DAG_ID : Removes TaskInstance records in the metadata database for the DAG_ID.

  • airflow resetdb : Burns down and rebuilds the metadata database, giving you a clean slate.
