我正在使用targets
工作流程管道。此管道的一部分是监视 csv 文件目录的更新。该目录中有超过 10,000 个 csv 文件,并且每周都会添加新文件。我希望能够识别新添加的文件并将它们附加到一组现有的 *.rds
文件中。最简单的方法是每周重新运行创建 5 个 *.rds
文件子集的进程,但这需要时间。有效的做法是识别新添加的文件,然后简单地使用正确的 rds
文件进行 bind_rows
操作。
我可以使用 dir()
和 setdiff()
进行典型编程,轻松完成此操作,我在其中存储前一天的 csv 文件路径快照。但我正在努力在 targets
框架内实现这一目标。
这是一个似乎不起作用的尝试。我想我想监视 /_targets
目录中的临时结果,但我不知道如何去做。并且,targets
文档建议不要在目标配置本身内使用 tar_load
。
tar_script({
list(
tar_target(csv_directory, "/csv/"),
tar_target(csv_snapshot, dir(csv_directory)),
tar_target(append_action, if(length(setdiff(dir(csv_directory), dir(csv_snapshot))) > 0){
...}
})
最佳答案
一些可能有帮助的组件:
- 文件目标:https://books.ropensci.org/targets/files.html 。使用
tar_target(format = "file")
,该包会监视输入和/或输出文件的更改,并重新运行受影响的目标(如果有)。 - 替代存储格式:https://docs.ropensci.org/targets/reference/tar_target.html#storage-formats 。与其将 CSV 文件聚合到外部 RDS 文件中,不如使用
tar_target(format = "feather")
之类的东西来提高效率,这样targets
会自动压缩您的输出数据并确保您不必担心文件的微观管理。 - 动态分支:books.ropensci.org/targets/dynamic.html。动态分支是一种在管道运行时定义大量新目标的方法。例如,这使您可以为一个文件或一批现有文件创建新目标。
- 批处理:https://books.ropensci.org/targets/dynamic.html#batching 。 10000 个目标太多了,而
targets
包的速度可能会因为如此多的目标而变慢,因为每个目标都会产生开销。
因此,我建议您将 CSV 文件组织成批处理(例如,按周),并动态地跨批处理进行分支来处理它们。根据您的用例的具体情况,另一种批处理结构可能更合适。
csv/
├── week1/
│ ├── data1.csv
│ ├── data2.csv
│ ├── ...
├── week2/
│ ├── data1.csv
│ ├── data2.csv
│ ├── ...
...
管道示意图:
# _targets.R
process_csv_dir <- function(csv_dir) {...} # custom user-defined function
list(
tar_target(csv_dir, list.files("csv", full.names = TRUE)),
tar_target(
processed_data,
process_csv_dir(csv_dir),
pattern = map(csv_dir), # dynamic branching
format = "feather" # from the arrow package
)
)
关于r - 使用 R 目标将新数据附加到现有数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69036898/