이 포스팅은 공부 목적으로 작성된 포스팅 입니다. 왜곡된 내용이 포함되어 있을 수 있습니다.
Airflow
airflow는 배치 기반 워크플로우를 개발, 스케줄링, 모니터링하기 위한 오픈소스 플랫폼이다. python 기반으로 작성된다.
airflow의 장점은 다음과 같다.
- 파이선 기반의 파이프라인을 구성할 수 있다. (데이터 처리는 파이썬이 발달되어 있다.)
- 데이터베이스와 클라우드와 통합이 가능하고 확장이 용이하다.
- 스케줄링을 활용하여 정기적으로 파이프라인을 실행할 수 있다.
- 증분처리(변경된 데이터에 대해서만 처리하는것)를 지원한다 (아직 이해하지 못함)
- 백필기능을 통한 데이터 재처리를 지원한다.(빅데이터 환경에서 발생하는 데이터가 제대로 처리 못하고 튀는 현상을 어느정도 보호해준다고 이해함)
airflow를 사용하기에 적합하지 않은 상황은 아래와 같다
- 스트리밍 데이터를 사용하는 경우
- 추가, 삭제 태스크가 많은 경우, 구조가 빈번하게 변경되는 파이프라인
글쓴이의 경우 대용량 데이터를 전처리하기 위해서 airflow를 도입했다. 학부 수업에 대용량 데이터를 전처리를 스크립트 코드로만 해본 적이 있었는데, 전처리의 특성상 여러 태스크를 진행해야하고(파싱, 크롤링..) 빅데이터의 경우 데이터가 엄격한 포맷에 지켜지지 않는 경우가 많기 때문에 스크립트 코드를 실행했을때 에러가 빈번하고, 에러를 만날때마다, 에러 처리를 해주는 식으로 직접 분기를 추가하였고, 아에 try catch와 같이 데이터를 먹어버리는 코드를 작성하기도 한다.
airflow를 사용하면 처리가 실패된 데이터를 확인할 수 있고, 보다 체계적으로 Dag 별로 태스크를 분리하여 데이터 전처리를 진행할 수 있다고 판단하여 도입하게 되었다.
https://airflow.apache.org/docs/apache-airflow/stable/index.html
https://www.yes24.com/Product/Goods/107878326
레퍼런스로는 Airflow 공식 문서와 "Apache Airflow 기반의 데이터 파이프라인"을 참조 하였다.
책의 경우 github로 예제 코드를 전부 제공하고 있어서 compose 만 실행하는 것을 실습을 해볼 수 있었다.
그래도 최대한 직접 코드를 만들여서 실습할 예정이다.
Dags 실행하기
airflow에서는 webserver, scheduler 두가지를 실행하는데 서로 다른 기능을 지원한다.
- webserver: 인터페이스 제공
- scheduler: 스케줄링, worker에게 task 할당
airflow standalone
위 command로 webserver와 scheduler를 모두 실행하고 있는데 최대한 지양할 것으로 공식문서에 안내되고 있다.
airflow scheduler
airflow webserver --port 8080
일일히 수동으로 실행하자!
이제 Dags를 직접 만들어 실행해보자
import textwrap
from datetime import datetime, timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](https://imgs.xkcd.com/comics/fixing_problems.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
공식문서에 있는 예제 코드이다. 총 dag는 3개로
- print_data: 현재 날짜 출력
- sleep: sleep
- templated: (현재 날짜, 현재 날짜+7일) *5번 출력
이고, webserver를 통해 dags 실행 결과를 확인할 수 있다.
templated log에 실행 결과를 확인할 수 있다
이외에도 airflow를 사용할때 유용한 명령어가 있다.
# initialize the database tables
# airflow db 초기화
airflow db migrate
# print the list of active DAGs
# 모든 dags를 리스트
airflow dags list
# prints the list of tasks in the "tutorial" DAG
# 특정 dags의 task 들을 리스트
airflow tasks list tutorial
# 트리 형식으로 출력
# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree
dags를 추가할때는 매번 초기화할 필요없이 dags 디렉토리에 추가하면 된다.
'AI' 카테고리의 다른 글
Airflow로 데이터 전처리 (1) (0) | 2024.07.19 |
---|---|
[논문 리뷰] Playing Atari with Deep Reinforcement Learning (0) | 2023.11.24 |