我对使用 Luigi 创建流程还比较陌生,我想了解为什么我的小工作流程会导致未实现的依赖关系。我正在尝试运行任务 StageProviders(),它有一个依赖项 ErrorsLogFile()。必须在 StageProviders 之前运行的任务只是在共享驱动器上创建空白文件的任务。当我尝试按以下流程运行 StageProviders 任务时收到以下消息,如下所示:
代码:
#!/usr/local/bin/python
import luigi
import os
import shutil
import time
import pandas as pd
import time
class DupsExistingLogFile(luigi.Task):
filename = luigi.Parameter()
def requires(self):
return None
def output(self):
timestr = time.strftime("%Y-%m-%d")
return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_DuplicatesExisting.xlsx"))
def run(self):
timestr = time.strftime("%Y-%m-%d")
src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_DuplicatesExisting.xlsx"))
shutil.copyfile(src_blank_file_str, dest_file_str)
class DupsLogFile(luigi.Task):
filename = luigi.Parameter()
def requires(self):
return DupsExistingLogFile(self.filename)
def output(self):
timestr = time.strftime("%Y-%m-%d")
return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Duplicates.xlsx"))
def run(self):
timestr = time.strftime("%Y-%m-%d")
src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Duplicates.xlsx"))
shutil.copyfile(src_blank_file_str, dest_file_str)
class ErrorsLogFile(luigi.Task):
filename = luigi.Parameter()
def requires(self):
return DupsLogFile(self.filename)
def output(self):
timestr = time.strftime("%Y-%m-%d")
return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Errprs.xlsx"))
def run(self):
timestr = time.strftime("%Y-%m-%d")
src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Errors.xlsx"))
shutil.copyfile(src_blank_file_str, dest_file_str)
class StageProviders(luigi.Task):
filename = luigi.Parameter()
def requires(self):
return ErrorsLogFile(self.filename)
def output(self):
timestr = time.strftime("%Y-%m-%d")
return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS_STG_' + os.path.splitext(self.filename)[0] + '_' + timestr + '.txt'))
def run(self):
timestr = time.strftime("%Y-%m-%d")
filepath_str = '/root/etc/mnt/Import/' + self.filename
xls_file = pd.ExcelFile(filepath_str)
df = xls_file.parse('Sheet1')
src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS.txt')
dest_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS_STG_' + os.path.splitext(self.filename)[0] + '_' + timestr + '.txt')
if not df.empty:
shutil.copyfile(src_blank_file_str, dest_file_str)
with self.output().open('w') as out_file:
for name in df['NP']:
print(name, end='\n', file=out_file)
输出:
root@ubuntu:~/pythonfiles/luigi_POC/cpi_luigi_poc/src# python3 -m luigi --module provider_import StageProviders --filename CCM_provider_sample.xlsx --
local-scheduler
DEBUG: Checking if StageProviders(filename=CCM_provider_sample.xlsx) is complete
DEBUG: Checking if ErrorsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status PENDING
DEBUG: Checking if DupsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status PENDING
DEBUG: Checking if DupsExistingLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task DupsLogFile_CCM_provider_sam_ad65b206fd has status PENDING
INFO: Informed scheduler that task DupsExistingLogFile_CCM_provider_sam_ad65b206fd has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DupsExistingLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running DupsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done DupsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DupsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running ErrorsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done ErrorsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running StageProviders(filename=CCM_provider_sample.xlsx)
ERROR: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) failed StageProviders(filename=CCM_provider_sample.xlsx)
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/luigi/worker.py", line 175, in run
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status FAILED
DEBUG: Checking if StageProviders(filename=CCM_provider_sample.xlsx) is complete
DEBUG: Checking if ErrorsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status PENDING
DEBUG: Checking if DupsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status PENDING
INFO: Informed scheduler that task DupsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running ErrorsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done ErrorsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running StageProviders(filename=CCM_provider_sample.xlsx)
ERROR: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) failed StageProviders(filename=CCM_provider_sample.xlsx)
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/luigi/worker.py", line 175, in run
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 1 pending tasks possibly being run by other workers
DEBUG: There are 1 pending tasks unique to this worker
DEBUG: There are 1 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 3 ran successfully:
- 1 DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
- 1 DupsLogFile(filename=CCM_provider_sample.xlsx)
- 1 ErrorsLogFile(filename=CCM_provider_sample.xlsx)
* 1 failed:
- 1 StageProviders(filename=CCM_provider_sample.xlsx)
This progress looks :( because there were failed tasks
看来这是因为这条消息:
RuntimeError:运行时未完成的依赖项:ErrorsLogFile_CCM_provider_sam_ad65b206fd
但是,读取输出似乎 ErrorsLogFile_CCM_provider_sam_ad65b206fd 在 StageProviders 运行之前已经完成?...为什么调度程序返回未实现的依赖项?我相信我误解了如何将任务“链接”在一起。我只是希望 StageProviders 任务在成功完成 ErrorsLogFile、DupsLogFile 和 DupsExistingLogFile 任务后运行。
最佳答案
ErrorLogs 的输出中有错字(不幸的是,它没有复制到您的运行方法中)
[...] + timestr + "_Errprs.xlsx"))
因此,任务运行良好并获得状态 DONE,但当 StageProviders 检查其要求时,它调用 ErrorLogs 的完整方法,该方法返回 false,因为该文件不存在,因此出现“运行时未满足的依赖关系”错误。
这个错误一般表示任务的状态在工作流的执行过程中发生了变化。
关于python - Luigi 任务在依赖完成时在运行时返回未完成的依赖,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46412322/