我有一个目录,其中包含将近 100 个日志文件,每个文件的大小为 10~15 GB。需求是逐行读取每个文件(顺序无所谓),清理该行json并dump到后端elasticsearch存储进行索引。
这是我做这项工作的 worker
# file = worker.php
echo " -- New PHP Worker Started -- "; // to get how many times gnu-parallel initiated the worker
$dataSet = [];
while (false !== ($line = fgets(STDIN))) {
// convert line text to json
$l = json_decode($line);
$dataSet[] = $l;
if(sizeof($dataSet) >= 1000) {
//index json to elasticsearch
$elasticsearch->bulkIndex($dataSet);
$dataSet = [];
}
}
借助答案here和 here我快到了,它正在(某种程度上)工作,但只需要确保在引擎盖下它确实在做我假设它正在做的事情。
只要一个文件我就可以处理如下
parallel --pipepart -a 10GB_input_file.txt --round-robin php worker.php
效果很好。添加 --round-robin 确保 php 工作进程只启动一次,然后它只是继续接收数据作为管道(穷人的队列)。
所以对于 4CPU 机器,它会启动 4 个 php worker 并非常快速地处理所有数据。
要对所有文件执行相同的操作,这是我的看法
find /data/directory -maxdepth 1 -type f | parallel cat | parallel --pipe -N10000 --round-robin php worker.php
这看起来有点像工作,但我有一种直觉,这是为所有文件并行嵌套的错误方式。
其次,因为它不能使用 --pipepart,所以我认为它比较慢。
第三,作业完成后,我看到在一台 4cpu 机器上,只有 4 个 worker 启动并且作业完成了。这是正确的行为吗?它不应该为每个文件启动 4 个工作人员吗?只是想确保我没有遗漏任何数据。
知道如何以更好的方式做到这一点吗?
最佳答案
如果它们的大小大致相同,为什么不简单地给每个文件一个文件:
find /data/directory -maxdepth 1 -type f |
parallel php worker.php '<' {}
另一种方法是对它们中的每一个使用--pipepart
:
do_one() {
parallel --pipepart -a "$1" --block -1 php worker.php
}
export -f do_one
find /data/directory -maxdepth 1 -type f | parallel -j1 do_one
如果启动 php worker.php
不需要很长时间,那么最后一个可能更可取,因为如果文件大小非常不同,它会分布得更均匀,因此如果最后一个文件很大,您最终不会等待单个进程完成处理。
关于php - 嵌套GNU Parallel处理多个大文件,拆分每个文件数据作为队列处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52981773/