Slack là một công cụ khá phổ biến trong các Team, slack giúp tập hợp mọi thông tin về Slack (như Jira alert, ETL pipelines, CI/CD status, deployments, ...) một cách thống nhất và dễ dàng theo dõi. Bài viết này mình hướng dẫn gửi mọi báo lỗi của Airflow đến Slack.
# 1. Slack Incoming Webhooks và Airflow Connection
Truy cập Slack App Directory tìm Incoming Webhooks: `https://<workspace>.slack.com/apps/A0F7XDUAZ-incoming-webhooks`

Ở mục **Post to Channel** chọn Channel, sau đó bấm **Add Incoming Webhooks integration**

Sau đó bạn sẽ nhận được 1 URL có dạng:
https://hooks.slack.com/services/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2

Vào **Airflow** > **Admin** > **Connections** để thêm một connection mới
- Conn Id: `Slack`
- Conn Type: `HTTP`
- Host: `https://hooks.slack.com/services`
- Password: `/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2`

# 2. Slack alert Utils
Tạo file utils chứa function alert, ví dụ: `/dags/utils/slack_alert.py`
```python
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
SLACK_CONN_ID = 'slack'
def task_fail_slack_alert(context):
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
Task Failed.
*DAG*: {dag_id}
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
dag_id=context.get('dag').dag_id,
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
ti=context.get('task_instance'),
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
failed_alert = SlackWebhookOperator(
task_id='slack_alert',
http_conn_id=SLACK_CONN_ID,
webhook_token=slack_webhook_token,
message=slack_msg,
username='airflow')
return failed_alert.execute(context=context)
```
# 3. Config Slack alert cho từng DAG
Với mỗi DAG muốn alert, ta thêm thuộc tính `on_failure_callback` cho mỗi DAG. Ví dụ như dưới dây:
`example_dag.py`
```py
from airflow import DAG
...
from utils.slack_alert import task_fail_slack_alert
default_args = {
**params['default_args'],
'owner': DAG_OWNER,
'on_failure_callback': task_fail_slack_alert,
...
}
dag = DAG('dag_id', default_args=default_args)
...
```
Kết quả:

# Tham khảo
- https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
- airflow.operators.slack_operator: [https://airflow.apache.org/\_modules/airflow/operators/slack_operator.html](https://airflow.apache.org/_modules/airflow/operators/slack_operator.html)
Chúc các bạn thành công.