Skip to content

Latest commit

 

History

History
489 lines (475 loc) · 54.9 KB

Apache Airflow 기반의 데이터 파이프라인.md

File metadata and controls

489 lines (475 loc) · 54.9 KB

서평

  • image
  • 이 책은 Apache Airflow의 개념과 적용 방법에 대한 설명뿐만 아니라, 실제 서비스 운영 시 고려해야 할 모니터링, 확장, 보안 등에 관한 내용에 관한 내용을 상세하게 안내하고 있으며 , 다양한 클라우드 환경에서 활용하는 방법까지 다루고 있습니다. 또한 Airflow 설치부터 파이프라인 작성, 테스트, 분석, 백필 그리고 배포 실습까지 한 번에 해결 할 수 있어 많은 도움이 되었습니다.
  • 책에서는 전반전익 Airflow에 대해 설명하고 있으며 기초, 중급, 실습으로3단계에 나눠서 소개하고 있습니다. 특히 클라우드에서 Airflow 관리에 대해서 AWS, GCP, Azure에 대해서 실습을 통해 더 자세하게 알 수 있었으며 운영하고 있는 Airflow에 대해서 시스템적으로 돌아보게 되는 계기가 되었습니다.
  • Airflow의 공식 홈페이지 문서에도 자세하게 설명을 하고 있지만 번역본으로 한번 전체적인 구조를 파악한 다음에 공부를 해도 좋을거같습니다. 효율적인 파이프라인의 구축이 중요하다는 것과 Airflow를 활용하여 파이프라인 스케줄링을 할 때 고려해야할 부분에 대해서 많이 배우게 되었습니다. 또한 효율적이고 유지보수 가능한 파이프라인을 구축하기 위한 몇 가지 사례를 통해서 경험을 할 수 있어서 좋았습니다.
  • Airflow 아키텍처에 대해서 더 자세하게 알게 되어서 좋았으며 익스큐터 유형에 따라 세부적인 내용에 대해 알 수 있게되어서 좋았습니다.SequentialExecutor, LocalExecutor,CeleryExecutor,KubernetesExecutor의 특징과 설정에 대해 배우며 운영환경에서 Airflow 관리에 대해 배웠습니다. 관련하여 효율적인 DAG를 작성하는데 있어서 모범 사례를 들어서 구축할때 고려해야할 부분에 대해서 배워서 좋았습니다.

기본편

Apache Airflow 살펴보기

  • Airflow는 파이썬으로 배치, 스케줄링, 모니터링 등을 한 번에 해결하는 워크플로 관리 플랫폼
  • Airflow의 주요 기능은 유연한 파이썬 프레임워크를 사용해 쉽게 데이터 파이프라인을 구축할 수 있게 해 주며, 최신 기술 환경에서 접하게 되는 서로 다른 기술들을 연결할 수있는 다양 빌딩 블록을 제공함
  • DAG에 정의된 특정 시점에 트리거할 수 있을 뿐만 아니라(Cron과 유사한) 최종 시점과 예상되는 다음 스케줄 주기를 상세하게 알려주는 것
  • 데이터 파이프라인 그래프
    • 태스크(task) 간의 의존성을 명확하게 확인하는 방법 중 하나는, 데이터 파이프라인을 그래프로 표현하는 것
    • 그래프는 방향성을 가지기 때문에 방향성 그래프(directed graph)라고 함
    • 방향성 비순환 그래프(Directed Acyclic Graph, DAG) : 그래프는 화살표 방향성의 끝점(directed edge)을 포함하되 반복이나 순환을 허용하지 않음(비순환 - acyclic)
    • Airflow(또는 기타 워크플로 관리자)에서는 DAG의 비순환 속성은 태스크 그래프를 효율적으로 해결하고 실행하기 위해 사용됨
    • 그래프 기반 표현은 전체 작업을 하나의 모놀로식(monolithic - 단일) 스크립트 또는 프로세스로 구성되는 것이 아니라 파이프라인을 작은 점진적인 태스크로 명확하게 분리
    • 그래프 기반 표현에서는 실패한 태스크(그리고 그 이후 태스크)만 재실행하면 되므로 효율적으로 구성할 수 있음
  • 백필(Backfilling)
    • 하나의 플로(Airflow에서는 DAG)를 특정 옵션(기간) 기준으로 다시 실행할 수 있는 기능, 태스크가 며칠 동안 실패하거나 새롭게 만든 플로를 과거의 특정 시점부터 순차적으로 실행하고 싶을 때 수행함
  • 파이프라인 스케줄링 및 실행
    • Airflow 스케줄러 : DAG를 분석하고 현재 시점에서 DAG의 스케줄이 지난 경우 Airflow 워커에 DAG의 태스크를 예약함
    • Airflow 워커 : 예약된 태스크를 선택하고 실행함
    • Airflow 웹 서버 : 스케줄러에서 분석한 DAG를 시각화하고 DAG 실행과 결과를 확인할 수 있는 주요 인터페이스를 제공함
  • Airlfow 웹 인터페이스
    • 개별 DAG의 태스크와 의존성에 대한 그래프 뷰(Graph View) 화면을 제공함
      • DAG의 태스크 내용과 태스크 간의 의존성을 보여줌
    • 트리 뷰(tree view) : DAG(가장 최근 실행과 실행 기록)의 다중 실행 결과를 확인할 수 있음
  • Airflow를 선택하는 이유
    • 파이썬 코드를 이용해 파이프라인을 구현할 수 있기 때문에 파이썬 언어에서 구현할 수 있는 대부분의 방법을 사용하여 복잡한 커스텀 파이프라인을 만들 수 있음
    • 파이썬 기반의 Airflow는 쉽게 확장 가능하고 다양한 시스템과 통합이 가능함
    • 수많은 스케줄링 기법은 파이프라인을 정기적으로 실행하고 점진적(증분, incremental) 처리를 통해, 전체 파이프라인을 재실행할 필요 없는 효율적인 파이프라인 구축이 가능함
    • 백필 기능을 사용하면 과거 데이터를 손쉽게 재처리할 수 있기 때문에 코드를 변경한 후 재생성이 필요한 데이터 재처리가 가능함
    • Airflow의 훌륭한 웹 인터페이스는 파이프라인 실행 결과를 모니터링할 수 있고 오류를 디버깅하기 위한 편리한 뷰를 제공함
    • 오픈 소스 기반의 특정 벤더에 종속되지 않고 Airflow를 사용할 수 있음
  • Airflow가 적합하지 않은 경우
    • 반복적이거나 배치 태스크(batch-oriented task)를 실행하는 기능에 초점이 맞춰져있기 때문에, 스트리밍(실시간데이터 처리) 워크플로 및 해당 파이프라인 처리에 적합하지 않을 수 있음
    • 추가 및 삭제 태스크가 빈번한 동적 파이프라인의 경우에는 적합하지 않을 수 있음
    • 파이썬 언어로 DAG를 구현하기 때문에 파이썬 프로그래밍 경험이 전혀(또는 거의0 없는 팀은 적합하지 않을 수 있음
    • 파이썬 코드로 DAG를 작성하는 것은 파이프라인 규모가 커지면 굉장히 복잡해질 수 있음
    • 워크플로 및 파이프라인 관리 플랫폼이며, 데이터 계보(lineage) 관리, 데이터 버전 관리와 같은 확장 기능은 제공하지 않기 때문에 필요한 경우 언급한 기능을 제공하는 특정 도구를 Airflow와 직접 통합해야함

Airflow DAG의 구조

  • 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG(Directed Acyclic Graph)로 형성할 수 음
  • 다중 태스크를 병렬로 실행할 수 있고 서로 다른 기술을 사용할 수 있음
  • 오른쪽 시프트 연산자(binary right shift operator) : "rshift"(>>)를 사용하여 태스크 간의 의존성을 정의함
  • 태스크와 오퍼레이터 차이점
    • 오퍼레이터(operator) : 단일 태스크를 나타냄
      • 단일 작업 수행 역할
      • PythonOperator : 파이썬 함수를 실행하는 데 사용됨
      • EmailOperator : 이메일 발송에 사용됨
      • Simple HttpOperator : HTTP 엔드포인트 호출
    • DAG는 오퍼레이터 집합에 대한 실행을 오케스트레이션(orchestration - 조정,조율)하는 역할을 함, 오퍼레이터의 시작과 정지, 오퍼레이터가 완료되면 연속된 다음 태스크의 시작, 그리고 오퍼레이터 간의 의존성 보장이 포함됨
    • Airflow에서 태스크는 작업의 올바른 실행을 보장하기 위한 오퍼레이터의 래퍼(wrapper) 또는 매니저(manager)로 생각해 볼 수 있음
    • 사용자는 오퍼레이터를 활용해 수행할 작업에 집중할 수 있으며, Airflow는 태스크를 통해 작업을 올바르게 실행할 수 있음
    • DAG와 오퍼레이터는 Airflow 사용자가 이용함, 태스크는 오퍼레이터의 상태를 관리하고 사용자에게 상태 변경(예:시작/완료)을 표시하는 Airflow의 내장 컴포넌트
  • Airflow는 스케줄러, 웹 서버, 데이터베이스의 세 가지 핵심 컴포넌트로 구성됨
  • 도커 컨테이너 : 파이썬 패키지를 설정하고 라이브러리의 충돌을 방지할 수 있도록 재사용 격리 환경을 생성해 줄 수 있는 인기 있는 도구
  • Airflow는 모든 태스크의 로그를 수집하기 때문에 결과나 실패에 대한 문제를 확인할 수 있음
  • 실패 : 외부 서비스 중단, 네트워크 연결 문제, 디스크 손상 등 종종 발생

Airflow의 스케줄링

  • Cron 기반의 스케줄 간격 설정하기
    • 더 복잡한 스케줄 간격 설정을 지원하기 위해서 cron(macOS 및 리눅스와 같은 유닉스 기반 OS에서 사용하는 시간 기반 작업 스케줄러)과 동일한 구문을 사용해 스케줄러 간격을 정의함
    • *****(분,시간,일,월,요일 )
    • 0 * * * * : 매시간(정시에 실행)
    • 0 0 * * * : 매일(자정에 실행)
    • 0 0 * * 0 : 매주(일요일 자정에 실행)
    • 0 0 1 * * : 매월 1일 자정
    • 45 23 * * SAT : 매주 토요일 23시 45분
    • 0 0 * * MON-FRI = 매주 월요일부터 금요일 자정에 실행
    • 0 0,12 * * * : 매일 자정 및 오후 12시에 실행
  • 빈도 기반의 스케줄 간격 설정하기
    • timedelta(표준 라이브러리인 datatime 모듈에 포함된) 인스터스를 사용하면 됨
  • 실행 날짜를 사용하여 동적 시간 참조하기
    • 시간 기반 프로세스(time-based process) 워크플로의 경우, 주어진 작업이 실행되는 시간 간격을 아는 것이 중요함
    • execution_date : DAG가 실행되는 날짜와 시간을 나타냄
      • DAG를 시작하는 시간의 특정 날짜가 아니라 스케줄 간격으로 실행되는 시작 시간을 나타내는 타임스탬프
      • 스케줄 간격의 종료 시간은 next_execution_date라는 매개변수를 사용
      • 과거의 스케줄 간격의 시작을 정의하는 previous_execution_date 매개변수를 제공
  • 파티셔닝(partitioning) : 데이터 세트를 더 작고 관리하기 쉬운 조각으로 나누는 작업
    • 데이터 세트의 작은 부분을 파티션(partitions)이라 함
  • 고정된 스케줄 간격으로 태스크 실행
    • 시작 날짜, 스케줄 간격 및 종료 날짜(선택 사항)의 세 가지 매개 변수를 사용하여 DAG를 실행하는 시점을 제어할 수 있음
  • 백필(backfilling) : 과거 데이터 세트를 로드하거나 분석하기 위해 DAG의 과거 기록을 실행함
  • 태스크 디자인
    • 원자성(atomicity)
      • 원자성 트랜잭션은 모두 발생하거나 전혀 발생하지 않는, 나눌 수 없고 돌이킬 수 없는 일련의 데이터베이스와 같은 작업으로 간주됨
      • Airflow의 태스크는 성공적으로 수행하여 적절한 결과를 생성하거나 시스템 상태에 영향을 미치지 않고 실패하도록 정의함
      • 모든 것이 완료되거나 완료되지 않도록 보장함
    • 멱등성(idempotency)
      • 동일한 태스크를 여러 번 호출해도 결과에 효력이 없어야 함
      • 입력 변경 없이 태스크를 다시 실행해도 전체 결과가 변경되지 않아야 함
      • 멱등성이 보장되는 태스크는 실행 횟수에 관계없이 동일한 결과를 생성함, 일관성과 장애 처리를 보장함

Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기

  • 오퍼레이터의 인수 템플릿 작업
    • BashOperator : 실행할 배시(Bash) 명령을 제공하는 인수인 bash_command를 사용함
  • Airflow는 날짜 시간에 Pendulum 라이브러리를 사용하며 execution_date는 이러한 Pendulum의 datetime 객체
    • 네이티브 파이썬의 datetime의 호환(drop-in replacement) 객체이므로 파이썬에 적용할 수 있는 모든 메서드를 Pendulum에도 적용할 수 있음
  • PythonOperator 템플릿
    • BashOperator를 사용하여 런타임에 자동으로 템플릿이 지정되는 bash_command 인수(또는 다른 오퍼레이터에서 이름이 지정된 인수)에 문자열을 제공함
    • PythonOperator는 런타임 콘텍스트로 템플릿화할 수 있는 인수를 사용치 않고 별도로 런타임 콘텍스트를 적용할 수 있는 python_callable 인수를 사용함
  • 다른 시스템과 연결하기
    • 태스크 간 데이터를 전달하는 방법
      • Airflow 메타스토어를 사용하여 태스크 간 결과를 쓰고 읽음, XCom
      • 영구적인 위치(예 : 디스크 또는 데이터베이스)에 태스크 결과를 기록함
    • Airflow 태스크는 설정에 따라 물리적으로 서로 다른 컴퓨터에서 독립적으로 실행되므로 메모리에서 데이터를 공유할 수 없음, 태스크 간의 데이터는 태스크가 완료된 후 다른 태스크에서 읽을 수 있는 다른 위치에 유지되어야 함
    • XCom이라는 기본 메커니즘을 제공하여 Airflow 메타스토어에서 선택 가능한(picklable) 개체를 저장하고 나중에 읽을 수 있음
      • 피클(Pickle)은 파이썬의 직렬화 프로토콜이며 직렬화는 메모리의 개체를 나중에 다시 읽을 수 있도록 디스크에 저장할 수 있는 형식으로 변환하는 것을 의미함
      • 기본 파이썬 타입(예: string, int, dict, list)에서 빌드된 모든 객체를 피클링이 가능함
      • 피클링이 불가능한 개체 : 데이터베이스 연결, 파일 핸들러
    • PostgresOperator : Postgres와 통신하기 위해 훅(hook)이라고 불리는 것을 인스턴스화함, 인스턴스화된 훅은 연결 생성, Postgres에 쿼리를 전송하고 연결에 대한 종료 작업을 처리함, 오퍼레이터는 사용자의 요청을 훅으로 전달하는 작업만 담당함

태스크 간 의존성 정의하기

  • XCom : DAG 실행에서 서로 다른 작업 간에 데이터를 전달할 수 있음
  • 다양한 태스크 의존성 패턴
    • 태스크의 선형 체인(linear chain) 유형 : 연속적으로 실행되는 작업
    • 팬아웃/팬인(fan-out/fan-in) 유형 : 하나의 태스크가 여러 다운스트림 태스크에 연결되거나 그 반대의 동작을 수행하는 유형
      • 팬아웃 : 여러 개의 입력 태스크 연결수 제한
      • 팬아웃 종속성 : 한 태스크를 여러 다운스트림 태스크에 연결하는 것
      • 팬인 구조 : 하나의 태스크가 여러 업스트림 태스크에 영향을 받는 구조는 단일 다운스트림 태스크가 여러 업스트림 태스크에 의존성을 갖음
        • [a , b ] >> c
  • 브랜치하기
    • Airflow는 다운스트림 태스크 세트 중 선택할 수 있는 기능을 BranchPythonOperator을 통해 제공
      • PythonOperator와 달리 전달된 호출 가능한 인수는 작업 결과로 다운슽릠 태스크의 ID를 반환함, 반환된 ID는 브랜치 태스크 완료 후 실행할 다운스트림 태스크를 결정함, 태스크 ID 리스트를 반환하는 경우도 있으며, 이 경우 Airflow는 참조된 모든 태스크를 실행함
  • 트리거 규칙
    • 트리거 규칙 : 태스크의 의존성 기능(DAG안에서 선행 태스크 조건)과 같이 Airflow가 태스크가 실행 준비가 되어 있는지 여부를 결정하기 위한 필수적인 조건, Airflow의 기본 트리거 규칙은 all_success, 태스크를 실행하려면 모든 의존적인 태스크가 모두 성공적으로 완려되어야 함
  • 태스크 간 데이터 공유
    • Airflow의 XCom을 사용하여 태스크 간에 작은 데이터를 공유할 수 있음
    • 태스크 간에 메시지를 교환하여 특정 상태를 공유함
    • XCom 사용 시 고려사항
      • 풀링 태스크는 필요한 값을 사용하기 위해 태스크 간에 묵시적인 의존성(implicit dependency)이 필요함
      • 명시적 의존성 태스크(explicit task)와 달리 DAG에 표시되지 않으며 태스크 스케줄 시에 고려되지 않음
      • XCom에 의해 의존성 있는 작업이 올바른 순서로 실행할 수 있도록 해야함
      • XCom은 오퍼레이터의 원자성을 무너뜨리는 패턴이 될 가능성이 있음
      • XCom이 저장하는 모든 값을 직렬화(serialized)를 지원해야 한다는 기술적 한계가 존재함, 람다 또는 여러 다중 멀티프로세스 관련 클래스 같은 일부 파이썬 유형은 XCom에 저장할 수 없음
  • Taskflow API를 통해 파이썬 태스크 및 의존성을 정의하기 위한 새로운 데코레이터 기반(decorator-based) API를 추가적으로 제공함

중급

워크플로 트리거

  • 시간 간격은 문자열(@daily), timedelta 객체(timedelta(days=3)) , cron 문자열(30 14 * * *)로 지정
  • 센서를 사용한 폴링 조건
    • Airflow 오퍼레이터의 특수 타입(서브 클래스)인 센서(sensor)의 도움을 받을 수 있음
    • 센서는 특정 조건이 true인지 지속적으로 확인하고 true라면 성공, 만약 false인 경우 센서는 상태가 true가 될 때까지 또는 타임아웃이 될 때까지 계속 확인함
    • FileSensor : 파일위치에 파일이 존재하는지 확인하고 파일이 있으면 true를 반환하고, 그렇지 않으면 false를 반환한 후 해당 센서는 지정된 시간(기본값은 60초) 동안 대기했다가 다시 시도함
    • Poking : 센서를 실행하고 센서 상태를 확인하기 위해 Airflow에서 사용하는 이름
    • 사용자 지정 조건 폴링
      • PythonSensor
        • PythonOperator와 유사하게 파이썬 콜러블(callable 함수,메서드 등)을 지원
        • PythonSensor 콜러블은 성공적으로 조건이 충족됐을 경우 true를, 실패했을 경우 false로 부울(Boolean) 값을 반환하는 것으로 제한됨
    • 센서 데드록 : 실해중인 태스크 조건이 true가 될 때까지 다른 태스크가 대기하게 되므로 모든 슬롯이 데드록 상태가 됨
    • TriggerDagRunOperator : 워크플로가 분리된 경우 이 오퍼레이터를 통해 다른 DAG를 트리거할 수 있음
      • DAG에서 태스크를 샂게하면 이전에 트리거된 해당 DAG 실행을 지우는 대신에 새 DAG 실행이 트리거됨
  • 각 DAG 실행은 run_id 필드 존재
    • schedule__ : 스케줄되어 DAG 실행이 시작되었음을 나타냄
    • backfill__ : 백필 태스크에 의해 DAG 실행이 시작되었음을 나타냄
    • manula__ : 수동으로 DAG 실행이 시작되었음을 나타냄
  • 다른 DAG의 상태를 폴링하기
    • ExternalTaskSensor : 다른 DAG의 태스크를 지정하여 해당 태스크의 상태를 확인하는 것

외부 시스템과 통신하기

  • 클라우드 서비스에 연결하기
    • AWS 클라이언트의 이름은 boto3, GCP 클라이언트의 이름은 Cloud SDK, Azure SDK for Python 이러한 클라이언트는 요청에 필요한 세부 정보를 입력하면 클라이언트가 요청 및 응답 처리를 내부적으로 처리하는 편리한 기능을 제공함
  • S3CopyObjectOperator : 소스 및 타겟의 버킷 이름과 오브젝트 이름을 넣고 요청하면 선택한 오브젝트를 대신 복사함
  • AWS에서 인터넷에 액세스하기 위해서는 SageMaker 엔드포인트를 트리거하는 Lambda를 개발 및 배포하고 API Gateway를 생성 및 연결해 외부에서 접속할 수 있는 HTTP 엔드포인트를 만듬
    • 인프라의 배포를 코드로 구성해 배포하지 않는 이유는 Lambda 및 API Gateway를 주기적으로 배포하지 않고 한 번만 배포하면 되기 때문

시스템 간 데이터 이동하기

  • 대규모 데이터 처리 작업에서는 도커 컨테이너를 스파크 작업으로 대체하면 여러 시스템에서 분산 처리가 가능함
  • Airflow가 구동중인 시스템의 모든 리소스를 사용하는 매우 큰 작업을 가지고 있다고 가정해 보면 작업을 다른 곳에서 수행하고 Airflow는 작업이 시작되고 완료될때까지 대기하는 것이 더 좋음, 오케스트레이션과 실행이 완벽하게 분리되어 있어야 하며, Airflow에 의해서 작업은 시작되지만 Apache Spark 같은 데이터 처리 프레임워크가 실제 작업을 수행하고 완료될 때까지 기다림
    • SparkSubmintOperator : Airflow 머신에서 Spark 인스턴스를 찾기 위해 spark-submit 파일과 YARN 클라이언트 구성이 필요함
    • SSHOperator : Spark 인스턴스에 대한 SSH 액세스가 필요하지만 Airflow 인스턴스에 대한 Spark 클라이언트 구성이 별도로 필요하지 않음
    • SimpleHTTPOperator : Apache Spark용 REST API인 Livy를 실행

커스텀 컴포넌트 빌드

  • 커스텀 훅을 사용하여 Airflow가 지원하지 않는 시스템과 연동할 수 있음
  • 개별 워크플로에 특화되거나 Airflow 기본 내장 오퍼레이터로 처리할 수 없는 태스크를 수행하기 위해 커스텀 오퍼레이털르 만들어 사용할 수 있음
  • 커스텀 센서를 사용하여 특정 (외부) 이벤트가 발생할 때까지 대기하는 컴포넌트를 구현할 수 있음
  • 커스텀 오퍼레이터, 훅, 센서 등의 코드들은 (배포가능한) 파이썬 라이브러리로 구현하여 보다 구조적으로 만들 수 있음

테스트하기

  • GitLab, Bitbucket, CircleCI, Travis CI 등과 같이 널리 사용되는 모든 CI/CD 시스템은 프로젝트 디렉터리의 루트에 YAML 형식으로 파이프라인을 정의하여 작동함
  • 코드 스니펫(code snippet) : 재사용 가능한 작은 소스코드(코드 조각)
  • YAML : 또 다른 마크업 언어(YAML Another Markup Language), YAML의 핵심은 문서 마크업이 아닌 데이터 중심에 있다는 것을 보여주기 위해 이름을 바꾸었음, 오늘날 XML과 JSON이 데이터 직렬화에 주로 쓰이기 시작하면서, 많은 사람들이 YAML을 가벼운 마크업 언어로 사용하고 있음
  • 모든 DAG에 대한 무결성 테스트(integrity test)
    • 모든 DAG의 무결성(예: DAG에 사이클이 포함되어 있지 않은지 확인, DAG의 태스크 ID가 고유한 경우 등)에 대해 DAG가 정상적으로 구현되었는지 확인
      • assertion
        • assert dag_object로 파일에서 DAG객체를 성공적으로 찾았는지 확인함
        • assertion을 추가하여 /dags 경로에 있는 모든 파이썬 파일에 DAG 객체가 적어도 하나 이상 포함되어 있는지에 대한 유효성 검사를 함
      • for dag in dag_objects: dag.test_cycle()는 DAG 객체에 순환 주기 존재 여부를 확인함
  • CI/CD 파이프라인 설정하기
    • 코드 저장소를 통해 코드가 변경할 때 사전 정의된 스크립트를 실행하는 시스템
    • 지속적 통합(continous integration,CI)
      • 변경된 코드가 코딩 표준과 테스트 조건을 준수하는지 확인하고 검증하는 것을 의미함
      • 코드를 리포지터리에 푸시할때 Flake8, Pylint 및 Black과 같은 코드 검사기를 통해 코드의 품질을 확인하며 일련의 테스트를 실행함
    • 지속적 배포(continuous deployment, CD)
      • 사람의 간섭 없이 완전히 자동화된 코드를 프로덕션 시스템에 자동으로 배포하는 것을 말함
      • 수동으로 검증하고 배포하지 않고도 코딩 생산성을 극대화하는 것
  • 목업(mocking) : 특정 작업이나 객체를 모조로 만드는것

컨테이너에서 태스크 실행하기

  • 복잡하며 종속성이 충돌하는 환경
    • 다양한 오퍼레이터를 사용할 때 또 다른 어려운 점은 오퍼레이터마다 각각의 종속성(파이썬이나 그 외)을 요구하는 것
    • HttpOperator : HTTP 요청을 수행하기 위해 파이썬 라이브러리인 request에 종속적
    • MySQLOperator : MySQL과 통신하기 위해 파이썬 및 시스템 레벨에서 종속성을 갖게 됨
    • PythonOperator : 호출하는 추천코드는 그 자체적으로 많은 종석송(머신러닝이 포함된 경우 pandas, scikit-learn, 등)을 가질 수 있음
    • Airflow 설정 방식 때문에 모든 종속성을 Airflow 스케줄러를 실행하는 환경뿐만 아니라 Airflow 워커 자체에도 설치되어야 함
    • 다양한 오퍼레이터를 사용할 때는 다양한 종속성을 위한 많은 모듈이 설치되어야 하기 때문에 잠재적인 충돌이 발생하고 환경 설정 및 유지 관리가 상당히 복잡해짐(많은 패키지를 설치하면 잠재적인 보안 위험은 말할 것도 없이 높아짐), 파이썬은 동일한 환경에 동일한 패키지의 여러 버전을 설치할 수 없기 때문에 문제가 됨
  • 컨테이너(container)
    • 애플리케이션에 필요한 종속성을 포함하고 서로 다른 환경에 균일하게 쉽게 배포할 수 있는 기술
    • 배포 : 애플리케이션이 대상 시스템에서 올바르고 안정적으로 실행될 수 있도록 보장
      • 배포할 때에는 일반적으로 운영 체제 간의 차이, 설치된 종속성 및 라이브러리의 변형, 하드웨어의 차이 등을 포함하여 다양한 요소를 조정 및 고려해야함
    • 복잡성 관리하는 한 가지 방법은 가상화를 사용하는것
      • 가상화 : 클라이언트 운영 체제 위에서 실행되는 가상 머신(Virtual Machine,VM)에 애플리케이션을 설치하는 것
      • VM의 단점은 호스트 OS위에서 OS(가상 또는 게스트 OS) 전체를 실행해야 하기 때문에 상당히 무겁다는 것, 모든 새 VM은 자체 게스트 OS를 실행하므로, 단일 시스템에서 여러 개의 VM 애플리케이션을 실행하려면 매우 큰 리소스가 필요함
    • 사용 이유
      • 간편한 종속성
      • 다양한 태스크 실행 시에도 동일한 접근 방식을 제공
      • 향상된 테스트 가능성
  • 쿠버네티스에서 태스크 실행
    • 도커는 컨테이너화된 태스크를 단일 시스템에서 실행할 수 있는 편리한 접근 방식을 제공함
      • 여러 시스템에서 태스크를 조정하고 분산하는 데는 도움이 되지 않기 때문에 접근 방식의 확장성이 제한됨
      • 쿠버네티스와 같은 컨테이너 오케스트레이션 시스템이 개발되어 컴퓨터 클러스터 전반에 걸쳐 컨테이너화된 애플리케이션을 확장할 수 있게 되었음
    • 쿠버네티스
      • 컨테이너화된 애플리케이션의 배포, 확장 및 관리에 초점을 맞춘 오픈 소스 컨테이너 오케스트레이션 플랫폼
      • 쿠버네티스는 도커에 비해 컨테이너를 여러 작업 노드에 배치를 관리하여 확장할 수 있도록 지원하는 동시에 스케줄링 시 필요한 리소스(CPU 또는 메모리), 스토리지 및 특수한 하드웨어 요구사항(GPU 액세스) 등을 고려함
      • 쿠버네티스 마스터(또는 컨트롤 플레인)
        • API 서버, 스케줄러 및 배포, 스토리지 등을 관리하는 기타 서비스를 포함하여 다양한 컴포넌트를 실행함
        • 쿠버네티스 API 서버는 kubectl(쿠버네티스의 기본 CLI 인퍼테이스) 또는 쿠버네티스 파이썬 SDK와 같은 클라이언트에서 쿠버네티스를 쿼리하고 명령을 실행하여 컨테이너를 배포하는데 사용함
        • 쿠버네티스 마스터는 쿠버네티스 클러스터에서 컨테이너화된 애플리케이션을 관리하는 주요 포인트
      • 노드
        • 쿠버네티스 워커 노드는 스케줄러가 할당한 컨테이너 애플리케이션을 실행하는 역할을 함
        • 파드(Pod)
          • 단일 시스템에서 함께 실행해야 하는 컨테이너가 하나 이상 포함됨
          • 쿠버네티스의 가장 작은 단위
      • 쿠버네티스는 보안 및 스토리지 관리를 위한 내장된 기본 기능을 제공함, 쿠버네티스 마스터에게 스토리지 볼륨을 요청하고 이를 컨테이너 내부에 영구 스토리지로 마운트할 수 있음
  • 쿠버네티스 관련 문제 진단하기
    • 문제가 발생하면 태스크가 올바르게 끝나지 않고 실행 상태로 멈추는 경우를 확인
      • 쿠버네티스가 태스크 파드를 스케줄할 수 없기 때문에 발생함
      • 파드가 클러스터 내에서 실행되지 않고 보류(pending) 중인 상태
  • 도커 기반 워크플로와 차이점
    • 태스크 컨테이너가 더 이상 Airflow 워커 노드에서 실행되지 않고 쿠버네티스 클러스터 내에 별도의 노드에서 실행된다는 것
    • 워커에 사용되는 모든 리소스는 최소화되며, 쿠버네티스의 기능을 사용하여 적절한 리소스(CPU,메모리,GPU)가 있는 노드에 태스크가 배포되었는지 확인할 수 있음
    • 어떤 스토리지도 더 이상 Airflow 워커가 접근하지 않지만, 쿠버네티스 파드에서는 사용할 수 있어야 함
    • 쿠버네티스는 도커에 비해 확장성, 유연성(다양한 워크로드에 대해 서로 다른 리소스/노드 제공) 및 스토리지, 보안 등과 같은 기타 리소스 관리 관점에서 상당한 장점을 가지고 있음, Airflow 전체를 쿠버네티스에서 실행할 수 있음, Airflow 전체를 확장 가능한 컨테이너 기반 인프라에서 구동 설정이 가능함

실습

모범 사례

  • 깔끔한 DAG 작성
    • PEP8 스타일 가이드 및 구글과 같은 회사의 가이드
    • 들여쓰기, 최대 줄 길이, 변수/클래스/함수 이름 지정 스타일 등에 대한 권장 사항이 포함되어 있음
    • 정적 검사기를 이용한 코드 품질 확인
      • pylint 및 flake8로, 둘 다 정적 코드 검사기로 작동함
      • 코드가 정해진 기준을 얼마나 잘 지키는지를 확인해줌
      • pylint가 flake8보다 광범위한 검사 조건을 가지고 있는 것으로 간주함
    • 코드 포맷터를 사용하여 공통 포맷 적용
      • 조직 내에서 코딩 스타일의 이질성을 줄이는 방법 중 하나는 코드 포맷터 도구를 사용하여 코드를 형식화하는 것
      • 개발자가 코드를 작성한 이후, 코드 포맷터로 다시 정해진 규칙에 따라 작성된 코드에 대하여 일괄적인 수정함
      • 두 가지 파이썬 코드 포맷터는 YAPF 와 Black이 대표적
  • factory 함수를 사용한 공통 패턴 생성
    • 공통 DAG 구조를 생성하는 프로세스의 속도를 높이고 효율적인 방법 중 하나는 factory 함수를 작성하는 것
    • 각 단계에 필요한 구성을 가져와서 해당 DAG나 태스크 세트를 생성하는 것(공장처럼 DAG나 태스크 세트를 생산함)
  • 태스크 그룹을 사용하여 관련된 태스크들의 그룹 만들기
    • Airflow2에서는 태스크 그룹(task group)이라는 새로운 기능이 도입됨
    • 태스크 세트를 더 작은 그룹으로 효과적으로 그룹화하여(시각적으로), DAG 구조를 보다 쉽게 관리하고 이해할 수 있음
    • TaskGroup 콘텍스트 관리자를 사용하여 태스크 그룹을 생성할 수 있음
  • 재현 가능한 태스크 설계
    • 태스크를 재현 가능하도록 설계, 태스크를 다른 시점에서 실행할 때에도, 태스크를 간단하게 다시 실행하고 동일한 결과를 기대할 수 있어야함
    • 태스크는 항상 멱등성을 가져야 함
      • 동일한 태스크를 여러 번 다시 실행하더라도 그 결과는 항상 동일해야함
    • 태스크 결과는 결정적이어야 함
      • 태스크는 결정적(deterministic)일 때만 재현할 수 있음, 태스크는 주어진 입력에 대해 항상 동일한 출력을 반환해야 함
  • 증분 적재 및 처리
    • 증분 처리의 기본 아이디어는 데이터를 파티션으로 분할하고, 이러한 파티션을 각 DAG 실행에서 개별적으로 처리하는 것
    • 각 DAG 실행에서 처리되는 데이터의 양을 해당 파티션의 크기로 제한할 수 있음
    • 증분 처리 설계 기반 프로세스의 가장 큰 장점은 실행 중 일부 과정이 오류로 인해 전체 과정이 중단된다고 하더라도 모든 데이터 세트에 대해 처음부터 다시 실행할 필요가 없다는 점, 실패한 부분만 다시 실행해주면 됨
  • 자원 관리
    • Pool을 이용한 동시성 관리하기
      • Airflow는 리소스 풀을 사용하여 주어진 리소스에 액세스할 수 있는 태스크 수를 제어할 수 있음, 리소스 풀의 각 풀은 해당 리소스에 대한 액세스 권한을 부여하는 고정된 수의 슬롯(slot)을 가지고 있음
    • SLA 및 경고를 사용하여 장기 실행 작업 탐지
      • Airflow를 사용하면 SLA(service-level agreement - 서비스 수준 계약) 메커니즘을 사용하여 태스크의 동작을 모니터링할 수 있음, DAG 또는 태스크 SLA 제한 시간을 효과적으로 지정함

운영환경에서 Airflow 관리

  • Airflow 아키텍처
    • 웹 서버
      • 웹 서버는 파이프라인이 현재 상태에 대한 정보를 시각적으로 표시하고 사용자가 DAG 트리거와 같은 특정 태스크를 수행할 수 있도록 관리하는 역할을 수행함
    • 스케줄러
      • DAG 파일 구문 분석, 즉 DAG 파일 읽기, 비트 및 조각 추출, 메타 스토어에 저장
      • 실행할 태스크를 결정하고 이러한 태스크를 대기열에 배치
    • 데이터베이스
    • Airflow에서는 익스큐터 유형에 따라 다양한 설치 환경을 구성할 수 있음
      • 익스큐터 - 분산환경 - 설치 난이도 = 사용에 적합한 환경
        • SequentialExcutor(기본값) - 불가능 - 매우 쉬움 - 시연 / 테스트
        • LocalExcutor - 불가능 - 쉬움 - 단일 호스트 환경 권장
        • CeleryExecutor - 가능 - 보통 - 멀티 호스트 확장 고려시
        • KubernetesExecutor - 불가능 - 어려움 - 쿠버네티스 기반 컨테이너 환경 구성 고려시
  • 어떤 익스큐터가 적합한가?
    • 워크로드를 여러 머신에 분산하려는 경우 CeleryExecutor 및 KubernetesExecutor의 두 가지 옵션 존재, 단일 시스템의 리소스 제한에 도달하거나 여러 시스템에서 태스크를 실행하여 병렬 실행을 원하거나 태스크를 여러 시스템에 분산하여 작업 속도를 더 빠르게 실행하고자 할 때 사용할 수 있음
    • SequentialExecutor
      • Airflow 익스큐터 중 가장 단순하게 구성할 수 있는 방법이자, Airflow를 별도의 설정이나 환경 구성 없이 바로 실행시킬 수 있는 방법
      • 태스크를 순차적으로 하도록(한 번에 하나씩) 구성 되어 있음
      • 주로 테스트 및 데모 목적으로 사용되는 쪽으로 많이 선호함
      • 작업 처리 속도가 상대적으로 느리며 단일 호스트 환경에서만 작동함
    • LocalExecutor
      • 한 번에 하나의 태스크로 제한되지 않고 여러 태스크로 병렬로 실행할 수 있음
      • 익스큐터 내부적으로 워커 프로세스가 FIFO(First in, First out) 적용 방식을 통해 대기열에서 실행할 태스크를 등록함
      • 기본적으로 최대 32개의 병렬 프로세스를 실행함
    • CeleryExecutor
      • 내부적으로 Celery를 이용하여 실행할 태스크들에 대해 대기열을 등록함
      • 워커가 대기열에 등록된 태스크를 읽어와 개별적으로 처리함
      • 사용자 관점에서 볼 때 태스크를 대기열로 보내고 워커가 대기열에서 처리할 태스크를 개별적으로 읽어와 처리하는 과정은 LocalExecutor와 유사함
      • LocalExecutor와 가장 큰 차이점은 모든 구성요소가 서로 다른 호스트에서 실행되기 때문에 작업 자체에 대한 부하가 LocalExecutor에 비해 낮음
      • Celery는 대기열 메커니즘(Celery에서 처리할때는 Broker라고 지칭)을 위해 RabbitMQ,Redis 또는 AWS SQS를 지원함
      • Celery의 모니터링을위해 Flower라는 모니터링 도구를 함께 제공함
      • Celery는 파이썬 라이브러리 형태로 제공되므로 Airflow 환경에 적용하기 편리함
    • KubernetesExecutor
      • 쿠버네티스에서 워크로드를 실행함
      • Airflow를 실행하려면 쿠버네티스 클러스터의 설정 및 구성이 필요하며 익스큐터는 Airflow 태스크를 배포하기 위해 쿠버네티스 API와 통합됨
      • 쿠버네티스는 컨테이너화된 워크로드를 실행하기 위한 사실상의 표준 솔루션
  • Airflow를 위한 메타스토어 설정
    • 메타스토어(metastore) : Airflow에서 일어나는 모든 일은 데이터베이스에 등록되며 이를 Airflow에서 칭함
    • 워크플로 스크립트 : 스케줄러를 통해 작업 내역을 분석 및 관리하는 역할을 수행하며 메타스토어에 그 해석된 내용을 저장하는 등의 여러 컴포넌트로 구성되어 있음
    • Airflow는 Python ORM(Object Relational Mapper) 프레임워크인SQLAlchemy를 사용하여 모든 데이터베이스 태스크를 수행하며 SQL 쿼리를 수동으로 작성하는 대신, 직접 데이터베이스에 직접 편리하게 작성할 수 있음
  • 스케줄러의 여러가지 역할
    • DAG 파일을 구문 분석하고 추출된 정보를 데이터베이스에 저장
    • 실행할 준비가 된 태스크를 결정하고 이를 대기 상태로 전환
    • 대기 상태에서 태스크 가져오기 및 실행
    • SchedulerJob의 역할
      • DAG 파일을 파싱하고 추출된 정보를 데이터베이스에 저장하는 역할을 수행함
      • DAG 프로세서
        • Airflow 스케줄러는 DAG 디렉터리(AIRFLOW__CORE__DAGS__FOLDER에서 설정한 디렉터리)의 파이썬 파일을 주기적으로 처리함
      • 태스크 스케줄러
        • 스케줄러는 실행할 태스크 인스턴스를 결정하는 역할을 함
  • 익스큐터 설치
    • SequentalExecutor 설정
      • 스케줄러의 태스크 오퍼레이터 부분은 단일 하위 프로세스에서 실행됨, 이 단일 하위 프로세스 내에서 작업은 순차적으로 하나씩 실행되므로 익스큐터 종류 중 가장 느린 실행 방법임
      • 구성 절차가 필요하지 않기 때문에 테스트 시점에 매우 편리하게 사용할 수 있음
    • LocalExecutor 설정
      • 아키텍처는 SequentialExecutor와 유사하지만, 여러 하위 프로세스가 있어 병렬로 태스크를 실행할 수 있으므로 SequentialExecutor에 비해 빠르게 수행할 수 있음
      • 각 하위 프로세스는 하나의 태스크를 실행할 수 있으며, 하위 프로세스는 병렬로 실행할 수 있음
      • 모든 구성 요소를 별도의 컴퓨터에서 실행할 수 있음, 스케줄러에 의해 생성된 하위 프로세스는 모두 하나의 단일 시스템에서 실행됨
    • CeleryExecutor 설정
      • Celery는 대기열 시스템을 통해 워커에게 메시지를 배포하기 위한 프레임워크를 제공함
      • 태스크가 Celery worker를 실행하는 여러 컴퓨터로 분배함, 워커는 태스크가 대기열에 도착할 때까지 기다림
      • Celery에서는 대기열을 브로커라함
      • Airflow webserver 실행
      • Airflow scheduler 실행
      • Airflow Celery worker 실행
    • KubernetesExecutor 설정
      • 모든 태스크가 쿠버네티스의 파드(pod)에서 실행됨
      • 쿠버네티스에서 웹 서버, 스케줄러 및 데이터베이스를 실행할 필요는 없지만, KubernetesExecutor를 사용할 때 쿠버네티스에서 다른 서비스들이 함께 실행되는 것이 관리하기 좀 더 수월함
      • 파드가 쿠버네티스에서 가장 작은 작업 단위이며 하나 이상의 컨테이너를 실행할 수 있다는 점
      • 다른 익스큐터는 작업중인 워커의 정확한 위치를 항상 알 수 있음, 쿠버네티스를 사용하면 모든 프로세스 파드에서 실행되며, 파드는 동일한 시스템에서 실행될 수도 있지만 여러 호스트에 분산되어 실행될 수 있음
      • 사용자의 관점에서 볼때 프로세스는 파드에서 실행되며 사용자는 실행하는 프로세스가 어떤 호스트에서 실행되는지 명확하게 바로 알수는 없음
      • Airflow 프로세스 간에 DAG파일을 배포하는 방법을 결정
        • PersistentVolume을 사용하여 포드 간에 DAG 공유
        • Git-sync init container를 사용해리포지토리의 최신 DAG 코드 가져오기
        • Docker 이미지에 DAG 빌드
  • 모든 Airflow 프로세스의 로그 확인
    • 웹 서버 로그 : 웹 활동에 대한 정보, 즉 웹 서버로 전송되는 요청에 대한 정볼르 보관함
    • 스케줄러 로그 : DAG 구문분석, 예약 작업 등을 포함한 모든 스케줄러 활동에 대한 정보를 보관함
    • 태스크 로그 : 각 로그 파일에는 단일 태스크 인스턴스의 로그가 보관됨
  • 원격 저장소로 로그 보내기
    • Airflow에는 로그를 원격 시스템으로 전송할 수 있는 원격 로깅이라는 기능 존재
      • AWS S3(pip install apache-airflow[amazon] 필요)
      • Azure Blob Storage(pip install apache-airflow[microsoft.azure] 필요)
      • Elasticsearch (pip install apache-airflow[elasticsearch] 필요)
      • Google Cloud Storage (pip install apache-airflow[google] 필요)
  • Airflow 메트릭 시각화 및 모니터링
    • Pushing vs Pulling
      • push 모델을 사용하면 메트릭이 메트릭 수집 시스템으로 전송되거나 밀어넣게됨
      • Pull 모델을 사용하면 메트릭은 특정 엔드포인트에서 모니터링하기 위해 시스템에 의해 노출되고, 메트릭 수집 시스템은 지정된 엔드포인트에서 모니터링하기 위해 시스템에서 메트릭을 가져오거나 가져오게해야 함
    • Prometheus는 메트릭을 수집하고 Grafana는 대시보드에서 메트릭을 시각화함
    • Prometheus StatsD 내보내기는 StatsD 측정 항목을 Prometheus의 측정 항목 형식으로 변환하고 Prometheus가 스크랩할 수 있도록 노출함
  • 메트릭을 수집하도록 Promethues 구성
    • Prometheus
      • 시스템 모니터링을 위한 소프트웨어
      • PromQL이라는 언어로 쿼리할 수 있는 시계열 데이터베이스의 역할을 수행함
  • 실패한 태스크에 대한 알림을 받는 방법
    • DAG 및 오퍼레이터(operator)의 정의 내에서 콜백(callback: 특정 이벤트를 호출하는 함수)을 구성할 수 있음
    • 서비스 수준 계약 정의(Service Level Agreement, SLA)
      • SLA의 일반적인 정의는 서비스 또는 제품에 대한 충족하는 특정 표준

Airflow 보안

  • Airflow 1.10.0에서 RBAC(role-based access control, 역할 기반 액세스 제어) 인터페이스를 처음 도입, RBAC 인터페이스는 해당 권한으로 역할을 정의하고 이러한 역할에 사용자를 할당하여 액세스를 제한하는 메커니즘을 제공함
  • Flask 기본 인터페이스는 더 이상 사용되지 않으며 Airlfow 2.0에서 제거되었음, RBAC 인터페이스는 이제 유일한 인터페이스
  • RBAC 모델은 웹서버 인터페이스의 특정 구성 요소에 대해 접근할 수 있는(특정 작업에 대한) 권한과 (단일) 역할을 가진 유저를 구성함
  • RBAC 인터페이스 설정
    • RBAC 인터페이스는 FAB(Flask-AppBuilder) 프레임워크를 기반으로 개발됨
    • $AIRFLOW_HOME에 web-server_config.py
    • Airflow RBAC 인터페이스의 기본 프레임워크인 FAB에 대한 구성이 포함되어 있음
  • LDAP 서비스로 연결
    • 디렉터리 서비스라고 하는 Azure AD 또는 OpenLDAP와 같은 LDAP 프로토콜(경량 디렉터리 액세스 프로토콜)을 지원하는 서비스를 사용하는 것

클라우드에서 Airflow

  • 관리형 Airflow 서비스
    • Astronomer.io
      • 쿠버네티스 기반의 솔루션, Airflow를 SaaS형 솔루션(Astronomer Enterprise), 바닐라 Airflow와 비교하면 Astronomer는 Airflow 인스턴스를 쉽게 배포하는 별도의 툴을 제공
      • UI와 별도로 만들어진 CLI로 제공함
      • CLI는 배포를 위해 Airflow의 로컬 인스턴스를 실행해 주고, 쉽게 DAG를 개발할 수 있게 함
    • 구글 Cloud Composer
      • 구글 클라우드 플랫폼의 최상단에서 구동되는 Airflow의 관리형 버전
      • one-click으로 GCP의 서비스들을 통합하여 Airflow를 배포할 수 있음
    • 아마존 Managed Workflows for Apache Airflow(MWAA)
      • AWS 클라우드에서 배포를 관리형으로 쉽게 생성해주는 AWS 서비스
      • AWS 서비스로는 S3, RedShift, SageMaker, 로깅과 알람을 위한 AWS CloudWatch,그리고 웹 인터페이스와 데이터에 대한 액세스 보안을 싱글로그인(single login)으로 제공하는 AWS IAM
      • 다른 관리형 솔루션과 유사하게 CeleryExecutor를 사용하여 현재 워크로드(workload)에 따라 워커를 스케일링(scaling)하며 사용자 대신 기반 인프라를 관리함

AWS에서의 Airflow

  • 클라우드 서비스 선택
    • Fargate
      • 컨테이너를 위한 AWS의 서버리스 컴퓨팅 엔진, 주요 장점(ECS나 EKS와 같은 AWS의 다른 서비스와 비교할 때 중 하나는 기반컴퓨팅 리소스들의 구축이나 관리 부담없이 AWS에서 쉽게 컨테이너를 실행할 수 있음
        • Elastic Compute Service : Fargate와 유사하지만 기반 머신들을 직접 관리해야함
        • Elastic Kubernetes Service : AWS의 관리형 솔루션으로 쿠버네티스를 배포하고 실행함
      • 단순히 웹 서버와 스케줄러 컨테이너 태스크를 Fargate에 정의하면 Fargate가 태스크의 배포, 실행, 모니터링을 관리함
      • 하드웨어 구축, 데이터베이스 설정, 패치 및 백업과 같이 시간이 많이 소요되는 관리작업을 처리하여, 클라우드의 관계형 데이터베이스를 구성하는 데 도움이 됨
    • S3는 확장 가능한 오브젝트 스토리지 시스템, 상대적으로 저렴한 비용으로 높은 내구성(durability)과 가용성(availability)을 제공하여 일반적으로 많은 데이터를 저장하는 데 매우 적합함
    • S3는 용량이 큰 데이터 세트(DAG에서 처리되는 데이터 세트)를 저장하거나 Airflow 워커 로그들(Airflow가 S3에 기본적으로 쓸 수 있음)과 같이 임시적인 파일들을 저장할때 유용함
    • 단점은 웹 서버나 스케줄러 시스템의 로컬 파일 시스템으로 마운트할 수 없다는 것, DAG와 같이 Airflow가 로컬 액세스가 필요한 파일들을 저장할 때 S3를 사용하는 것은 그다지 좋은 생각이 아님
    • EFS스토리지 시스템이 DAG들을 저장할 때 더 적합함, NFS와 호환되어서 Airflow 컨테이너에 직접 마운트할 수 있기 때문
    • EFS는 S3에 비해 상당히 비싸기 때문에 데이터나로그 파일을 저장할 때는 적합하지 않을 수 있음, S3보다 파일 업로드가 어려워 EFS에파일을 복사할 때 웹 페이지나 CLI 인터페이스 기능이 제공되지 않기 때문
  • 네트워크 설계
    • AWS 네트워킹을 구성할 때 퍼블릭과 프라이빗 서브넷을 모두 포함하는 VPC(Virtual Private Cloud)를 생성함
      • VPC안의 프라이빗 서브넷은 인터넷에 직접 노출되지 말아야 하는 서비스들을 위해 사용
      • 퍼블릭 서브넷은 서비스의 외부 접근과 인터넷으로 나가는 연결을 제공할 때 사용됨
    • 웹 서버와 스케줄러 컨테이너는 모두 DAG들을 가져오기 위해 Airflow 메타스토어 RDS와 EFS에 연결해야 함
  • DAG 동기화 추가
    • EFS의 단점은 웹 기반의 인터페이스나 커멘드 라인 툴(command line tool,CLT)을 사용할 수 없어서, DAG를 저장할 때 이 시스템에 접근하기 어렵다는 것
    • Git이나 S3에서 EFS로 DAG를 동기화하는 람다 함수를 생성하는 것
    • DAG들이 어떤 식으로는 변경되었을 때 트리거되어(S3이벤트나 Git의 경우 빌드 파이프라인에 의해), 변경 사항을 즉시 EFS에 동기화하고 Airflow에 적용시킬 수 있음
  • CeleryExecutor를 사용하여 스케일링
    • CeleryExecutor로 전환하면 Airflow 배포의 확장성을 더 향상시킬수 있음
    • Airflow 워커를 개별 컨테이너 인스턴스에서 실행하여, 각 워커가 사용할 수 있는 리소스들을 상당히 증가시킬 수 있음
    • Celery 기반으로 구성
      • 하나의 프로세스로 수행될 Airflow 워커들을 분리된 Fargate 태스크 풀에 넣어야 함
      • 스케줄러가 워커에 작업을 전달할 수 있는 메시지 브로커(message broker)를 추가해야함
      • Fargate 혹은 이와 유사한 서비스로 메시지 브로커(RabbitMQ 혹은 Redis)를 직접 구축할 수 있지만, AWS의 SQS서비스를 사용하는 것이 훨씬 더 쉬움, 간단한 서버리스 메시지 브로커 기능을 제공하는데 유지 보수 노력은 거의 필요 없음
      • 단점, LocalExecutor보다 구성이 좀 더 복잡해서, 구성할 때 더 많은 작업이 필요함, 각 워커들을 위한 컴퓨팅 리소스들을 필요하기 때문에 추가된 컴포넌트들(특히 부가적인 워커 태스크)은 상당한 비용을 추가시킬 수 있음
  • 추가 단계
    • 운영 배포에 있어 무엇보다도 중요한 고려 사항은 보안
    • 컴포넌트들에 대한 접근 제어와 AWS 리소스에 대한 접근 제어
    • 컴포넌트 접근 제어에는 보안 그룹과 네트워크 ACL를 사용할 수 있고, AWS 리소스에 대한 접근은 자격 및 접근 관리(identity and access management, IAM)의 역할과 정책을 적절히 부여하여 제한할 수 있음
    • 운영 환경에 배포할 때에는 로깅(logging), 감사(auditing),추적 메트릭스(tracking metrics) 그리고 배포된 서비스의 이슈 발생에 대한 알람 등에 대한 견고한 처리 방안이 필요함
    • CloudTrail과 CloudWatch를 포함하여 관련된 AWS 서비스들을 살펴보는 것이 좋음
  • AWS 전용 훅과 오퍼레이터
    • AWS 전용 훅의 요약
      • Athena - 서버리스 빅 데이터 쿼리 - AWSAthenaHook - 쿼리 실행, 쿼리 상태 모니터링, 결과 수집
      • CloudFormation - 인프라 리소스들(스택)관리 - AWSCloudFormation Hook - CloudFormation 스택의 생성과 삭제
      • EC2 - VM(가상 머신) - EC2Hook - VM의 세부 사항 수집; 상태가 변경될 때까지 대기함
      • Glue - 관리형 ETL 서비스 - AWSGlueJobHook - Glue 작업을 생성하고 상태를 체크함
      • Lambda - 서버리스 함수 - AWSLambdaHook - Lambda함수 호출
      • S3 - Simple Storage Service - S3Hook - 파일의 리스트 및 업로드 / 다운로드
      • SageMaker - 관리형 머신러닝 서비스 - SageMakerHook - 머신러닝 작업과 엔드포인트 등의 생성과 관리
    • AWS전용 오퍼레티어 요약
      • AWSAthenaOperator - Athena - Athena에서 쿼리 실행
      • CloudFormationCreateStackOperator - CloudFormation - CloudFormation ㅅ택 생성
      • CloudFormationDeleteStackOperator - CloudFormation - CloudFormation 스택 삭제스
      • S3CopyObjectOperator - s3 - S3에 있는 오브젝트 복사
      • SageMakerTrainingOperator - SageMaker - SageMaker 학습 작업 생성
    • AWSBaseHook는 특별히 언급할만한 훅으로서 AWS boto3 라이브러리를 사용하여 WS 서비스와 제네릭 인터페이스를 제공함
  • AWS Athena를 사용한 서버리스 영화 랭킹 구축
    • Glue는 S3에 저장된 데이터의 테이블 뷰를 생성하여, 영화 랭킹을 계산할 때 데이터를 쿼리할 수 있도록 함
    • AWS Athena(서버리스 SQL 쿼리 엔진)를 사용하여 평점 테이블에서 SQL 쿼리를 실행하고 영화랭킹을 계산함
    • S3및 Glue/Athena는 매우 확장성이 좋은 기술이고 서버리스로 구성하기 때문에 서버를 별도로 구축할 필요가 없고 한 달에 한 번 실행되는 프로세스에 대해 서버 비용을 사용할 때만 지불하여 비용을 절감할 수 있음
    • 재사용성을 위해 CloudFormation(코드로 클라우드 리소스를 정의하는 AWS 템플릿 솔루션)과 같은 코드형 인프라(infrastructure-as-code,IaC) 솔루션으로 리소스를 정의하고 관리하는 것이 좋음

Azure에서의 Airflow

  • 서비스 선택
    • Azure의 관리형 컨테이너 서비스를 사용, Azure Container Instances(ACI)나 Azure KubernetesService(AKS) 등, 웹 서버를 위해서는 Azure App Service와 같은 다른 서비스 옵션들도 있음
    • Azure App Service는 웹 애플리케이션의 빌드와 배포, 그리고 웹 애플리케이션의 스케일링에 대한 완전 관리형 플랫폼(fully managed platform), 인증과 모니터링 등의 기능을 포함하여, 웹 서비스를 관리형 플랫폼에 배포할 때 쉽고 편리한 방안을 제공함
    • 스케줄러를 배포할 때에는 기본적인 컨테이너 런타임을 제공하는 ACI를 사용하는 것이 더 합리적임
    • Airflow 메타스토어로는 Azure SQL 데이터베이스와 같은 Azure의 관리형 데이터베이스를 사용하는 것이 좋음
    • Azure File Storage, Azure Blob Storage, Azure Data Lake Storage등과 같이 여러 가지 스토리지 솔루션을 제공함
      • Azure File Storage는 DAG를 호스팅할 때 가장 편한 솔루션, 파일 스토리지 볼륨을 App Service와 ACI 컨테이너에 직접 마운트할 수 있음
      • 데이터 스토리지로는 Azure Blob Storage나 Azure Data Lake Storage가 더 적합함, Azure File Storage보다 데이터 워크로드를 처리하는 데 더 적합함
  • 실제 운영 환경에 적용하기에는 아직 부족함, 적절한 방화벽과 액세스 컨트롤 등과 같이 추가적인 작업이 더 필요함, Airflow 측면에서는 Airflow의 보안 방법을 더 고려해야함
  • Azure Synapse를 사용하여 서버리스 영화 랭킹 구축
    • Azure Synapse 사용, Azure의 SQL 온 디맨드 기능을 사용하여 서비리스로 SQL쿼리를 실행할 수 있는 기능을 제공함

GCP에서의 Airflow

  • 구글 클라우드 플랫폼은 훅과 오퍼레이터의 개수로 보면 Airflow를 적용할 때 사실상 최고의 클라우드 플랫폼, 거의 모든 구글 서비스를 Airlfow로 제어하는 것이 가능
  • 서비스 선택
    • 사용자가 원하는 어떤 소프트웨어도 실행할 수 있는 가상 머신을 제공하는 Compute Engine이 있음
    • Compute Engine은 사용자에게 완벽한 자유와 제어권을 제공함, 사용자가 직접 가상 머신을 설정하고 관리해야하는 단점도 존재
    • GCP에서 지원하는 언어 중 하나로 함수를 직접 제공할 수 있는 Cloud Functions이 있음
  • CeleryExecutor를 사용한 스케일링
    • 셀러리(celery)는 메시지 브로커를 사용하여 태스크를 여러 워커에게 분산시킴
    • GCP는 Pub/Sub라는 메시지 서비스를 제공하지만 셀러리가 이 서비스를 지원하지는 않음 , RabbitMQ나 Radis와 같이 셀러리가 지원하는 오픈 소스 툴을 사용해야 함
  • GCP에서 서버리스 영화 랭킹 구축
    • AWS Glue는 Apache Spark 서비스와 메타데이터 스토어 기능을 제공하는 관리형 서비스이고 GCP DataFlow는 관리형 Apache Beam 서비스
  • GCP에서 Airflow를 설치하고 실행하는 가장 쉬운 방법은 GKE에서 Airflow 헬름 차트를 사용하는 것