[AAF] Hướng dẫn tạo DAG cơ bản

Tác giả: 30/05/2025

Ở bài viết này chúng ta sẽ đi sâu về DAG, hướng dẫn tạo cũng như chạy một DAGs cơ bản. Ở bài viết trước, mình đã giới thiệu và hướng dẫn bạn cài đặt AAF vậy nên với bài viết này, chúng ta cùng đi sâu để tìm hiểu nhé!

Airflow hoạt động như thế nào?

  • Không giống như các công cụ Dữ liệu lớn như Apache Kafka, Apache Storm, Apache Spark,hoặc Flink, Apache Airflow không phải là giải pháp truyền dữ liệu. Nó chủ yếu là một trình quản lý quy trình làm việc

Tổng quan về các thành phần cơ bản của Apache Airflow :

  • Scheduler: giám sát tất cả các DAG và các tác vụ được liên kết của chúng. Đối với 1 tác vụ, khi các phụ thuộc được đáp ứng, Scheduler sẽ khởi tạo tác vụ đó. Nó kiểm tra các tác vụ đang hoạt động để bắt đầu theo định kỳ
  • Executor: xử lý việc chạy các tác vụ này bằng cách đưa chúng cho worker để chạy
  • Web server: giao diện người dùng của Airflow, hiện thị trạng thái của nhiệm vụ và cho phép người dùng tương tác với cơ sở dữ liệu cũng như đọc tệp nhật kỹ từ kho lưu trữ từ xa như Google Cloud Storage, S3, …
  • DAG Directory: một thư mục chứa các file DAG của các quy trình xử lý dữ liệu (data pipelines) trong Airflow.
  • Metabase Database: được sử dụng bởi Scheduler, Executor và Web Server để lưu trữ thông tin quan trọng của từng DAG, ví dụ như các phiên bản, số liệu thống kê mỗi lần chạy, khoảng thời gian lên lịch, …

DAGs

1 DAG (Directed Acyclic Graph) sẽ kết nối các tasks, xây dựng liên kết, tự động chạy task, … Dưới đây là 1 ví dụ về DAG:

Hình bên trên bao gồm 4 task: A, B, C, D có mối liên kết với nhau và chạy theo thứ tự mà người dùng quy định: a -> b, c -> d. Nếu task a oẳng thì sẽ dẫn đến task b, c, d oẳng theo. Bạn cũng có thể tùy chỉnh thời gian để các tasks này chạy vào thời điểm mong muốn. Bạn có thể yên tâm một điều là DAG sẽ không quan tâm đến code của bạn, mục tiêu của nó chỉ là workflow mà thôi.

Task

  • Task là một đơn vị cơ bản để thực hiện một công việc nhỏ trong quy trình xử lý dữ liệu. Mỗi Task là một bước trong quy trình và có thể được lập lịch thực hiện tùy theo các điều kiện cụ thể.
  • Một Task trong Airflow có các thuộc tính và phương thức sau:
    • task_id: định danh duy nhất của task trong DAG.
    • owner: người sở hữu task.
    • depends_on_past: xác định liệu task hiện tại có phụ thuộc vào kết quả của task trước đó hay không.
    • retries: số lần thử lại nếu task thất bại.
    • retry_delay: khoảng thời gian giữa các lần thử lại.
    • start_date: thời điểm bắt đầu thực hiện task.
    • end_date: thời điểm kết thúc thực hiện task.
    • execution_timeout: thời gian tối đa cho phép để thực hiện task.
    • on_failure_callback: hàm được gọi khi task thất bại.
    • on_success_callback: hàm được gọi khi task thành công.

Operator

  • Mỗi operator đại diện cho một công việc cụ thể trong quy trình, ví dụ như đọc dữ liệu từ một nguồn dữ liệu, xử lý dữ liệu, hoặc ghi dữ liệu vào một nguồn dữ liệu khác.
  • Các operator trong Airflow được phân loại thành các loại chính sau
    • BashOperator: Chạy các lệnh Bash hoặc script Shell.
    • PythonOperator: Thực thi các hàm Python.
    • EmailOperator: Gửi email thông qua SMTP.
    • DummyOperator: Được sử dụng để tạo các kết nối giữa các task.
    • PythonVirtualenvOperator: Thực thi các hàm Python trong một môi trường ảo.
    • MySqlOperator: Thực hiện các lệnh SQL trên cơ sở dữ liệu MySQL.
    • PostgresOperator: Thực hiện các lệnh SQL trên cơ sở dữ liệu PostgreSQL.
    • S3FileTransformOperator: Thực hiện các chức năng xử lý file trên Amazon S3.
    • SparkSqlOperator: Thực hiện các truy vấn Spark SQL.
    • HdfsSensor: Kiểm tra sự tồn tại của một tệp trên Hadoop Distributed File System (HDFS).

Ví dụ, giả sử bạn có một công việc hàng ngày cần đọc dữ liệu từ một tệp CSV, xử lý dữ liệu và lưu kết quả vào một cơ sở dữ liệu PostgreSQL. Trong Airflow, bạn có thể sử dụng các operator như sau:

  • FileSensor: Kiểm tra sự tồn tại của tệp CSV trên hệ thống tệp.
  • BashOperator: Sử dụng lệnh Bash để di chuyển tệp CSV đến thư mục xử lý.
  • PythonOperator: Thực hiện các xử lý dữ liệu, ví dụ như đọc tệp CSV và chuyển đổi dữ liệu thành định dạng phù hợp để lưu vào cơ sở dữ liệu.
  • PostgresOperator: Thực hiện các lệnh SQL để lưu kết quả xử lý vào PostgreSQL.
  • EmailOperator: Gửi email thông báo cho người dùng khi quy trình xử lý dữ liệu hoàn thành.

Với các operator này, bạn có thể tạo một DAG trong Airflow để tự động hóa quy trình xử lý dữ liệu hàng ngày. DAG sẽ kiểm tra sự tồn tại của tệp CSV, di chuyển nó đến thư mục xử lý, thực hiện các xử lý dữ liệu và lưu kết quả vào cơ sở dữ liệu, sau đó gửi email thông báo cho người dùng khi quy trình hoàn thành.

Sensor

  • Sensor là một loại Operator được sử dụng để giám sát các sự kiện và điều kiện, và thực hiện các hành động tương ứng.
  • Sensor thường được sử dụng để đợi cho đến khi một điều kiện nào đó xảy ra trước khi tiếp tục thực hiện quy trình.
  • Các loại Sensor trong Airflow bao gồm:
    • FileSensor: Kiểm tra sự tồn tại của một tệp trên hệ thống tệp.
    • TimeSensor: Đợi cho đến khi một khoảng thời gian cụ thể đã trôi qua.
    • HttpSensor: Kiểm tra sự phản hồi của một URL cụ thể.
    • HdfsSensor: Kiểm tra sự tồn tại của một tệp trên Hadoop Distributed File System (HDFS).
    • SqlSensor: Kiểm tra sự tồn tại của một bảng hoặc một số dòng dữ liệu trong cơ sở dữ liệu.
    • S3KeySensor: Kiểm tra sự tồn tại của một đối tượng trên Amazon S3.
    • ExternalTaskSensor: Kiểm tra trạng thái của một task khác trong DAG.

Ví dụ, nếu bạn muốn tải dữ liệu từ một API bên ngoài vào cơ sở dữ liệu của mình hàng giờ, bạn có thể sử dụng HttpSensor để kiểm tra sự phản hồi của API trước khi tiếp tục thực hiện quy trình. Nếu API không phản hồi, Sensor sẽ giữ cho task đang chạy và thử lại sau một khoảng thời gian cụ thể, giúp đảm bảo rằng không có dữ liệu bị mất hoặc xử lý sai.

Tạo một DAG cơ bản

Chúng ta sẽ cùng tạo 1 file my_dag.py trên VPS có nội dung như sau :

<span class="token keyword">from</span> datetime <span class="token keyword">import</span> datetime<span class="token punctuation">,</span> timedelta
<span class="token keyword">from</span> airflow <span class="token keyword">import</span> DAG
<span class="token keyword">from</span> airflow<span class="token punctuation">.</span>operators<span class="token punctuation">.</span>bash <span class="token keyword">import</span> BashOperator
<span class="token keyword">from</span> airflow<span class="token punctuation">.</span>operators<span class="token punctuation">.</span>python <span class="token keyword">import</span> PythonOperator
<span class="token keyword">from</span> datetime <span class="token keyword">import</span> datetime
<span class="token keyword">from</span> pytz <span class="token keyword">import</span> timezone
<span class="token keyword">import</span> os 


local_tz <span class="token operator">=</span> timezone<span class="token punctuation">(</span><span class="token string">'Asia/Ho_Chi_Minh'</span><span class="token punctuation">)</span>


<span class="token comment"># Định nghĩa các hàm xử lý dữ liệu</span>
<span class="token keyword">def</span> <span class="token function">process_data</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">:</span>
    <span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">'process data'</span><span class="token punctuation">)</span>
    
<span class="token keyword">def</span> <span class="token function">save_data</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">:</span>
    <span class="token keyword">print</span><span class="token punctuation">(</span><span class="token string">'save data'</span><span class="token punctuation">)</span>
    <span class="token keyword">print</span><span class="token punctuation">(</span><span class="token punctuation">)</span>


<span class="token comment"># Định nghĩa DAG</span>
default_args <span class="token operator">=</span> <span class="token punctuation">{</span>
    <span class="token string">'owner'</span><span class="token punctuation">:</span> <span class="token string">'airflow'</span><span class="token punctuation">,</span>
    <span class="token string">'depends_on_past'</span><span class="token punctuation">:</span> <span class="token boolean">False</span><span class="token punctuation">,</span>
    <span class="token string">'start_date'</span><span class="token punctuation">:</span> datetime<span class="token punctuation">(</span><span class="token number">2023</span><span class="token punctuation">,</span> <span class="token number">6</span><span class="token punctuation">,</span> <span class="token number">7</span><span class="token punctuation">,</span> tzinfo<span class="token operator">=</span>local_tz<span class="token punctuation">)</span><span class="token punctuation">,</span>
    <span class="token string">'retries'</span><span class="token punctuation">:</span> <span class="token number">1</span><span class="token punctuation">,</span>
    <span class="token string">'retry_delay'</span><span class="token punctuation">:</span> timedelta<span class="token punctuation">(</span>minutes<span class="token operator">=</span><span class="token number">5</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
<span class="token punctuation">}</span>

dag <span class="token operator">=</span> DAG<span class="token punctuation">(</span>
    dag_id<span class="token operator">=</span><span class="token string">'my_dag'</span><span class="token punctuation">,</span>
    default_args<span class="token operator">=</span>default_args<span class="token punctuation">,</span>
    description<span class="token operator">=</span><span class="token string">'A simple DAG'</span><span class="token punctuation">,</span>
    schedule_interval<span class="token operator">=</span><span class="token string">'* * * * *'</span>
<span class="token punctuation">)</span>

<span class="token comment"># Định nghĩa các Task</span>
task1 <span class="token operator">=</span> BashOperator<span class="token punctuation">(</span>
    task_id<span class="token operator">=</span><span class="token string">'task1'</span><span class="token punctuation">,</span>
    bash_command<span class="token operator">=</span><span class="token string">'echo "Task 1"'</span><span class="token punctuation">,</span>
    dag<span class="token operator">=</span>dag<span class="token punctuation">,</span>
<span class="token punctuation">)</span>

task2 <span class="token operator">=</span> PythonOperator<span class="token punctuation">(</span>
    task_id<span class="token operator">=</span><span class="token string">'task2'</span><span class="token punctuation">,</span>
    python_callable<span class="token operator">=</span>process_data<span class="token punctuation">,</span>
    dag<span class="token operator">=</span>dag<span class="token punctuation">,</span>
<span class="token punctuation">)</span>

task3 <span class="token operator">=</span> PythonOperator<span class="token punctuation">(</span>
    task_id<span class="token operator">=</span><span class="token string">'task3'</span><span class="token punctuation">,</span>
    python_callable<span class="token operator">=</span>save_data<span class="token punctuation">,</span>
    dag<span class="token operator">=</span>dag<span class="token punctuation">,</span>
<span class="token punctuation">)</span>

<span class="token comment"># Thiết lập phụ thuộc giữa các Task</span>
task1 <span class="token operator">>></span> task2 <span class="token operator">>></span> task3

<span class="token keyword">if</span> __name__ <span class="token operator">==</span> <span class="token string">"__main__"</span><span class="token punctuation">:</span>
    dag<span class="token punctuation">.</span>cli<span class="token punctuation">(</span><span class="token punctuation">)</span>

Trước khi giải thích các phần trong mã nguồn trên, sẽ có một số lưu ý như sau:

  • Tên file mã nguồn cần trung với dag_id
  • File mã nguồn cần được lưu trong thư mục dags của airflow, mặc định sẽ là AIRFLOW_HOME/dags
  • Mỗi khi tạo 1 file DAG và đưa vào thư mục dags cần khởi động lại airflow webserver

Chúng ta sẽ cùng phân tích từng thành phần có trong file mã nguồn trên

  • Định nghĩa DAG: đây là phần chúng ta sẽ định danh 1 số thông tin như dag_id để phân biệt các DAGs, người sở hữu, có phụ thuộc vào DAG nào đó hay không, ngày bắt đầu chạy, số lần thử lại nếu lỗi, thời gian thử lại nếu lỗi, mô tả DAG, thời gian tự lặp lại chạy DAG, …
<span class="token comment"># Định nghĩa DAG</span>
default_args <span class="token operator">=</span> <span class="token punctuation">{</span>
    <span class="token string">'owner'</span><span class="token punctuation">:</span> <span class="token string">'airflow'</span><span class="token punctuation">,</span>
    <span class="token string">'depends_on_past'</span><span class="token punctuation">:</span> <span class="token boolean">False</span><span class="token punctuation">,</span>
    <span class="token string">'start_date'</span><span class="token punctuation">:</span> datetime<span class="token punctuation">(</span><span class="token number">2023</span><span class="token punctuation">,</span> <span class="token number">6</span><span class="token punctuation">,</span> <span class="token number">7</span><span class="token punctuation">,</span> tzinfo<span class="token operator">=</span>local_tz<span class="token punctuation">)</span><span class="token punctuation">,</span>
    <span class="token string">'retries'</span><span class="token punctuation">:</span> <span class="token number">1</span><span class="token punctuation">,</span>
    <span class="token string">'retry_delay'</span><span class="token punctuation">:</span> timedelta<span class="token punctuation">(</span>minutes<span class="token operator">=</span><span class="token number">5</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
<span class="token punctuation">}</span>

dag <span class="token operator">=</span> DAG<span class="token punctuation">(</span>
    dag_id<span class="token operator">=</span><span class="token string">'my_dag'</span><span class="token punctuation">,</span>
    default_args<span class="token operator">=</span>default_args<span class="token punctuation">,</span>
    description<span class="token operator">=</span><span class="token string">'A simple DAG'</span><span class="token punctuation">,</span>
    schedule_interval<span class="token operator">=</span><span class="token string">'* * * * *'</span>
<span class="token punctuation">)</span>

Định nghĩa các Tasks có trong DAG:

Tùy vào các loại Task, chúng ta sẽ lựa chọn Operator hoặc Sensor sao cho phù hợp. Mình có giới thiệu các Operator và Sensor, các bạn có thể xem lại. Ví dụ mình dùng BashOperator, và lệnh thực thi của mình là echo "Task 1". Thông tin tối thiểu cần định nghĩa cho Task đó là task_id, dag và thực thi cái gì

<span class="token comment"># Định nghĩa các Task</span>
task1 <span class="token operator">=</span> BashOperator<span class="token punctuation">(</span>
    task_id<span class="token operator">=</span><span class="token string">'task1'</span><span class="token punctuation">,</span>
    bash_command<span class="token operator">=</span><span class="token string">'echo "Task 1"'</span><span class="token punctuation">,</span>
    dag<span class="token operator">=</span>dag<span class="token punctuation">,</span>
<span class="token punctuation">)</span>

task2 <span class="token operator">=</span> PythonOperator<span class="token punctuation">(</span>
    task_id<span class="token operator">=</span><span class="token string">'task2'</span><span class="token punctuation">,</span>
    python_callable<span class="token operator">=</span>process_data<span class="token punctuation">,</span>
    dag<span class="token operator">=</span>dag<span class="token punctuation">,</span>
<span class="token punctuation">)</span>

task3 <span class="token operator">=</span> PythonOperator<span class="token punctuation">(</span>
    task_id<span class="token operator">=</span><span class="token string">'task3'</span><span class="token punctuation">,</span>
    python_callable<span class="token operator">=</span>save_data<span class="token punctuation">,</span>
    dag<span class="token operator">=</span>dag<span class="token punctuation">,</span>
<span class="token punctuation">)</span>

Thiết lập phụ thuộc giữa các Tasks:

Dưới đây là những kiểu phụ thuộc thông dụng và ký hiệu tương ứng:

  • >>: Tác vụ bên trái phải được thực thi trước khi tác vụ bên phải được thực thi. Đây là kiểu phụ thuộc mặc định giữa các tác vụ trong Airflow.
  • <<: Tác vụ bên phải phải được thực thi trước khi tác vụ bên trái được thực thi.
  • >> và <<: Cả hai tác vụ phải được thực thi trước khi tác vụ được yêu cầu được thực thi.
  • >> với trigger_rule=’all_success’: Tất cả các tác vụ phải hoàn thành thành công trước khi tác vụ được yêu cầu được thực thi.
  • >> với trigger_rule=’one_success’: Một trong các tác vụ phải hoàn thành thành công trước khi tác vụ được yêu cầu được thực thi.
  • >> với trigger_rule=’all_failed’: Tất cả các tác vụ phải hoàn thành với lỗi trước khi tác vụ được yêu cầu được thực thi.
  • >> với trigger_rule=’one_failed’: Một trong các tác vụ phải hoàn thành với lỗi trước khi tác vụ được yêu cầu được thực thi.
  • Ví dụ về trigger_rule:
t1 <span class="token operator">>></span> t2 <span class="token operator">>></span> t3
t3<span class="token punctuation">.</span>trigger_rule <span class="token operator">=</span> <span class="token string">'all_failed'</span>

Chạy DAG:

Bạn có thể sử dụng dag.run() để chạy DAG của bạn trong một ứng dụng Python hoặc sử dụng dag.cli() để chạy các lệnh dòng lệnh Airflow từ DAG của bạn.

Vậy chúng ta cùng xem kết quả nào. Trước hêt nhớ những cái mình vừa note bên trên nhé, sau đó chúng ta sẽ thấy DAG của mình đã xuất hiện trên Webserver

Chúng ta sẽ chạy thử DAG của mình bằng nút gạt bên trái. Hiện tại code mình đang cho nó 1 phút chạy 1 lần và mỗi task sẽ in ra một dòng chữ. Trạng thái và số lượng những task theo từng trạng thái được hiển thị trong các ô tròn, các bạn trỏ chuột vào đó sẽ hiện ra những cái hint notes

Những thông tin in ra sẽ được lưu trữ trong log, mặc định ở thư mục AIRFLOW_HOME/logs