python - Airflow 2 still reports an error even though my task actually completed? Error - Could not serialize the XCom value into JSON

Tags: python airflow airflow-2.x

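Hi all, my DAG actually runs fine and all the outputs are produced, but the Airflow UI never shows success: the task is marked as failed because of the error below. Reading online, I came across these two points: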

do_xcom_push=False

and that xcom_push will be deprecated in Airflow 2.0.

I'm just not sure how to actually set this. Can anyone share any insight?

Full code:

import pandas as pd
import logging
import csv
import numpy as np
import datetime
import glob
import shutil
import time
import gcsfs
from airflow.operators import python_operator
from google.cloud import bigquery
import os
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

client = bigquery.Client()
bqclient = bigquery.Client()
# Output table for dataframe
table_id = "table"
# Dataframe Code
query_string = """
SELECT * FROM `df_table`
"""
gas_data = (
    bqclient.query(query_string)
        .result()
        .to_dataframe(
        create_bqstorage_client=True,
    ))

manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN', 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}

meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
                'G4F9': {'': 'G4SZV-1'},
                'G4F1': {'': 'G4SDZV-2'},
                'G4K0': {'': 'BK-G4E'},
                'E6S1': {'': 'E6VG470'},
                'E6S2': {'': 'E6VG470'},
                }

def map_manufacturer_model(s):
    s = str(s)
    model = ''
    try:
        manufacturer = manufacturers[s[:4]]
        for k, m in meter_models[s[:4]].items():
            if s[-4:].startswith(k):
                model = m
                break
    except KeyError:
        manufacturer = ''

    return pd.Series({'NewMeterManufacturer': manufacturer,
                      'NewMeterModel': model
                      })


gas_data[['NewMeterManufacturer', 'NewMeterModel']] = gas_data['NewSerialNumber'].apply(map_manufacturer_model)
job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[],
    write_disposition="WRITE_TRUNCATE", )
job = client.load_table_from_dataframe(gas_data, table_id, job_config=job_config)  # Make an API request.
job.result()  # Wait for the job to complete.
table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows and {} columns to {}".format(
    table.num_rows, len(table.schema), table_id))
print('Loaded DATAFRAME into BQ TABLE')

default_dag_args = {'owner': 'ME',
                    'start_date': datetime.datetime(2021, 12, 16),
                    }

with models.DAG('Test_Dag_V1',
                schedule_interval=None, #'45 10 * * *',
                default_args=default_dag_args) as dag:
    format_function = python_operator.PythonOperator(
        task_id='format_function',
        python_callable=format_data
    )

    map_manufacturer_model_function = python_operator.PythonOperator(
        task_id='map_manufacturer_model_function',
        python_callable=map_manufacturer_model,
        op_kwargs={'s': 'stringValue'}
    )


format_function >> map_manufacturer_model_function

Airflow error

 [2021-12-15 16:44:26,180] {xcom.py:228} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
    [2021-12-15 16:44:26,182] {taskinstance.py:1465} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1318, in _execute_task
        self.xcom_push(key=XCOM_RETURN_KEY, value=result)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
        return func(*args, session=session, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1905, in xcom_push
        XCom.set(
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
        return func(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 79, in set
        value = XCom.serialize_value(value)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 226, in serialize_value
        return json.dumps(value).encode('UTF-8')
      File "/opt/python3.8/lib/python3.8/json/__init__.py", line 231, in dumps
        return _default_encoder.encode(obj)
      File "/opt/python3.8/lib/python3.8/json/encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "/opt/python3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "/opt/python3.8/lib/python3.8/json/encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type Series is not JSON serializable
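
The traceback shows that the callable itself finished: the failure happens afterwards, when Airflow pushes the callable's return value to XCom and calls json.dumps on it. Here that return value is the pandas Series from map_manufacturer_model. A minimal sketch of one way around this, assuming the callable can return a plain dict instead (map_manufacturer_model_dict is a hypothetical name, and the lookup tables are abbreviated copies of the ones in the question):

```python
import json

# Lookup tables copied from the question, abbreviated to two prefixes.
manufacturers = {'G4F0': 'FLN', 'G4K0': 'HWL'}
meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
                'G4K0': {'': 'BK-G4E'}}


def map_manufacturer_model_dict(s):
    """Hypothetical variant of map_manufacturer_model that returns a dict."""
    s = str(s)
    manufacturer = ''
    model = ''
    try:
        manufacturer = manufacturers[s[:4]]
        for k, m in meter_models[s[:4]].items():
            if s[-4:].startswith(k):
                model = m
                break
    except KeyError:
        pass  # unknown prefix: leave both fields empty
    # A dict of strings is JSON serializable, unlike a pandas Series.
    return {'NewMeterManufacturer': manufacturer, 'NewMeterModel': model}


# Same call the DAG makes via op_kwargs={'s': 'stringValue'}.
result = map_manufacturer_model_dict('stringValue')

# Mirrors the serialization step shown in the traceback.
encoded = json.dumps(result).encode('UTF-8')
```

For the DataFrame step, the dict can still be wrapped at the call site, for example gas_data['NewSerialNumber'].apply(lambda s: pd.Series(map_manufacturer_model_dict(s))). If the return value is not needed downstream at all, passing do_xcom_push=False to the PythonOperator should stop Airflow from pushing it in the first place.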

Best answer

In your airflow.cfg file, set enable_xcom_pickling = True (the option lives in the [core] section).
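
To illustrate what this setting changes: with pickling enabled, Airflow 2 serializes XCom values with pickle rather than json, so objects that json rejects can still be stored. The sketch below uses only the standard library and takes a datetime.date as a stand-in for the pandas Series (an assumption made to keep the example dependency-free); it is not Airflow's own code.

```python
import datetime
import json
import pickle

# Stand-in for any value that is not JSON serializable (e.g. a pandas Series).
value = datetime.date(2021, 12, 16)

# JSON path (the default): raises TypeError, as in the traceback.
try:
    json.dumps(value)
    json_ok = True
except TypeError:
    json_ok = False

# Pickle path (what enabling XCom pickling switches to): round-trips the object.
restored = pickle.loads(pickle.dumps(value))
```

Note that unpickling untrusted data can execute arbitrary code, so this option is best reserved for deployments where everything that writes XComs is trusted. Returning JSON-friendly values from the callable avoids the need for it.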

This question and answer come from a similar question on Stack Overflow: https://stackoverflow.com/questions/70367529/
