
Setting up Apache Airflow using Docker-Compose

Although I'm pretty late to the party (Airflow became an Apache Top-Level Project in 2019), I still had trouble finding an easy-to-understand, up-to-date, and lightweight solution for installing Airflow. In this post, we will create a lightweight, standalone, and easily deployable Apache Airflow development environment in just a few minutes.

Docker-Compose will be our close companion, allowing us to create a smooth development workflow with quick iteration cycles. Simply spin up a few Docker containers and we can start creating our own workflows.

Note: The following setup will not be suitable for any production purposes and is intended to be used in a development environment only.

You can find the full code for this post on my GitHub.

Why Airflow?

Apache Airflow is a batch-oriented framework that allows us to easily build scheduled data pipelines in Python. Think of “workflow as code” capable of executing any operation we can implement in Python.

Airflow is not a data processing tool itself but an orchestration tool. We can imagine Airflow as a spider in a web: sitting in the middle, pulling all the strings, and coordinating the workload of our data pipelines.

A data pipeline typically consists of several tasks or actions that need to be executed in a specific order. Apache Airflow models such a pipeline as a DAG (directed acyclic graph): a graph of tasks connected by directed edges, without any loops or cycles.

A simple example DAG

This approach allows us to run independent tasks in parallel, saving time and money. Moreover, we can split a data pipeline into several smaller tasks. If a job fails, we only have to rerun the failed task and its downstream tasks, instead of executing the complete workflow all over again.

Airflow is composed of three main components:

  1. Airflow Scheduler — the “heart” of Airflow, which parses the DAGs, checks the schedule intervals, and passes the tasks over to the workers.
  2. Airflow Worker — picks up the tasks and actually performs the work.
  3. Airflow Webserver — provides the main user interface to visualize and monitor the DAGs and their results.
A high-level overview of Airflow components

Step-By-Step Installation

Now that we have briefly introduced Apache Airflow, it's time to get started.

Step 0: Prerequisites

Since we will use docker-compose to get Airflow up and running, we have to install Docker first. Simply head over to the official Docker site and download the appropriate installation file for your OS.

Step 1: Create a new folder

We start nice and slow by simply creating a new folder for Airflow.

Just navigate via your preferred terminal to a directory, create a new folder, and change into it by running:

mkdir airflow
cd airflow

Step 2: Create a docker-compose file

Next, we need to get our hands on a docker-compose file that specifies the required services or docker containers.

You can download the docker-compose.yml file from this repo or simply create a new file named docker-compose.yml and copy the content below.

---
version: '3.4'

x-common:
  &common
  image: apache/airflow:2.3.0
  user: "${AIRFLOW_UID}:0"
  env_file: 
    - .env
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - /var/run/docker.sock:/var/run/docker.sock

x-depends-on:
  &depends-on
  depends_on:
    postgres:
      condition: service_healthy
    airflow-init:
      condition: service_completed_successfully

services:
  postgres:
    image: postgres:13
    container_name: postgres
    ports:
      - "5434:5432"
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    env_file:
      - .env

  scheduler:
    <<: [*common, *depends-on]
    container_name: airflow-scheduler
    command: scheduler
    restart: on-failure
    ports:
      - "8793:8793"

  webserver:
    <<: [*common, *depends-on]
    container_name: airflow-webserver
    restart: always
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 30s
      retries: 5
  
  airflow-init:
    <<: *common
    container_name: airflow-init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version

The above docker-compose file specifies the services we need to get Airflow up and running: most importantly the scheduler, the webserver, the metadata database (PostgreSQL), and the airflow-init job that initializes the database.

At the top of the file, we define reusable YAML anchors (x-common and x-depends-on) that hold the settings shared by every Airflow container or service.

Step 3: Environment variables

We successfully created a docker-compose file with the mandatory services inside. However, to complete the installation process and configure Airflow properly, we need to provide some environment variables.

Still inside your Airflow folder, create a .env file with the following content:

# Meta-Database
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_DB=airflow

# Airflow Core
AIRFLOW__CORE__FERNET_KEY=UKMzEm3yIuFYEq1y3-2FxPNWSVwRASpahmQ9kQfEr8E=
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=True
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW_UID=0

# Backend DB
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False

# Airflow Init
_AIRFLOW_DB_UPGRADE=True
_AIRFLOW_WWW_USER_CREATE=True
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow

The above variables set the database credentials, the Airflow user, and some further configurations. Most importantly, they define the kind of executor Airflow will utilize; in our case, we make use of the LocalExecutor.

Note: More information on the different kinds of executors can be found here.

Step 4: Run docker-compose

And this is already it!

Just head over to the terminal and spin up all the necessary containers by running

docker compose up -d

After a short period of time, we can check the results in the Airflow Web UI by visiting http://localhost:8080. Once we sign in with our credentials (username airflow, password airflow), we gain access to the user interface.

Airflow 2 Web UI

A Quick Test

With a working Airflow environment, we can now create a simple DAG for testing purposes. First of all, make sure to run pip install apache-airflow so the required Airflow modules are available locally (e.g., for your editor); the containers themselves already ship with Airflow.

Now, inside your Airflow folder, navigate to the dags folder and create a new file called sample_dag.py.

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator

from datetime import datetime

with DAG(
    dag_id='first_sample_dag',
    start_date=datetime(2022, 5, 28),
    schedule_interval=None
) as dag:

    start_task = EmptyOperator(
        task_id='start'
    )

    print_hello_world = BashOperator(
        task_id='print_hello_world',
        bash_command='echo "HelloWorld!"'
    )

    end_task = EmptyOperator(
        task_id='end'
    )

    start_task >> print_hello_world >> end_task

We define a new DAG with a few pretty simple tasks.

The EmptyOperator serves no real purpose other than to appear as a placeholder task in the Web UI. By utilizing the BashOperator, we echo a simple “HelloWorld!”. This allows us to visually confirm that our Airflow setup is running properly.

Save the file and head over to the Web UI. We can now start the DAG by manually triggering it.

Manually triggering a DAG

Note: It may take a while before your DAG appears in the UI. We can speed things up by running the following command in our terminal:

docker exec -it --user airflow airflow-scheduler bash -c "airflow dags list"

Running the DAG shouldn’t take any longer than a couple of seconds.

Once finished, we can navigate to XComs and inspect the output.

Navigating to Airflow XComs
Inspecting the output
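If you prefer to check the result in code rather than in the UI, the BashOperator pushes the last line of its stdout to XCom by default. Below is a minimal sketch of an optional extra task that pulls and prints that value; the task id print_xcom and the helper show_bash_output are hypothetical names, and the operator would be placed inside the same with DAG(...) block of sample_dag.py.

from airflow.operators.python import PythonOperator


def show_bash_output(ti):
    # The BashOperator pushes the last line of stdout to XCom by default,
    # so we can pull it here instead of looking it up in the UI.
    output = ti.xcom_pull(task_ids='print_hello_world')
    print(f"Pulled from XCom: {output}")


# Hypothetical extra task; add it inside the `with DAG(...)` block of sample_dag.py
# and wire it up with: print_hello_world >> print_xcom
print_xcom = PythonOperator(
    task_id='print_xcom',
    python_callable=show_bash_output,
)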

And this is it!

We successfully installed Airflow with docker-compose and gave it a quick test ride.

Note: We can stop the running containers by simply executing docker compose down.

How to Design Better DAGs in Apache Airflow

In the last section, we learned how to quickly spin up a development environment for Apache Airflow. However, we have yet to learn how to design an efficient workflow. Simply having a great tool at our fingertips won't cut it alone, unfortunately.

Although Apache Airflow does a pretty good job of doing most of the heavy lifting for us, we still need to ensure certain key properties for each Airflow task in order to obtain proper and consistent results. Luckily, a lot of best practices exist. In this section, we cover two of the most important concepts that apply universally to all workflows: atomicity and idempotency.

All or nothing: Atomicity

Often used in the context of database systems, atomicity is one of the ACID properties. An atomic operation is an indivisible, irreducible series of steps that is either performed entirely or not performed at all. In terms of Apache Airflow, this means a task should be defined in a way that allows it either to succeed with a proper result or to fail completely without affecting the state of the system.

Let’s imagine, that we have to extract data from a CSV file, apply some transformations to it, and write the result to a database. Simple enough, right?

A bad, non-atomic approach would be the following: we extract the data line by line, apply the transformation right away, and immediately upload the result to the database, all within the same task.

A non-atomic approach

Now, if some lines are corrupt and the task fails halfway through, we're left with only a fragment of the desired results: some lines are processed and already inserted, while others are missing entirely. Debugging and rerunning this task while avoiding duplication would be a nightmare. An improved, atomic workflow might be defined like this:

A better approach with atomic tasks

So a general rule of thumb is to split operations into different tasks: one operation per task, in the spirit of the single-responsibility principle.
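To make this concrete, here is a minimal sketch of how the CSV-to-database pipeline could be split into atomic tasks. The dag_id and the helpers extract_csv, transform_records, and load_to_db are hypothetical placeholders, so treat this as an illustration under those assumptions rather than a ready-made implementation.

from airflow import DAG
from airflow.operators.python import PythonOperator

from datetime import datetime


def extract_csv():
    # Hypothetical: read the raw CSV and stage it, e.g. to an intermediate file.
    ...


def transform_records():
    # Hypothetical: apply all transformations to the staged data in one step.
    ...


def load_to_db():
    # Hypothetical: write the fully transformed result to the database in one go.
    ...


with DAG(
    dag_id='atomic_csv_to_db',
    start_date=datetime(2022, 5, 28),
    schedule_interval=None
) as dag:

    extract = PythonOperator(task_id='extract', python_callable=extract_csv)
    transform = PythonOperator(task_id='transform', python_callable=transform_records)
    load = PythonOperator(task_id='load', python_callable=load_to_db)

    extract >> transform >> load

If the load task fails in this setup, the extract and transform results remain untouched, and we only need to rerun load and anything downstream of it.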

Unfortunately, this simple rule cannot be applied every time. Some operations are so tightly coupled that it's best to keep them in a single coherent unit of work, for example, authenticating to an API before executing the request.

Luckily for us, most Airflow operators are designed in an atomic fashion and can be used straight off the shelf. With the more flexible operators like the Bash or Python operator, however, we have to be more cautious and mindful when designing our workflows. Atomic Airflow tasks allow us to recover from failure and rerun only the failed and downstream tasks. Atomicity also yields more maintainable and transparent workflows, free of hidden dependencies and side effects.

Start, Stop, Rewind: Idempotency

The concept of idempotency goes hand in hand with the idea of atomicity and describes a property of certain operations in mathematics and computer science: an idempotent operation can be applied multiple times without changing the result beyond the initial application. Think of pressing the “on” button on a control panel: pressing it multiple times has the same effect as pressing it once. So what does this mean in the context of Apache Airflow?

Calling the same task multiple times with the same input should have no additional effect. In other words, if rerunning a task without changing the input yields the same output, it can be considered idempotent. Idempotency decreases recovery time after a failure and reduces the risk of data loss.

Now, let’s imagine our job is to fetch data from a database for a specific day and write the results to a CSV file. Rerunning this task for the same day should overwrite the existing file and produce the same output every time it is executed.

An idempotent task producing the same output every time
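As a rough sketch of such a task, assuming a hypothetical helper fetch_records_for_day and an output directory of /opt/airflow/data, the export could use Airflow's templated ds value (the logical date as YYYY-MM-DD) and simply overwrite the file on every run.

from airflow import DAG
from airflow.operators.python import PythonOperator

from datetime import datetime


def fetch_records_for_day(day):
    # Hypothetical placeholder: replace this with your actual database query.
    return []


def export_day_to_csv(ds, **_):
    # 'ds' is the logical date (YYYY-MM-DD) that Airflow passes into the callable.
    records = fetch_records_for_day(ds)

    # Opening the file in 'w' mode overwrites any previous export for that day,
    # so rerunning the task for the same date yields the same file every time.
    with open(f"/opt/airflow/data/export_{ds}.csv", "w") as file:
        for record in records:
            file.write(",".join(str(value) for value in record) + "\n")


with DAG(
    dag_id='idempotent_export',
    start_date=datetime(2022, 5, 28),
    schedule_interval='@daily'
) as dag:

    export_task = PythonOperator(
        task_id='export_day_to_csv',
        python_callable=export_day_to_csv,
    )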

Suppose instead that we design our task in a different way: with each rerun, we simply append the records to the existing file.

Now, we violate the concept of idempotency. Every rerun of the task produces a different result with duplicate records.

Non-idempotent task producing duplicate results

In general, tasks that write data should check for existing records, overwrite them, or use UPSERT operations to conform to the rules of idempotency. For more general applications, however, we have to think carefully about all possible side effects.
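As an illustration of the UPSERT approach with PostgreSQL, here is a small sketch using psycopg2. The table daily_metrics, its unique day column, and the connection settings are hypothetical assumptions, so adapt them to your own schema.

import psycopg2

# Hypothetical connection and table; adjust the credentials and schema to your setup.
# The table is assumed to have a unique constraint on the 'day' column.
conn = psycopg2.connect("postgresql://airflow:airflow@localhost:5434/airflow")

upsert_sql = """
    INSERT INTO daily_metrics (day, value)
    VALUES (%s, %s)
    ON CONFLICT (day) DO UPDATE SET value = EXCLUDED.value;
"""

with conn, conn.cursor() as cur:
    # Rerunning this statement with the same input leaves the table in the same
    # state as after the first run: the existing row for that day is simply updated.
    cur.execute(upsert_sql, ("2022-05-28", 42))

conn.close()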

Conclusion

Airflow is a batch-oriented framework that allows us to create complex data pipelines in Python. In this article, we created a simple and easy-to-use environment to quickly iterate and develop new workflows in Apache Airflow. By leveraging docker-compose we can get straight to work and code new workflows. However, such an environment should only be used for development purposes and is not suitable for any production environment that requires a more sophisticated and distributed setup of Apache Airflow.

Moreover, we covered two of the most important principles when designing DAGs in Apache Airflow: atomicity and idempotency. Committing those concepts to memory enables us to create better workflows that are recoverable, rerunnable, fault-tolerant, consistent, maintainable, transparent, and easier to understand. However, there are a lot more best practices to adhere to and consider when coding and creating the next workflow.

Source:

https://towardsdatascience.com/setting-up-apache-airflow-with-docker-compose-in-5-minutes-56a1110f4122
