이 포스팅은 공부 목적으로 작성된 포스팅입니다. 왜곡된 내용이 포함되어 있을 수 있습니다
전처리 과정
수행하려는 데이터 전처리 과정은 다음과 같다.
- 서울 열린 데이터 광장에서 제공하는 서울시 일반음식점 인허가 정보를 데이터를 받는다(csv)
- 각각 음식점에 대해서 음식점 사진을 3개로 가져온다(3개 이하라면 최대로)
- 식당 메뉴 및 가격을 가져온다
- 식당 운영시간을 가져온다.
- 식당 좌표를 사용하는 지도 API에 맞게 수정한다.
csv 파일 분리
서울시 일반음식점 인허가 정보는 가지고 있는데 데이터로 해당 데이터를 여러개의 csv로 분할하는 task를 만들어보자
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
import os
default_args = {
'owner': 'ji0513ji',
'depends_on_past': False,
'#start_date': days_ago(1),
"email": ["ji0513ji@naver.com"],
"email_on_failure": True,
'retries': 1,
}
dag = DAG(
'process_large_csv',
default_args=default_args,
description='큰 CSV 파일을 작은 파일로 나누고 전처리 작업을 수행하는 DAG',
schedule_interval=None,
tags=["ourMenu"],
)
def split_csv(file_path, chunk_size, output_dir):
# 큰 CSV 파일을 읽어온다
reader = pd.read_csv(file_path, chunksize=chunk_size)
for i, chunk in enumerate(reader):
chunk_file = f"{output_dir}/chunk_{i}.csv"
chunk.to_csv(chunk_file, index=False)
# 파일 나누기 작업
def split_file():
file_path = '../csv/서울시 일반음식점 인허가 정보.csv'
chunk_size = 10000 # 각 작은 파일의 행 개수
output_dir = '../csv/chunks'
os.makedirs(output_dir, exist_ok=True)
split_csv(file_path, chunk_size, output_dir)
split_task = PythonOperator(
task_id='split_csv_task',
python_callable=split_file,
dag=dag,
)
# 전처리 작업 (예: 각각의 작은 파일에 대해 수행할 작업 정의)
def preprocess_chunk(chunk_file):
df = pd.read_csv(chunk_file)
# 전처리 작업 수행 (예: 데이터 클렌징, 변환 등)
# df = perform_preprocessing(df)
output_file = chunk_file.replace('chunks', 'processed')
os.makedirs(os.path.dirname(output_file), exist_ok=True)
df.to_csv(output_file, index=False)
# 작업 의존성 설정
split_task
Dags 코드를 Dags 디렉토리에 생성한후 스케줄러를 껐다 키면 된다.
파이썬 코드를 실행할 필요없이 webserver에서 디버깅을 해주고 있었다.
새로운 dag가 생성된것을 확인할 수 있다. 현재 스케줄러 설정을 하지 않았기 때문에 수동으로 해당 dag를 실행해야한다. 실행해보자
자세히 보면 up_for_retry를 확인할 수 있다. dag 수행을 실패한 것으로 재시도를 해볼 수 있다(1회로 설정하였다.)
계속해서 에러가 나서 모든 데이터를 분할하지 못하는 현상이 발생하였다.
문제의 로그는 다음과 같았다.
ERROR:__main__:Error reading file: 'cp949' codec can't decode byte 0x82 in position 195717: illegal multibyte sequence
이전에 대용량 데이터를 다룰 때도 만났던 문제인데, 데이터가 중간중간 format를 지키지 않아 발생하는 문제로 알고 있다. 문제가 생겼을경우, 해당 라인은 무시하도록 코드를 다시 만들었는데 무슨 이유에서 인지 계속 동일한 에러가 발생하였다.
JSON
csv가 아닌 JSON파일로 변경하기로 하였다.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
import os
import json
default_args = {
'owner': 'ji0513ji',
'depends_on_past': False,
'#start_date': days_ago(1),
"email": ["ji0513ji@naver.com"],
"email_on_failure": True,
'retries': 1,
}
dag = DAG(
'process_large_json',
default_args=default_args,
description='큰 json 파일을 작은 파일로 나누고 전처리 작업을 수행하는 DAG',
schedule_interval=None,
tags=["ourMenu"],
)
def print_working_directory():
print(f"Current working directory: {os.getcwd()}")
print(f"Files in the current directory: {os.listdir()}")
print_working_directory_task = PythonOperator(
task_id='print_working_directory',
python_callable=print_working_directory,
dag=dag,
)
def split_json_file(source_file_path, output_dir, chunk_size):
# JSON 파일 열기
with open(source_file_path, 'r',encoding="UTF-8") as source_file:
data = json.load(source_file)
# JSON 파일의 DATA 배열에 접근
description = data.get("DESCRIPTION", {})
records = data.get("DATA", [])
# 출력 디렉토리 확인 및 생성
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 분할 작업
chunk = []
chunk_index = 0
for i, record in enumerate(records):
chunk.append(record)
if (i + 1) % chunk_size == 0:
chunk_data = {
"DESCRIPTION": description,
"DATA": chunk
}
chunk_file_path = os.path.join(output_dir, f'chunk_{chunk_index}.json')
with open(chunk_file_path, 'w',encoding="UTF-8") as chunk_file:
json.dump(chunk_data, chunk_file, indent=4, ensure_ascii=False)
chunk_index += 1
chunk = []
# 남아 있는 데이터가 있으면 마지막 청크 파일로 저장
if chunk:
chunk_data = {
"DESCRIPTION": description,
"DATA": chunk
}
chunk_file_path = os.path.join(output_dir, f'chunk_{chunk_index}.json')
with open(chunk_file_path, 'w') as chunk_file:
json.dump(chunk_data, chunk_file, indent=4, ensure_ascii=False)
split_task = PythonOperator(
task_id='split_json_task',
python_callable=split_json_file,
op_kwargs={
'source_file_path': 'airflow/csv/서울시 일반음식점 인허가 정보.json', # 대용량 JSON 파일 경로
'output_dir': 'airflow/csv/chunks', # 작은 JSON 파일이 저장될 디렉토리
'chunk_size': 1000, # 각 청크의 크기 (JSON 객체 수)
},
dag=dag,
)
# 작업 의존성 설정
print_working_directory_task >> split_task
json을 분할하는 dag로 코드를 변경하였다.
첫번째 시도는 path 설정을 잘못하여 실패했다.
두번째 시도로 dag 실행이 완료되었음을 확인할 수 있었다.
json이 잘 분할된 모습이다. 갯수도 50개 정도로 맞고, 육안으로 확인했을때, 깨지는 것 없이 잘 수행된 것으로 보였다.
이번 작업은 사실 일회성으로 작업하는 부분이라 airflow를 굳이 사용할 필요는 없었지만, 익숙해지자는 마음에서 사용해보았다. 다음에는 각각의 chunk에 대해서 크롤링을 진행하는 dag를 만들어보겠다. 각각 chunk에 대한 작업이 병렬적으로 수행되야 속도가 빠를텐데 해당 기능을 airflow에서 어떻게 제공하는지 확인해봐야겠다.
이전에 실패한 task가 다시 정상적으로 실행된 모습을 확인할 수 있었는데, airflow에서 실패한 경우 재시도를 하여 성공한것으로 아직 재시도를 어느 시점에 시도하는지는 모르겠다.
'AI' 카테고리의 다른 글
Airflow 시작하기 (0) | 2024.07.19 |
---|---|
[논문 리뷰] Playing Atari with Deep Reinforcement Learning (0) | 2023.11.24 |