Airflow XCom Exclusive Page
XCom does not natively support "pop" or "consume once". You must implement it manually:
# Save this file in your airflow plugins/ directory as custom_backend.py
import json
import uuid
from typing import Any

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    """XCom backend that keeps payloads in S3 and hands back only an S3 URI.

    ``serialize`` writes the JSON-encoded value to a uniquely named object in
    ``BUCKET_NAME`` and returns its ``s3://bucket/key`` URI; ``deserialize``
    resolves such a URI back into the original Python object.

    NOTE(review): Airflow's documented custom-backend hooks on ``BaseXCom``
    are named ``serialize_value`` / ``deserialize_value``. Confirm that the
    target Airflow version actually invokes ``serialize`` / ``deserialize``
    before relying on this class.
    """

    PREFIX = "s3://"
    BUCKET_NAME = "my-exclusive-airflow-xcom-bucket"

    @staticmethod
    def serialize(value: Any, **kwargs) -> str:
        """Serialize the object, save it to S3, and return the S3 URI.

        Args:
            value: Any JSON-serializable object.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The ``s3://<bucket>/<key>`` URI of the uploaded JSON document.

        Raises:
            TypeError: If ``value`` cannot be encoded by ``json.dumps``.
        """
        s3_hook = S3Hook(aws_conn_id='aws_default')
        # A fresh UUID per call gives every XCom its own object, so pushes
        # never overwrite one another.
        key = f"xcom/{uuid.uuid4()}.json"

        # Convert object to string/json
        string_data = json.dumps(value)

        # Load string into S3
        s3_hook.load_string(
            string_data=string_data,
            key=key,
            bucket_name=S3XComBackend.BUCKET_NAME,
            replace=True,
        )

        # This string URI is what actually saves to the Airflow DB
        return f"{S3XComBackend.PREFIX}{S3XComBackend.BUCKET_NAME}/{key}"

    @staticmethod
    def deserialize(result, **kwargs) -> Any:
        """Read the S3 URI from the database and pull the real data from S3.

        Args:
            result: Object whose ``get_value()`` yields the stored URI.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The Python object decoded from the JSON document in S3.
        """
        s3_hook = S3Hook(aws_conn_id='aws_default')
        # NOTE(review): assumes ``result`` exposes ``get_value()`` returning
        # the URI string produced by ``serialize`` -- TODO confirm.
        s3_uri = result.get_value()

        # Strip only the leading scheme to get the bucket and key path
        path = s3_uri.replace(S3XComBackend.PREFIX, "", 1)
        bucket, key = path.split("/", 1)

        # Read from S3
        file_content = s3_hook.read_key(key=key, bucket_name=bucket)
        return json.loads(file_content)
# Use code with caution. airflow xcom exclusive
Most operators automatically push their execution result to the reserved `return_value` key if `do_xcom_push` is enabled.

Why "Exclusive" XComs Matter

XCom does not natively support "pop" or "consume once" semantics.
Airflow does not automatically delete XCom entries when a DAG run finishes. Over months, your `xcom` table will grow to millions of rows, slowing down database queries.

Why "Exclusive" XComs Matter

Airflow does not automatically delete XCom entries.
from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2026, 1, 1), schedule=None, catchup=False)
def xcom_exclusive_demo():
    """Two-task TaskFlow DAG: one task returns a dict, the next receives it."""

    @task
    def push_task():
        # This return value is automatically pushed to XCom
        return {"status": "success", "file_id": 12345}

    @task
    def pull_task(value_from_push):
        # The TaskFlow API passes the XCom value directly
        print(f"Received: {value_from_push['file_id']}")

    # Directly connecting tasks
    data = push_task()
    pull_task(data)


xcom_exclusive_demo()
# Use code with caution.
# 2. Traditional xcom_push and xcom_pull