python - Airflow: Pickle recursion depth during XCOM insert

Tags: python recursion pickle airflow

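Hi, I'm running a task with PythonOperator. The task itself seems to run fine, and the returned value is exactly what I expect (the large XML output of an API call). However, I get an error: (builtins.RecursionError) maximum recursion depth exceeded in comparison. My Python callable returns a value, so I assume an XCom push happens and Airflow is trying to serialize the output so that downstream operators can consume it. I'm not sure how to fix this, because I don't see any configuration to a) increase the recursion depth of the pickle serializer (suggested here) or b) handle errors during the XCom push.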
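A minimal, standard-library-only sketch of the failure mode, assuming (as the repeating `save_reduce` → `save_module_dict` → `save_list` frames in the traceback below suggest) that the returned value is a deeply nested object rather than a plain string. `make_nested` and `try_pickle` are hypothetical helpers, and the stdlib `pickle` module stands in for the `dill` pickler that appears in the traceback.

```python
import pickle


def make_nested(depth):
    """Build a hypothetical deeply nested structure: a stand-in for a parsed
    XML tree whose nodes hold lists of child nodes."""
    root = []
    node = root
    for _ in range(depth):
        child = []
        node.append(child)
        node = child
    return root


def try_pickle(obj):
    """Return the pickled size in bytes, or None if pickling hit the recursion limit."""
    try:
        return len(pickle.dumps(obj))
    except RecursionError:
        return None


# pickle descends one level per nesting level, so a deep enough tree
# exhausts the recursion limit, as in the XCom insert above.
nested = make_nested(100_000)
print(try_pickle(nested))

# The same amount of markup held in a single str pickles without deep
# recursion, because the string is written as one object.
flat = "<a>" * 100_000 + "</a>" * 100_000
print(try_pickle(flat) is not None)
```

Raising the interpreter's recursion limit (the knob the question asks about) only moves the threshold; passing flat text, or something even smaller, through XCom avoids the deep recursion altogether.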

My full traceback is below:

INFO - Subtask: [2017-11-08 14:00:14,545] {models.py:1342} INFO - Executing <Task(PythonOperator): test_task_xml> on 2017-11-07 00:00:00
INFO - Subtask: [2017-11-08 14:00:31,817] {python_operator.py:81} INFO - Done. Returned value was: <QueryResult><Query><Answer>12345</Answer> ... (12321456 characters truncated) ... </Query></QueryResult>
INFO - Subtask: [2017-11-08 14:00:31,839] {models.py:1417} ERROR - (builtins.RecursionError) maximum recursion depth exceeded in comparison [SQL: 'INSERT INTO xcom (key, value, timestamp, execution_date, task_id, dag_id) VALUES (%(key)s, %(value)s, now(), %(execution_date)s, %(task_id)s, %(dag_id)s) RETURNING xcom.id'] [parameters: [{'dag_id': 'test_dag', 'key': 'return_value', 'value': <QueryResult><Query><Answer>12345</Answer> ... (12321456 characters truncated) ... </Query></QueryResult>, 'task_id': 'test_task_xml', 'execution_date': datetime.datetime(2017, 11, 7, 0, 0)}]]
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1116, in _execute_context
INFO - Subtask:     context = constructor(dialect, self, conn, *args)
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 690, in _init_compiled
INFO - Subtask:     for key in compiled_params
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 690, in <genexpr>
INFO - Subtask:     for key in compiled_params
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/sql/sqltypes.py", line 1516, in process
INFO - Subtask:     value = dumps(value, protocol)
INFO - Subtask:     dump(obj, file, protocol, byref, fmode, recurse)#, strictio)
INFO - Subtask:   File "/lib/python3.6/site-packages/dill/dill.py", line 274, in dump
INFO - Subtask:     pik.dump(obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 409, in dump
INFO - Subtask:     self.save(obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 521, in save
INFO - Subtask:     self.save_reduce(obj=obj, *rv)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 634, in save_reduce
INFO - Subtask:     save(state)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask:     f(self, obj) # Call unbound method with explicit self
INFO - Subtask:   File "/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
INFO - Subtask:     StockPickler.save_dict(pickler, obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 821, in save_dict
INFO - Subtask:     self._batch_setitems(obj.items())
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 847, in _batch_setitems
INFO - Subtask:     save(v)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask:     f(self, obj) # Call unbound method with explicit self
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 781, in save_list
INFO - Subtask:     self._batch_appends(obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 805, in _batch_appends
INFO - Subtask:     save(x)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 521, in save
INFO - Subtask:     self.save_reduce(obj=obj, *rv)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 634, in save_reduce
INFO - Subtask:     save(state)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask:     f(self, obj) # Call unbound method with explicit self
INFO - Subtask:   File "/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict

Best answer

It turns out there is a limitation on pickled Python objects imposed by the database's BLOB (Binary Large Object) setting. To work around it, you can:

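  • Try using Fileflow
  • Dump the file into a temporary folder and push the file path through XCom
  • Handle the whole process in a single task and push only selected values through XCom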
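A minimal sketch of the second and third workarounds, assuming the callable has the XML as a string. `fetch_and_store`, `load_from_path` and `summarize` are hypothetical names, not Airflow APIs; the only Airflow behavior relied on is that a PythonOperator's return value is what gets pushed to XCom, so each function returns something small.

```python
import os
import tempfile
import xml.etree.ElementTree as ET


def fetch_and_store(xml_text, directory=None):
    """Workaround 2 (sketch): write the large XML to a temp file and return
    only its path, so only this short string is pickled into XCom."""
    fd, path = tempfile.mkstemp(suffix=".xml", dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(xml_text)
    return path


def load_from_path(path):
    """Downstream side: read the XML back from the path pulled from XCom."""
    with open(path, encoding="utf-8") as f:
        return f.read()


def summarize(xml_text):
    """Workaround 3 (sketch): parse in the same task and return only the
    small values downstream tasks need (here, the text of every <Answer>)."""
    root = ET.fromstring(xml_text)
    return [el.text for el in root.iter("Answer")]


sample = "<QueryResult><Query><Answer>12345</Answer></Query></QueryResult>"
path = fetch_and_store(sample)
print(summarize(load_from_path(path)))  # ['12345']
os.remove(path)
```

In Airflow 1.x a downstream callable (with `provide_context=True`) could fetch the path with `kwargs['ti'].xcom_pull(task_ids='test_task_xml')`. The sketch assumes the temp directory is reachable from whichever worker runs the downstream task, which is not guaranteed on a distributed executor.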

For python - Airflow: Pickle recursion depth during XCOM insert, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47188305/
