5 분 소요

Airflow를 운영하다 보면 생각보다 자주 드는 아쉬움이 있습니다. “이 태스크는 사실 하는 일이 거의 없는데 왜 워커를 계속 잡아먹고 있지?” 같은 종류의 아쉬움입니다.

특히 센서나 외부 작업 완료 대기처럼, 실제 계산은 거의 하지 않는데 상태만 기다리는 작업이 그렇습니다. 이런 태스크가 많아지면 Airflow는 오케스트레이터라기보다 대기실 운영 시스템처럼 느껴질 때가 있습니다. 비싸고 바쁜 대기실입니다.

이럴 때 다시 보게 되는 기능이 Deferrable Operators 입니다. 이번 글은 Airflow의 deferrable 개념이 왜 필요한지, mode="reschedule" 과는 무엇이 다른지, 그리고 MWAA 환경에서 왜 특히 의미가 큰지 정리한 기록입니다.

왜 이 기능이 필요했는가

현재 저희 회사에서 운영 중인 아키텍처를 보면, Airflow 워커가 모든 일을 직접 처리하는 구조는 아닙니다. 대부분의 실제 계산 작업은 AWS Batch 를 통해 ECS Task 를 띄워 처리하고, Airflow는 그 작업을 스케줄링하고 결과를 이어주는 오케스트레이터 역할에 더 가깝습니다.

구성도를 먼저 보면 전체 구조는 아래와 같습니다.

graph LR
    S["Airflow Scheduler"] --> W["MWAA Worker"]
    W --> B["AWS Batch"]
    B --> E["ECS Task"]
    E --> R["외부 작업 실행"]

이 구조에서는 워커가 오래 붙잡혀 있을 이유가 많지 않습니다. 실제 무거운 일은 바깥에서 하고 있기 때문입니다. 그런데도 센서나 대기형 태스크 때문에 워커 슬롯이 계속 점유되면, 오케스트레이터가 괜히 병목이 됩니다.

즉 문제는 계산 리소스가 아니라 대기 리소스 였습니다.

mode="reschedule" 이 있지 않나

맞습니다. Airflow에는 예전부터 센서에서 mode="reschedule" 을 써서 워커 점유를 줄이는 방식이 있었습니다. 저도 처음에는 이걸로 충분하지 않나 생각했습니다.

그런데 공식 문서에서 말하는 핵심 차이를 보면, rescheduledeferrable=True 는 비슷해 보여도 운영상 성격이 꽤 다릅니다.

흐름을 보면 아래와 같습니다.

graph TD
    A["Sensor 시작"] --> B{"조건 충족?"}
    B -->|"아니오"| C["mode=reschedule: 다시 스케줄"]
    C --> D["다음 실행 때 워커 재할당"]
    B -->|"예"| E["후속 작업 진행"]

reschedule 은 말 그대로 “지금은 아니니 나중에 다시 잡아줘”에 가깝습니다. 워커를 잠깐 반납하는 효과는 있지만, 다음 재시도 때마다 다시 스케줄링과 워커 할당 흐름을 탑니다.

반면 deferrable은 구조 자체가 다릅니다.

graph TD
    A["Operator 실행"] --> B{"지금 바로 끝낼 수 있나?"}
    B -->|"아니오"| C["Triggerer로 defer"]
    C --> D["비동기 trigger가 조건 확인"]
    D -->|"조건 충족"| E["Task 재개"]
    E --> F["execute_complete 실행"]
    B -->|"예"| F

핵심은 기다리는 동안 워커가 아니라 Triggerer 가 상태를 감시한다는 점입니다. 그래서 대기성 작업이 많을수록 차이가 커집니다.

MWAA에서는 왜 더 중요할까

MWAA 환경에서는 워커를 괜히 낭비하는 일이 더 아깝게 느껴집니다.

이유는 단순합니다.

  • 실제 계산은 이미 AWS BatchECS Task 에서 처리합니다
  • Airflow 워커는 스케줄링과 orchestration 에 집중하는 편이 낫습니다
  • 대기성 태스크가 많아질수록 워커 수를 늘리고 싶은 유혹이 생깁니다
  • 그런데 그 증설 이유가 “계산”이 아니라 “기다림”이면 꽤 억울합니다

운영 관점에서는 이게 중요합니다. 워커는 비싼 CPU를 태우고 있는데 실제로는 “아직 안 끝났나?”를 반복해서 확인하는 데만 쓰일 수 있습니다. 이건 조금 고급스러운 타이머에 가깝습니다.

그래서 MWAA 같은 환경에서는 deferrable이 단순한 기능 추가가 아니라, 워커를 계산용이 아니라 조정용으로 유지하기 위한 구조적 선택지 에 가깝습니다.

직접 테스트해본 구현 방식

이번에 보고 싶었던 방향은 특정 provider 센서를 그대로 쓰는 것보다, deferrable 구조를 이해하기 위한 테스트용 구현을 직접 만들어보는 것이었습니다.

의도는 단순했습니다.

  1. 먼저 지금 이 센서가 정말 대기 상태로 들어가야 하는지 확인합니다
  2. 대기가 필요하면 trigger로 넘깁니다
  3. trigger가 비동기로 외부 상태를 주기적으로 확인합니다
  4. 조건이 만족되면 다시 task를 깨웁니다
  5. 마지막 실행 함수만 태웁니다

구성도를 보면 아래와 같습니다.

graph LR
    P["pre_check_func"] --> D{"대기 필요?"}
    D -->|"아니오"| X["즉시 종료"]
    D -->|"예"| T["GenericTrigger"]
    T --> C["check_func 주기 확인"]
    C -->|"ready"| R["TriggerEvent"]
    R --> E["execute_complete"]
    E --> F["execute_func"]

핵심은 대기 필요 여부와 실제 대기 로직, 완료 후 실행 로직을 분리해서 deferrable 동작 방식을 확인해보는 것이었습니다.

코드 구조는 크게 세 조각입니다

먼저 Trigger는 비동기로 외부 조건을 확인하다가 만족되면 TriggerEvent 를 발생시킵니다.

from airflow.triggers.base import BaseTrigger, TriggerEvent
import asyncio
from typing import Callable


class GenericTrigger(BaseTrigger):
    def __init__(self, check_func: Callable[[], asyncio.Future], check_interval: int):
        super().__init__()
        self.check_func = check_func
        self.check_interval = check_interval

    def serialize(self):
        return (
            "GenericTrigger",
            {
                "check_func": self.check_func,
                "check_interval": self.check_interval,
            },
        )

    async def run(self):
        while True:
            if await self.check_func():
                yield TriggerEvent({"status": "ready"})
                return
            await asyncio.sleep(self.check_interval)

Operator 쪽에서는 먼저 사전 조건을 보고, 정말 대기가 필요할 때만 defer() 로 넘깁니다.

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
from typing import Callable, Any
import asyncio


class GenericDeferrableSensor(BaseSensorOperator):
    def __init__(
        self,
        pre_check_func: Callable[[Context], bool],
        check_func: Callable[[], asyncio.Future],
        check_interval: int,
        execute_func: Callable[[Context], Any],
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.pre_check_func = pre_check_func
        self.check_func = check_func
        self.check_interval = check_interval
        self.execute_func = execute_func

    def execute(self, context: Context) -> None:
        if not self.pre_check_func(context):
            self.log.info("Pre-check condition not met, skipping or marking as success.")
            return

        self.defer(
            trigger=GenericTrigger(self.check_func, self.check_interval),
            method_name="execute_complete",
        )

    def execute_complete(
        self,
        context: Context,
        event: dict[str, Any] | None = None,
    ) -> None:
        self.execute_func(context)

활용 예제는 아래와 같습니다.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago


def pre_check_func(context: Context) -> bool:
    today_week_day = datetime.today().weekday()
    return today_week_day == 0


async def check_func() -> bool:
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get("http://example.com/api/status") as response:
            if response.status == 200:
                data = await response.json()
                return "ready" in data
    return False


def execute_func(context: Context) -> None:
    print("Conditions met, executing final task")


with DAG(
    "generic_deferrable_sensor_example",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    catchup=False,
) as dag:
    GenericDeferrableSensor(
        task_id="generic_deferrable_sensor",
        pre_check_func=pre_check_func,
        check_func=check_func,
        check_interval=60,
        execute_func=execute_func,
        execution_timeout=timedelta(hours=1),
    )

그런데 늘 그렇듯, 잘 될 것 같을 때 다른 문제가 나옵니다

실제로는 꽤 잘 될 것 같았습니다. 잠깐 뜨는 것처럼 보이기도 했습니다. 그래서 더 아쉬웠습니다. “이 정도면 거의 됐는데 왜 안 되지” 구간이 제일 사람을 괴롭힙니다.

문제는 trigger 직렬화(serialize) 쪽이었습니다. module path 를 제대로 찾지 못하면서 오류가 발생했습니다.

이 부분이 중요한 이유는 deferrable trigger가 단순한 콜백이 아니기 때문입니다. Triggerer 프로세스에서 다시 복원할 수 있도록, trigger는 자신이 어떤 클래스이고 어떤 인자로 다시 만들어져야 하는지 직렬화 가능한 형태로 제공해야 합니다.

즉 “함수를 들고 있으면 되겠지”가 잘 안 통합니다. 특히 임의의 callable 을 그대로 들고 serialize 하려는 접근은 Airflow의 trigger 복원 모델과 충돌하기 쉽습니다.

여기서 배운 점은 분명했습니다.

  • trigger는 재구성 가능한 값만 가져가는 편이 안전합니다
  • import 가능한 모듈 경로와 클래스 경로가 명확해야 합니다
  • callable 자체를 넘기기보다, 경로 문자열이나 설정값을 넘겨 런타임에 resolve 하는 방식이 더 현실적입니다

즉 generic하게 만들고 싶어도, Airflow trigger는 생각보다 “아무 함수나 받아서 돌리는 범용 런타임”처럼 설계되어 있지 않습니다.

그래서 실무에서는 어떻게 가져가는 편이 나을까

제 생각에는 실무적으로는 두 가지 방향이 현실적입니다.

첫째, 완전 범용 Generic Trigger를 만들기보다 도메인별 deferrable 센서 로 나누는 편이 낫습니다.

예를 들면 이런 식입니다.

  • Batch Job 완료 대기 센서
  • ECS Task 상태 확인 센서
  • 외부 API ready 상태 확인 센서
  • 파일 도착 대기 센서

각 센서가 필요한 최소 설정만 받고, trigger가 직렬화 가능한 파라미터만 사용하도록 만드는 편이 운영은 더 단단합니다.

둘째, generic하게 가고 싶다면 callable 을 들고 다니지 말고 아래처럼 가져가는 편이 안전합니다.

  • 대상 resource 식별자
  • polling interval
  • provider type
  • 상태 확인에 필요한 고정 설정

그리고 실제 상태 확인 로직은 trigger 내부에서 provider type 기준으로 분기하는 쪽이 낫습니다.

데이터 엔지니어 관점에서의 의미

저는 deferrable operator를 단순히 “센서를 예쁘게 만드는 기능”으로 보지는 않습니다. 이건 오케스트레이터가 어디까지 워커를 써야 하는지에 대한 아키텍처 선택에 더 가깝다고 봅니다.

특히 외부 실행 엔진이 따로 있는 환경에서는 더 그렇습니다.

  • Airflow는 스케줄링과 흐름 제어를 담당합니다
  • 실제 계산은 Batch, ECS, Spark, 외부 API 등 바깥에서 수행됩니다
  • 그렇다면 기다림까지 워커가 떠안을 이유는 점점 줄어듭니다

이 관점에서 보면 deferrable은 단순 최적화가 아니라, 오케스트레이터를 오케스트레이터답게 유지하는 기능 입니다.

정리

Airflow에서 대기성 작업이 많고, 특히 MWAA 워커를 최대한 아끼고 싶다면 deferrable operator는 한 번쯤 진지하게 볼 가치가 있습니다.

mode="reschedule" 도 도움이 되지만, 기다리는 동안 triggerer로 책임을 넘길 수 있다는 점에서 deferrable은 한 단계 더 구조적인 해법입니다.

다만 직접 구현할 때는 trigger serialize 모델을 가볍게 보면 안 됩니다. 잘 될 것 같은 순간에 가장 잘 막히는 지점이기도 합니다.

그래도 방향은 분명하다고 생각합니다. 실제 계산은 AWS BatchECS 에 맡기고, Airflow 워커는 스케줄링과 재개 시점 제어에 더 집중하게 만드는 것. 그 흐름을 더 자연스럽게 만드는 도구 중 하나가 deferrable operator입니다.

댓글 남기기

스팸 방지를 위해 짧은 시간에 반복 등록은 제한됩니다.

댓글 목록

관리자 보기