pip install 'apache-airflow[celery]'
Some caveats:
Make sure to use a database backed result backend
Make sure to set a visibility timeout in [celery_broker_transport_options] that exceeds the ETA of your longest running task (see the configuration sketch after this list)
Make sure to set umask in [worker_umask] to set permissions for files newly created by workers
Tasks can consume resources. Make sure your worker has enough resources to run worker_concurrency tasks
Queue names are limited to 256 characters, but each broker backend might have its own restrictions
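A minimal sketch of the relevant airflow.cfg entries is shown below, assuming a Redis broker and a PostgreSQL result backend; all URLs and values are illustrative, and the section holding worker_umask may differ between Airflow versions, so check the configuration reference for your release:

    [core]
    executor = CeleryExecutor

    [celery]
    # Example broker and database-backed result backend; adjust to your environment.
    broker_url = redis://redis:6379/0
    result_backend = db+postgresql://airflow:airflow@postgres/airflow
    # Number of tasks a single worker may run at once; size your machines accordingly.
    worker_concurrency = 16
    # Assumed placement of the umask option for files created by workers.
    worker_umask = 0o077

    [celery_broker_transport_options]
    # Must exceed the ETA of your longest running task (value in seconds).
    visibility_timeout = 21600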
See Modules Management for details on how Python and Airflow manage modules.
Architecture
digraph A{
    rankdir="TB"
    node[shape="rectangle", style="rounded"]

    subgraph cluster {
        label="Cluster";
        {rank = same; dag; database}
        {rank = same; workers; scheduler; web}

        workers[label="Workers"]
        scheduler[label="Scheduler"]
        web[label="Web server"]
        database[label="Database"]
        dag[label="DAG files"]

        subgraph cluster_queue {
            label="Celery";
            {rank = same; queue_broker; queue_result_backend}

            queue_broker[label="Queue broker"]
            queue_result_backend[label="Result backend"]
        }

        web->workers[label="1"]
        web->dag[label="2"]
        web->database[label="3"]

        workers->dag[label="4"]
        workers->database[label="5"]
        workers->queue_result_backend[label="6"]
        workers->queue_broker[label="7"]

        scheduler->dag[label="8"]
        scheduler->database[label="9"]
        scheduler->queue_result_backend[label="10"]
        scheduler->queue_broker[label="11"]
    }
}
Airflow consists of several components:
Workers - Execute the assigned tasks
Scheduler - Responsible for adding the necessary tasks to the queue
Web server - HTTP Server provides access to DAG/task status information
Database - Contains information about the status of tasks, DAGs, Variables, connections, etc.
Celery - Queue mechanism
Please note that the Celery queue consists of two components:
Broker - Stores commands for execution
Result backend - Stores status of completed commands
The components communicate with each other in many places:
[1] Web server –> Workers - Fetches task execution logs
[2] Web server –> DAG files - Reveal the DAG structure
[3] Web server –> Database - Fetch the status of the tasks
[4] Workers –> DAG files - Reveal the DAG structure and execute the tasks
[5] Workers –> Database - Gets and stores information about connection configuration, variables and XCOM.
[6] Workers –> Celery’s result backend - Saves the status of tasks
[7] Workers –> Celery’s broker - Stores commands for execution
[8] Scheduler –> DAG files - Reveal the DAG structure and schedule the tasks
[9] Scheduler –> Database - Store a DAG run and related tasks
[10] Scheduler –> Celery’s result backend - Gets information about the status of completed tasks
[11] Scheduler –> Celery’s broker - Put the commands to be executed
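As an illustrative sketch, each of these Airflow components is typically started as its own process, with workers placed on the machines that should execute tasks:

    airflow webserver       # Web server
    airflow scheduler       # Scheduler
    airflow celery worker   # Worker; run on one or more worker machines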
Task execution process
Sequence diagram - task execution process
Initially, two processes are running:
SchedulerProcess - processes the tasks and runs them using CeleryExecutor
WorkerProcess - observes the queue waiting for new tasks to appear
WorkerChildProcess - waits for new tasks
Two databases are also available:
QueueBroker
ResultBackend
During this process, two processes are created:
LocalTaskJobProcess - Its logic is described by LocalTaskJob. It monitors RawTaskProcess. New processes are started using TaskRunner.
RawTaskProcess - It is a process running the user code, e.g. execute().
[1] SchedulerProcess processes the tasks and when it finds a task that needs to be done, sends it to the QueueBroker.
[2] SchedulerProcess also begins to periodically query ResultBackend for the status of the task.
[3] QueueBroker, when it becomes aware of the task, sends information about it to one WorkerProcess.
[4] WorkerProcess assigns a single task to one WorkerChildProcess.
[5] WorkerChildProcess performs the proper task handling functions - execute_command(). It creates a new process - LocalTaskJobProcess.
[6] LocalTaskJobProcess logic is described by the LocalTaskJob class. It starts a new process using TaskRunner.
[7][8] The RawTaskProcess and LocalTaskJobProcess processes are stopped when they have finished their work.
[10][12] WorkerChildProcess notifies the main process - WorkerProcess about the end of the task and the availability of subsequent tasks.
[11] WorkerProcess saves status information in ResultBackend.
[13] When SchedulerProcess asks ResultBackend again about the status, it will get information about the status of the task.
Queues
When using the CeleryExecutor, the Celery queues that tasks are sent to can be specified. queue is an attribute of BaseOperator, so any task can be assigned to any queue.
The default queue for the environment is defined in the airflow.cfg's operators -> default_queue. This defines the queue that tasks get assigned to when not specified, as well as which queue Airflow workers listen to when started.
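For example, the default queue might be configured in airflow.cfg along these lines (the queue name is illustrative):

    [operators]
    default_queue = default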
Workers can listen to one or multiple queues of tasks. When a worker is started (using command airflow celery worker), a set of comma-delimited queue names (with no whitespace) can be given (e.g. airflow celery worker -q spark,quark). This worker will then only pick up tasks wired to the specified queue(s).
This can be useful if you need specialized workers, either from a
resource perspective (for say very lightweight tasks where one worker
could take thousands of tasks without a problem), or from an environment
perspective (you want a worker running from within the Spark cluster
itself because it needs a very specific environment and security rights).
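As a minimal sketch of this routing, assuming a recent Airflow 2 release, the DAG, task, and queue names below are made up for illustration; the queue argument on the operator and the -q flag on the worker are the pieces described above:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="example_queue_routing",  # hypothetical DAG, for illustration only
        start_date=datetime(2021, 1, 1),
        schedule=None,
    ) as dag:
        # Only workers listening to the "spark" queue will pick up this task.
        spark_job = BashOperator(
            task_id="spark_job",
            bash_command="spark-submit my_job.py",
            queue="spark",
        )

A worker dedicated to that queue would then be started with airflow celery worker -q spark.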