4 분 소요

Airflow에서 BashOperator 로 Cloud Function을 호출할 때, 저는 당연히 함수 내부에서 Exception이 발생하면 태스크도 실패할 것이라고 생각했습니다. 그런데 예상과 다르게, 함수는 터졌는데 Airflow는 아주 평온하게 SUCCESS 를 찍어주는 상황을 보게 됐습니다.

처음에는 BashOperator 가 실행만 시켜주고 종료 상태는 잘 모르는 것 아닌가 싶었습니다. 그런데 결론부터 말하면 그건 절반만 맞는 해석이었습니다. 정확히는, HTTP 방식으로 동작하는 Cloud Function을 Python 함수처럼 생각하고 호출했던 쪽이 더 큰 문제였습니다.

이 글은 그 삽질 기록입니다. 다행히 나중에는 제대로 실패시키는 데 성공했고, 그 과정에서 왜 이런 일이 생겼는지도 정리할 수 있었습니다.

문제 상황

Airflow에서는 아래처럼 curl 로 Cloud Function HTTP endpoint를 호출하고 있었습니다.

curl -m 3610 -X POST https://example-function-url \
  -H "Authorization: bearer $(gcloud auth print-identity-token)" \
  -H "Content-Type: application/json"

겉으로 보면 단순합니다. curl 이 HTTP 호출을 하고, 응답이 오면 끝입니다. 그런데 여기서 중요한 점이 하나 있습니다.

BashOperator 는 Python 함수의 예외를 직접 이해하지 못합니다. 결국 Bash 프로세스의 종료 코드(exit code) 만 봅니다.

즉, Cloud Function 안에서 Exception이 발생하더라도, 최종적으로 curl 명령이 0 으로 끝나면 Airflow는 성공으로 봅니다.

이걸 이해하고 나니, 그동안 왜 실패가 실패로 잡히지 않았는지가 명확해졌습니다.

case 1. 정상 종료

먼저 정상 동작부터 확인했습니다.

Cloud Function 코드는 아래처럼 단순했습니다.

import functions_framework
import time

@functions_framework.http
def main(request):
    print("### init")
    time.sleep(120)
    print("### finish function")
    return "success"

결과는 예상대로 성공이었습니다.

  • 함수 정상 종료
  • HTTP 응답 정상 반환
  • curl 종료 코드 0
  • Airflow는 SUCCESS

이건 당연한 흐름입니다.

case 2. Exception 발생

다음은 함수 내부에서 일부러 Exception을 발생시켰습니다.

import functions_framework
import time

@functions_framework.http
def main(request):
    print("### init")
    time.sleep(10)
    raise Exception("error!!!!!!!")
    return "success"

여기서 기대했던 것은 간단했습니다.

  • 함수에서 Exception 발생
  • Airflow 태스크 실패

그런데 실제 결과는 이랬습니다.

  • Cloud Function은 500 Internal Server Error
  • Airflow는 SUCCESS

이상해 보이지만, 로그를 보면 이유가 드러납니다.

500 Internal Server Error: The server encountered an internal error and was unable to complete your request.
Command exited with return code 0
Marking task as SUCCESS

핵심은 이 한 줄입니다.

Command exited with return code 0

즉, HTTP 응답은 500이었지만 curl 명령 자체는 정상적으로 “응답을 받았기 때문에” 성공으로 종료한 것입니다. Airflow는 HTTP 상태 코드를 해석하지 않고, Bash 명령의 종료 코드만 봤습니다.

여기서 깨달았습니다. 문제는 Cloud Function이 실패를 던졌는데도 Bash가 실패로 끝나지 않는 구조였다는 점입니다.

case 3. sys.exit(1)로 끝내기

그 다음에는 함수 내부에서 예외를 잡고 sys.exit() 로 종료해보았습니다.

import functions_framework
import time
import traceback
import sys

def process():
    print("### init")
    time.sleep(10)
    raise Exception("error!!!!!!!")

@functions_framework.http
def main(request):
    try:
        return process()
    except Exception as e:
        print("[error]", traceback.format_exc())
        sys.exit(str(e))

결과는 오히려 더 이상했습니다.

  • 태스크가 실패하지도 않고
  • 깔끔하게 끝나지도 않고
  • 사실상 응답이 꼬이면서 종료가 비정상적으로 흘렀습니다

이건 지금 보면 당연한 결과였습니다. HTTP Cloud Function은 웹 서버처럼 요청-응답 사이클 안에서 동작합니다. 그런데 그 안에서 sys.exit() 로 프로세스를 죽이는 방식은, HTTP 응답을 정상적으로 마무리하는 모델과 잘 맞지 않습니다.

즉, 이건 “파이썬 프로그램 종료” 관점의 해결책이지, “HTTP 요청 처리 실패” 관점의 해결책은 아니었습니다.

case 4. HTTP 상태 코드를 받아서 Bash에서 실패 처리

결국 답은 Cloud Function 안이 아니라 Bash 쪽에 있었습니다.

함수는 HTTP로 동작합니다. 그렇다면 함수의 성공/실패는 HTTP 응답 코드로 판단하고, Bash에서 그 값을 보고 명시적으로 exit 1 해줘야 합니다.

실제로 사용한 스크립트는 아래와 같습니다.

response=$(
  curl -s -o /dev/null -w "%{http_code}" -m 3610 -X POST https://example-function-url \
    -H "Authorization: bearer $(gcloud auth print-identity-token)" \
    -H "Content-Type: application/json"
)

echo "$response"

if [ "$response" -eq 500 ]; then
  echo "Error: Response status is 500"
  exit 1
else
  echo "Success: Response status is $response"
fi

이제 흐름은 명확해집니다.

  1. curl 이 응답 본문 대신 상태 코드만 받습니다.
  2. Bash에서 상태 코드를 검사합니다.
  3. 500 이면 명시적으로 exit 1 합니다.
  4. 그제야 Airflow가 실패로 인식합니다.

이 케이스에서는 드디어 기대했던 결과가 나왔습니다.

  • Cloud Function Exception 발생
  • HTTP 500 반환
  • Bash에서 exit 1
  • Airflow 태스크 FAILED

드디어 실패해야 할 것이 실패했습니다. 운영자 입장에서는 참 감격스러운 순간이었습니다.

왜 이런 일이 생겼는가

이 문제는 BashOperator 의 한계라기보다, 제가 호출 모델을 잘못 이해했던 쪽에 더 가까웠습니다.

정리하면 이렇습니다.

  • BashOperator 는 Bash 명령의 종료 코드만 본다
  • curl 은 HTTP 500을 받아도 종료 코드 0일 수 있다
  • Cloud Function은 HTTP 서버처럼 동작한다
  • 따라서 함수 내부 예외만으로는 Airflow 실패를 보장할 수 없다
  • Bash에서 HTTP 상태 코드를 보고 직접 exit 1 해야 한다

즉, Python 함수 호출처럼 생각하면 안 되고, HTTP API 호출처럼 다뤄야 했습니다.

데이터 엔지니어 관점에서의 교훈

이 경험은 단순히 Airflow와 Cloud Function의 궁합 문제로 끝나지 않았습니다. 운영 파이프라인에서 실패 감지를 어디서 할 것인가에 대한 중요한 교훈을 줬습니다.

1. 실패는 “발생”보다 “전파”가 중요합니다

함수 안에서 에러가 나는 것만으로는 부족합니다. 그 실패가 오케스트레이터까지 정확히 전달되어야 의미가 있습니다. 운영 자동화에서는 실패의 발생보다, 실패 신호의 전파가 더 중요할 때가 많습니다.

2. HTTP 호출은 HTTP처럼 다뤄야 합니다

내부적으로 Python으로 짜여 있어도, 밖에서 보면 그건 HTTP endpoint입니다. 그러면 성공/실패 판단도 HTTP status code 기준으로 가져가는 것이 맞습니다.

3. 오케스트레이터는 도메인 로직을 추론하지 않습니다

Airflow는 “이 함수가 내부에서 어떤 예외를 던졌는가”를 이해하지 않습니다. 오케스트레이터는 보통 종료 코드, 상태 코드, 이벤트, 메트릭 같은 표준화된 신호를 봅니다. 따라서 실패 감지 역시 그런 신호로 맞춰줘야 합니다.

이후라면 어떻게 할까

지금 다시 만든다면 아래 중 하나를 택할 것 같습니다.

  • curl --fail 또는 --fail-with-body 활용
  • HTTP status code 검사 후 명시적으로 exit 1
  • 아예 PythonOperator 안에서 requests 로 호출하고 raise_for_status() 처리

특히 BashOperator를 계속 쓴다면, 단순 curl 호출만 두는 방식은 피하고 실패 조건을 명시하는 편이 훨씬 안전합니다.

예를 들면 아래처럼 조금 더 짧게도 만들 수 있습니다.

curl --fail -m 3610 -X POST https://example-function-url \
  -H "Authorization: bearer $(gcloud auth print-identity-token)" \
  -H "Content-Type: application/json"

다만 응답 본문까지 보고 세밀하게 제어해야 한다면, 상태 코드를 직접 파싱하는 현재 방식이 더 명확할 수 있습니다.

마무리

이번 경험은 “에러를 던졌는데 왜 성공이지?”라는 아주 단순한 의문에서 시작했습니다. 하지만 끝까지 따라가 보니, 문제는 Airflow도 Cloud Function도 아니었습니다. HTTP 호출을 HTTP답게 다루지 않았던 쪽이 핵심이었습니다.

운영 파이프라인에서는 이런 종류의 오해가 꽤 자주 생깁니다. 내부 구현 언어보다 외부 인터페이스가 더 중요할 때가 많기 때문입니다.

그래도 하나 얻은 것은 분명했습니다. 실패해야 할 때 제대로 실패하는 파이프라인은 생각보다 소중합니다. 그리고 가끔은 그 실패 로그 하나가 꽤 반갑습니다.

댓글 남기기

스팸 방지를 위해 짧은 시간에 반복 등록은 제한됩니다.

댓글 목록

관리자 보기