multithreading - 如何在bash脚本中实现多线程访问基于文件的队列

标签 multithreading bash posix locks

我在设计一个多处理的 bash 脚本时遇到了一些问题,该脚本通过网站,跟踪找到的链接并在每个新页面上进行一些处理(它实际上收集电子邮件地址,但这不是问题的重要细节)。

脚本应该像这样工作:

  1. 下载页面
  2. 解析所有链接并将它们添加到队列
  3. 做一些不重要的处理
  4. 从队列中弹出一个 URL 并从
  5. 开始

这本身编程起来非常简单,问题来自两个限制和脚本需要具有的功能。

  • 脚本不得对一个 URL 进行两次处理
  • 脚本必须能够一次处理 n 个(作为参数提供)页面
  • 脚本必须是 POSIX 兼容的(curl 除外)-> 所以没有花哨的锁

现在,我设法想出了一个实现,它使用两个文件作为队列,其中一个存储所有已处理的 URL,另一个存储已找到但尚未处理的 URL。

主进程简单地生成一堆共享队列文件的子进程,并且(在循环中直到 URLs-to-be-processed-queue 为空)从 URLs-to-be 中弹出一个 URL -processed-queue,处理页面,尝试将每个新找到的链接添加到 URLs-already-processed-queue,如果成功(URL 不存在)将其添加到URLs-to-be-processed-queue 也是。

问题在于您不能(AFAIK)使队列文件操作原子化,因此锁定是必要的。锁定符合 POSIX 的方式是……恐怖……缓慢的恐怖。

我的做法如下:

#Pops first element from a file ($1) and prints it to stdout; if file emepty print out empty return 1
fuPop(){
if [ -s "$1" ]; then
        sed -nr '1p' "$1"
        sed -ir '1d' "$1"
        return 0
else
        return 1
fi
}

#Appends line ($1) to a file ($2) and return 0 if it's not in it yet; if it, is just return 1
fuAppend(){
if grep -Fxq "$1" < "$2"; then
        return 1
else
        echo "$1" >> "$2"
        return 0
fi
}

#There're multiple processes running this function.
prcsPages(){
while [ -s "$todoLinks" ]; do
        luAckLock "$linksLock"
        linkToProcess="$(fuPop "$todoLinks")"
        luUnlock "$linksLock"

        prcsPage "$linkToProcess"
        ...
done
...
}


#The prcsPage downloads it, does some magic and than calls prcsNewLinks and prcsNewEmails that both get list of new emails / new urls in $1
#$doneEmails, ..., contain file path, $mailLock, ..., contain dir path

prcsNewEmails(){
luAckLock "$mailsLock"
for newEmail in $1; do
        if fuAppend "$newEmail" "$doneEmails"; then
                echo "$newEmail"
        fi
done
luUnlock "$mailsLock"
}

prcsNewLinks(){
luAckLock "$linksLock"
for newLink in $1; do
        if fuAppend "$newLink" "$doneLinks"; then
                fuAppend "$newLink" "$todoLinks"
        fi
done
luUnlock "$linksLock"
}

问题是我的实现很慢(真的很慢),几乎慢到使用超过 2 10 没有意义(减少锁等待有很大帮助) child 过程。你实际上可以禁用锁(只需注释掉 luAckLock 和 luUnlock 位)并且它工作得很好(并且快得多)但是每隔一段时间就会出现竞争条件 seds -i 和只是感觉不对。

最糟糕的(如我所见)是锁定在 prcsNewLinks 中,因为它需要相当多的时间(大部分时间基本上是运行的)并且实际上阻止其他进程开始处理新页面(因为它需要从(当前锁定的)$todoLinks 队列中弹出新的 URL。

现在我的问题是,如何做得更好、更快、更好?

整个脚本是here (它包含一些信号魔法、大量调试输出,而且通常不是那么好的代码)。

顺便说一句:是的,您是对的,在 bash 中执行此操作——更重要的是,以符合 POSIX 的方式执行此操作——太疯狂了!但这是大学作业,所以我不得不做

//虽然我觉得我并没有真正期望我解决这些问题(因为竞争条件只有在有 25 个以上的线程时才会更频繁地出现,这可能不是一个正常人会测试的东西)。


代码注释:

  1. 是的,等待应该有(并且已经有)随机时间。此处共享的代码只是在实际分析讲座中编写的概念证明。
  2. 是的,调试输出的数量及其格式很糟糕,应该有独立的日志记录功能。然而,这不是我的问题所在。

最佳答案

首先,您是否需要实现自己的 HTML/HTTP 抓取?为什么不让 wgetcurl 为您递归访问网站?

您可以将文件系统滥用为数据库,并让您的队列成为单行文件的目录。 (或文件名是数据的空文件)。这将为您提供生产者-消费者锁定,生产者接触文件,消费者将其从传入目录移至处理/完成目录。

这样做的美妙之处在于多个线程接触同一个文件 Just Works。期望的结果是 url 在“传入”列表中出现一次,这就是当多个线程创建具有相同名称的文件时得到的结果。由于您需要重复数据删除,因此在写入传入列表时不需要锁定。

1) Downloads a page

2) Parses out all links and adds them to queue

对于找到的每个链接,

grep -ql "$url" already_checked || : > "incoming/$url"

[[ -e "done/$url" ]] || : > "incoming/$url"

3) Does some unimportant processing

4) Pops an URL from queue and starts from 1)

# mostly untested, you might have to debug / tweak this
local inc=( incoming/* )
# edit: this can make threads exit sooner than desired.
# See the comments for some ideas on how to make threads wait for new work
while [[ $inc != "incoming/*" ]]; do
    # $inc is shorthand for "${inc[0]}", the first array entry
    mv "$inc" "done/" || { rm -f "$inc"; continue; } # another thread got that link, or that url already exists in done/
    url=${inc#incoming/}
    download "$url";
    for newurl in $(link_scan "$url"); do
        [[ -e "done/$newurl" ]] || : > "incoming/$newurl"
    done
    process "$url"
    inc=( incoming/* )
done

编辑:将 URL 编码为不包含 / 的字符串作为练习留给读者。虽然可能 urlencoding /%2F 会工作得很好。

我正在考虑将 URL 移动到每个线程的“处理”列表中,但实际上如果您不需要能够从中断中恢复,您的“完成”列表可以改为“排队和完成”列表。我认为 mv "$url""threadqueue.$$/"之类的东西实际上没有用。

“done/”目录会变得很大,并且开始变慢,可能有 10k 个文件,这取决于您使用的文件系统。将“完成”列表维护为每行一个 url 的文件或一个数据库(如果有一个对单个命令速度很快的数据库 CLI 界面)可能会更有效。

将已完成列表维护为一个文件并不坏,因为您永远不需要删除条目。您可能可以在不锁定它的情况下逃脱,即使对于追加它的多个进程也是如此。 (我不确定如果线程 B 在线程 A 打开文件和线程 A 执行写入之间的 EOF 写入数据会发生什么。线程 A 的文件位置可能是旧的 EOF,在这种情况下它会覆盖线程 B 的条目,或者更糟,只覆盖它的一部分。如果你确实需要锁定,也许 flock(1) 会很有用。它获得一个锁,然后执行你作为参数传递的命令。)

如果没有发生因缺少写锁定而损坏的文件,那么您可能不需要写锁定。与每次检查/追加都必须锁定相比,“完成”列表中偶尔出现的重复条目会稍微慢一些。

如果您需要严格正确地避免多次下载同一个 URL,则需要读者等待作者完成。如果您可以在末尾 sort -u 电子邮件列表,那么读者在附加列表时阅读旧副本并不是一场灾难。那么写者只需要相互锁定,读者就可以读取文件了。如果他们在作者设法编写新条目之前达到了 EOF,那就这样吧。

我不确定线程​​在将条目从传入列表中删除之前或之后将条目添加到“完成”列表是否重要,只要它们在处理之前将它们添加到“完成”即可。我在想,一种或另一种方式可能会使比赛更有可能导致重复完成的条目,并且不太可能进行重复下载/处理,但我不确定。

关于multithreading - 如何在bash脚本中实现多线程访问基于文件的队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29704012/

相关文章:

java - 如何可靠地从 Guava LoadingCache 中删除记录?

linux - 如何删除空格后的每个字符?

C、clock_gettime,返回不正确的纳秒值?

c - 如何使用 POSIX 线程在 C 语言中创建特定于线程的全局变量?

java - 如何在没有数据库的情况下为部署在多个服务器上的服务的每次调用生成 16 位唯一号码?

java - 根据条件动态提交任务到ExecutorService

regex - 使用 SED 删除不匹配的行

linux - 在 Bash 中,如何在 read -es 后输出一个新行?

c - 弃用/废弃某些 Linux 时间 API 的原因是什么?

c - OpenMP 线程处理的迭代索引