python - Dagster cannot connect to mongodb locally

Tags: python mongodb etl dagster

I am working through the Dagster tutorial and thought connecting to a local mongodb would be a good exercise.

from dagster import get_dagster_logger, job, op
from pymongo import MongoClient

@op
def connection():
    client = MongoClient("mongodb://localhost:27017/")
    return client["development"]

@job
def execute():
    client = connection()
    get_dagster_logger().info(f"Connection: {client} ")

Dagster error:

dagster.core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "connection":
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 232, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 348, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 405, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output, input_lineage):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 534, in _store_output
    for elt in iterate_with_context(
  File "/usr/local/lib/python3.9/site-packages/dagster/utils/__init__.py", line 400, in iterate_with_context
    return
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 73, in solid_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
TypeError: cannot pickle '_thread.lock' object
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/usr/local/lib/python3.9/site-packages/dagster/utils/__init__.py", line 398, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 524, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/storage/fs_io_manager.py", line 124, in handle_output
    pickle.dump(obj, write_obj, PICKLE_PROTOCOL)

I have tested the same connection locally in ipython and it works, so the problem is related to Dagster.

Best answer

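The default IOManager requires that the inputs and outputs of an op be picklable, and your MongoClient most likely is not (the traceback ends in the pickle.dump call in fs_io_manager.py). You may want to refactor this to use Dagster's @resource approach. It lets you define resources outside the @op and makes it very easy to mock those resources later in tests. Your code would look like this: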

from dagster import get_dagster_logger, job, op, resource
from pymongo import MongoClient

@resource
def mongo_client():
    client = MongoClient("mongodb://localhost:27017/")
    return client["development"]

@op(
    required_resource_keys={'mongo_client'}
)
def test_client(context):
    client = context.resources.mongo_client
    get_dagster_logger().info(f"Connection: {client} ")

@job(
    resource_defs={'mongo_client': mongo_client}
)
def execute():
    test_client()
    

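Also note that I moved the test code into a separate @op and call only that op from the execute @job. This is because the code in a job definition is evaluated at load time and serves only to describe the graph of ops to execute. All general-purpose programming that does the actual work needs to live inside @op code.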
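
Coming back to the error in the question: the `TypeError: cannot pickle '_thread.lock' object` can be reproduced without Dagster or MongoDB. The sketch below uses only the standard library. `FakeClient` and `try_pickle` are hypothetical names made up for illustration, and the assumption that the pymongo object fails for the same reason (it holds thread locks internally) is inferred from the traceback, not checked against pymongo's source.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for a database client that holds a lock."""

    def __init__(self):
        # Connection-style objects commonly keep locks for thread safety.
        self._lock = threading.Lock()


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the error message."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


# Plain data can be stored by a pickle-based IO manager.
print(try_pickle({"db": "development"}))

# An object that owns a lock cannot, which is what the op output ran into.
print(try_pickle(FakeClient()))
```

Anything an op returns goes through this kind of serialization under the default IOManager, so live connections are better handed to ops as resources than passed between them as outputs.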

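The really neat thing about the @resource pattern is that it makes testing with mocked resources, or more generally swapping resources, very easy. Say you want a mocked client so that you can run your job code without actually hitting the database. You could do something like this: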

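from unittest.mock import MagicMock

from dagster import graph, resource

# Reuses test_client and mongo_client from the previous snippet.
@resource
def mocked_mongo_client():
    return MagicMock()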

@graph
def execute_graph():
    test_client()


execute_live = execute_graph.to_job(name='execute_live',
                                    resource_defs={'mongo_client': mongo_client,})
execute_mocked = execute_graph.to_job(name='execute_mocked',
                                      resource_defs={'mongo_client': mocked_mongo_client,})

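This uses Dagster's @graph pattern to describe the DAG of ops, then uses the .to_job() method on the GraphDefinition object to configure the graph in different ways. That way you keep exactly the same underlying op structure while passing in different resources, tags, executors, and so on.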
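
What the mocked resource buys you can also be seen in isolation, without Dagster or a running MongoDB. The following is a plain-Python sketch, not Dagster API: `log_connection` is a hypothetical stand-in for the body of an op that receives the database handle, and the collection names are made-up sample values. `list_collection_names` is a method on pymongo's Database, but here it is only ever called on the mock.

```python
from unittest.mock import MagicMock


def log_connection(db):
    """Hypothetical op body: describe the database handle it was given."""
    names = db.list_collection_names()
    return f"Connection OK, collections: {names}"


# The mock accepts any attribute access or call, so no database is needed.
fake_db = MagicMock()
fake_db.list_collection_names.return_value = ["users", "orders"]

message = log_connection(fake_db)
print(message)

# The mock also records how it was used, which is handy in assertions.
fake_db.list_collection_names.assert_called_once_with()
```

Because the op only ever sees whatever the resource returns, the job built with the mocked resource exercises the same code path as the live one.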

Regarding python - Dagster cannot connect to mongodb locally, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/72118156/
