python - 即使我的任务实际上已经完成, Airflow 2 仍然出错?错误 - 无法将 XCom 值序列化为 JSON

大家好,我的 dag 实际上运行良好,所有输出都在工作,但由于以下问题,airflow 的 UI 不会更改为成功和失败。在线阅读,我遇到了这两个:


并且 Xcom_push 将在 Airflow 2.0 版中弃用。

我只是不确定如何实际设置它? 谁能分享任何见解?


import pandas as pd
import logging
import csv
import numpy as np
import datetime
import glob
import shutil
import time
import gcsfs
from airflow.operators import python_operator
from import bigquery
import os
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

client = bigquery.Client()
bqclient = bigquery.Client()
# Output table for dataframe
table_id = "table"
# Dataframe Code
query_string = """
SELECT * FROM `df_table`
gas_data = (

manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN', 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}

meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
                'G4F9': {'': 'G4SZV-1'},
                'G4F1': {'': 'G4SDZV-2'},
                'G4K0': {'': 'BK-G4E'},
                'E6S1': {'': 'E6VG470'},
                'E6S2': {'': 'E6VG470'},

def map_manufacturer_model(s):
    s = str(s)
    model = ''
        manufacturer = manufacturers[s[:4]]
        for k, m in meter_models[s[:4]].items():
            if s[-4:].startswith(k):
                model = m
    except KeyError:
        manufacturer = ''

    return pd.Series({'NewMeterManufacturer': manufacturer,
                      'NewMeterModel': model

gas_data[['NewMeterManufacturer', 'NewMeterModel']] = gas_data['NewSerialNumber'].apply(map_manufacturer_model)
job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    write_disposition="WRITE_TRUNCATE", )
job = client.load_table_from_dataframe(gas_data, table_id, job_config=job_config)  # Make an API request.
job.result()  # Wait for the job to complete.
table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows and {} columns to {}".format(
    table.num_rows, len(table.schema), table_id))
print('Loaded DATAFRAME into BQ TABLE')

default_dag_args = {'owner': 'ME',
                    'start_date': datetime.datetime(2021, 12, 16),

with models.DAG('Test_Dag_V1',
                schedule_interval=None, #'45 10 * * *',
                default_args=default_dag_args) as dag:
    format_function = python_operator.PythonOperator(

    map_manufacturer_model_function = python_operator.PythonOperator(
        op_kwargs={'s': 'stringValue'}

format_function >> map_manufacturer_model_function

Airflow 错误

 [2021-12-15 16:44:26,180] {} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
    [2021-12-15 16:44:26,182] {} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/", line 1166, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/", line 1285, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/", line 1318, in _execute_task
        self.xcom_push(key=XCOM_RETURN_KEY, value=result)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/", line 70, in wrapper
        return func(*args, session=session, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/", line 1905, in xcom_push
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/", line 67, in wrapper
        return func(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/", line 79, in set
        value = XCom.serialize_value(value)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/", line 226, in serialize_value
        return json.dumps(value).encode('UTF-8')
      File "/opt/python3.8/lib/python3.8/json/", line 231, in dumps
        return _default_encoder.encode(obj)
      File "/opt/python3.8/lib/python3.8/json/", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "/opt/python3.8/lib/python3.8/json/", line 257, in iterencode
        return _iterencode(o, 0)
      File "/opt/python3.8/lib/python3.8/json/", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type Series is not JSON serializable


在您的 airflow.cfg 文件中,enable_xcom_pickling

