我对 Dagster 很陌生,我无法在文档中找到我的问题的答案。
我有 2 个实体:一个是生成从 XML 文件解析的元组(str,str),另一个只是使用元组并将对象存储在 DB 中并设置相应的字段。但是我遇到了一个错误 Core compute for solid returned an output multiple times
.我很确定我在设计中犯了根本性的错误。有人可以向我解释如何以正确的方式设计此管道,或将我指向文档中解释此错误的章节吗?
@solid(output_defs=[OutputDefinition(Tuple, 'classification_data')])
def extract_classification_from_file(context, xml_path: String) -> Tuple:
context.log.info(f"start")
root = ET.parse(xml_path).getroot()
for code_node in root.findall('definition-item'):
context.log.info(f"{code_node.find('classification-symbol').text} {code_node.find('definition-title').text}")
yield Output((code_node.find('classification-symbol').text, code_node.find('definition-title').text), 'classification_data')
@solid()
def load_classification(context, classification_data):
cls = CPCClassification.objects.create(code=classification_data[0], description=classification_data[1]).save()
@pipeline
def define_classification_pipeline():
load_classification(extract_classification_from_file())
最佳答案
我在这里所做的不是受支持的行为。官方声明issue
关于python - Solid 的核心计算多次返回输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59050671/