Self-Hosted Airflow: A Revolution in Data Collection

Airflow and Self-Hosting: A New Era for Financial Data Collection
Self-hosting Airflow at home opens new perspectives for tech enthusiasts looking to master machine learning operations (MLOps). The primary goal is to understand how to deploy artificial intelligence (AI) workflows in a real-world environment. MLOps involves transferring AI models from the research phase to production while automating processes and effectively managing errors.
In my previous articles, I described the setup of a PostgreSQL server and an Airflow server. Together, they provide the storage and orchestration needed to collect the datasets used to train AI models. The next step is to populate the PostgreSQL databases with relevant data. Airflow plays a crucial role here by orchestrating data pipelines, ensuring that the most up-to-date information is loaded into our database. This step is fundamental to the machine learning process, as it supplies the training data for the models.
One of the motivations behind creating my home lab is to demonstrate that the entire machine learning process can be self-hosted using virtual machines and containers. Given my recent personal investments, I chose to work with financial data. This data is valuable for analyzing trends, correlating prices, and even attempting forecasts, making it extremely useful in various scenarios.
Setting Up Airflow
Running Your Airflow Server
While running Airflow, I picked up several tips that make the server easier to operate. During the initial setup of my Airflow server, I used the following commands to launch the various components:
nohup airflow scheduler > scheduler.log 2>&1 &
nohup airflow dag-processor > dag-processor.log 2>&1 &
nohup airflow triggerer > triggerer.log 2>&1 &
nohup airflow api-server --port 8080 > api-server.log 2>&1 &
The nohup command allows a process to remain active even after the terminal is closed. However, I encountered issues with Airflow components crashing, requiring manual restarts. Additionally, starting all the components required four separate commands, which was tedious.
To simplify this process, I created a script to start or restart Airflow more easily.
nano airflow_restart.sh
In the airflow_restart.sh file, I inserted the following code:
#!/bin/bash
pkill -f "airflow" --ignore-ancestors
sleep 2
echo "Starting the scheduler..."
nohup airflow scheduler > scheduler.log 2>&1 &
echo "Starting the DAG processor..."
nohup airflow dag-processor > dag-processor.log 2>&1 &
echo "Starting the triggerer..."
nohup airflow triggerer > triggerer.log 2>&1 &
echo "Starting the API server..."
nohup airflow api-server --port 8080 > api-server.log 2>&1 &
echo "Airflow restarted"
This script kills all processes related to "airflow" and restarts the four components. Thus, instead of executing four separate commands, it is now sufficient to run this script.
To address the frequent crashes, I configured each Airflow component as a systemd service, that is, a system daemon. A daemon is a background process that runs independently of the terminal; systemd starts it automatically at boot and restarts it on failure.
To do this, I created a service file:
sudo nano /etc/systemd/system/airflow-scheduler.service
Then, I inserted the following text:
[Unit]
Description=Airflow Scheduler
After=network.target postgresql.service
Wants=postgresql.service
[Service]
User=<USER>
Group=<GROUP>
Environment=PATH=<AIRFLOW_PATH>:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin
Environment=AIRFLOW_HOME=<AIRFLOW_FOLDER>
ExecStart=<AIRFLOW_PATH> scheduler
Restart=on-failure
RestartSec=5s
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
I replaced <USER>, <GROUP>, <AIRFLOW_PATH>, and <AIRFLOW_FOLDER> with the specific values for my configuration. I repeated this process for the other Airflow components: dag-processor, triggerer, api-server.
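Since the four service files differ only in their description and in the Airflow subcommand, they could also be generated from a single template. The following is a minimal sketch under stated assumptions, not the method used above: the names UNIT_TEMPLATE, COMPONENTS, and render_units are hypothetical, the placeholders are left untouched for manual replacement, and the --port 8080 option on the API server is carried over from the earlier nohup command.

```python
from string import Template

# Same layout as the scheduler service file above; only $title and $command vary.
# The <...> placeholders are intentionally left as-is and must still be replaced.
UNIT_TEMPLATE = Template("""\
[Unit]
Description=Airflow $title
After=network.target postgresql.service
Wants=postgresql.service

[Service]
User=<USER>
Group=<GROUP>
Environment=PATH=<AIRFLOW_PATH>:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin
Environment=AIRFLOW_HOME=<AIRFLOW_FOLDER>
ExecStart=<AIRFLOW_PATH> $command
Restart=on-failure
RestartSec=5s
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
""")

# Hypothetical mapping: service name suffix -> (description, airflow subcommand).
COMPONENTS = {
    "scheduler": ("Scheduler", "scheduler"),
    "dag-processor": ("DAG Processor", "dag-processor"),
    "triggerer": ("Triggerer", "triggerer"),
    "api-server": ("API Server", "api-server --port 8080"),
}


def render_units():
    """Return a dict mapping each service file name to its rendered content."""
    return {
        f"airflow-{name}.service": UNIT_TEMPLATE.substitute(title=title, command=command)
        for name, (title, command) in COMPONENTS.items()
    }
```

The function only returns text and writes nothing to disk, so the output can be reviewed before each file is copied into /etc/systemd/system/ with sudo.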
After saving the files, I executed the following command so that systemd reloads its configuration and picks up the new service files:
sudo systemctl daemon-reload
Then, I enabled and started all the services:
# Enable services at startup
sudo systemctl enable airflow-scheduler
sudo systemctl enable airflow-dag-processor
sudo systemctl enable airflow-triggerer
sudo systemctl enable airflow-api-server
# Start the services
sudo systemctl start airflow-scheduler
sudo systemctl start airflow-dag-processor
sudo systemctl start airflow-triggerer
sudo systemctl start airflow-api-server
Now, Airflow runs more stably on my server. You can check whether a component is running with systemctl status (for example, systemctl status airflow-scheduler) and read its logs with journalctl (for example, journalctl -u airflow-scheduler).
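Checking four services by hand can be wrapped in a small helper. The sketch below assumes the four service names used above and relies on systemctl is-active, which prints the state of a unit; the function names and the injectable run parameter are hypothetical conveniences, not part of Airflow or systemd.

```python
import subprocess

# Service names as enabled above.
SERVICES = [
    "airflow-scheduler",
    "airflow-dag-processor",
    "airflow-triggerer",
    "airflow-api-server",
]


def service_state(name, run=subprocess.run):
    """Return the state printed by `systemctl is-active`, e.g. 'active' or 'failed'."""
    result = run(["systemctl", "is-active", name], capture_output=True, text=True)
    return result.stdout.strip() or "unknown"


def inactive_services(names=SERVICES, run=subprocess.run):
    """Return the services whose reported state is anything other than 'active'."""
    return [name for name in names if service_state(name, run) != "active"]
```

Calling inactive_services() on the Airflow server returns the list of components that need attention; each of them can then be inspected with journalctl -u followed by the service name.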
Adding Your PostgreSQL Connection
A crucial step is configuring the PostgreSQL connection on the Airflow server so that pipelines can communicate with the PostgreSQL database. Airflow stores connections centrally: you define the connection parameters once and then reuse them across pipelines.
To configure this, I modified the airflow.cfg file:
nano airflow.cfg
In the [core] section, I searched for the test_connection entry and set it to Enabled. Then, I restarted Airflow, either via the script or by restarting the systemd services. This allows testing connections through the Airflow user interface.
In the Airflow user interface, I added the PostgreSQL connection by navigating to Admin > Connections. I selected Add Connection at the top right. Under Connection Type, I chose Postgres. If the Postgres option does not appear, it may be necessary to run pip install apache-airflow-providers-postgres on the Airflow server.
I then filled in the connection details: PostgreSQL Host (IP address), Login, Password, Port, and Database. Once the connection was added, I tested its validity by clicking on the graph icon, which turned into a wi-fi icon. A green message appeared, confirming the success of the connection test.
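As an alternative to the user interface, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the connection ID in upper case, with the value given as a URI. The sketch below shows how the fields above map onto such a URI; the function name postgres_conn_uri and the example values are hypothetical, and this is not the approach used in this article.

```python
from urllib.parse import quote


def postgres_conn_uri(host, login, password, port=5432, database=""):
    """Build a connection URI from the fields of the connection form.

    Login, password, and database are percent-encoded so that characters
    such as '@' or '/' in a password do not break the URI.
    """
    return (
        f"postgres://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )
```

For a connection ID of my_postgres (an assumed name), the result would be exported as AIRFLOW_CONN_MY_POSTGRES in the environment of the Airflow services.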
Creating the Airflow Pipeline Code
With Airflow properly configured, it is time to develop the code for the Airflow pipeline. Airflow operates through Directed Acyclic Graphs (DAGs), which structure workflows into multiple data processing tasks.
In a DAG, tasks can be executed in a specific order and with dependencies, allowing certain tasks to run after the completion of others or in parallel.
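To make the ordering concrete, here is a plain-Python illustration that uses the standard library's graphlib module rather than Airflow itself. The task names are hypothetical; the sketch only shows how dependencies determine which tasks may run in parallel and which must wait.

```python
from graphlib import TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on.
DEPENDENCIES = {
    "fetch_prices": set(),
    "fetch_dividends": set(),
    "clean_data": {"fetch_prices", "fetch_dividends"},
    "load_to_postgres": {"clean_data"},
}


def execution_waves(dependencies):
    """Group tasks into waves; tasks in the same wave have no unmet dependencies."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are all done
        waves.append(ready)
        sorter.done(*ready)  # mark them complete to unlock their successors
    return waves
```

With these dependencies, the two fetch tasks form the first wave and can run side by side, while clean_data and load_to_postgres each wait for the wave before them. The acyclic requirement is what guarantees that such an ordering exists.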
To obtain financial data, I used yfinance, a Python package that leverages Yahoo! Finance APIs to retrieve market data. This package is well suited to extracting ticker price data, which the pipeline then writes into our PostgreSQL database.
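The extract-and-write step could look roughly like the sketch below. It makes several assumptions that the article does not state: the table name ticker_prices, its columns, and a unique constraint on (ticker, price_date) are hypothetical, and conn is assumed to be a psycopg2 connection to the PostgreSQL server. The yfinance import is done lazily so that the rest of the module can be loaded without it.

```python
# Hypothetical table and columns; ON CONFLICT assumes a unique
# constraint on (ticker, price_date) so that reruns update existing rows.
UPSERT_SQL = """
INSERT INTO ticker_prices (ticker, price_date, close, volume)
VALUES (%s, %s, %s, %s)
ON CONFLICT (ticker, price_date) DO UPDATE
SET close = EXCLUDED.close, volume = EXCLUDED.volume
"""


def to_rows(ticker, records):
    """Convert (date, close, volume) records into tuples matching UPSERT_SQL."""
    return [(ticker, day, float(close), int(volume)) for day, close, volume in records]


def fetch_daily_rows(ticker, period="5d"):
    """Download recent daily prices for one ticker and shape them as rows."""
    import yfinance as yf  # lazy import: only needed when actually fetching

    history = yf.Ticker(ticker).history(period=period)
    records = (
        (index.date(), row["Close"], row["Volume"])
        for index, row in history.iterrows()
    )
    return to_rows(ticker, records)


def write_rows(conn, rows):
    """Write rows through a DB-API connection (assumed here to be psycopg2)."""
    with conn.cursor() as cursor:
        cursor.executemany(UPSERT_SQL, rows)
    conn.commit()
```

Keeping the fetch, the row shaping, and the write in separate functions means each one can later become its own task in a DAG, with the upsert making reruns of the same day safe.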