我正在尝试在 Go 中构建一个网络爬虫,我想在其中指定并发工作人员的最大数量。只要队列中有要探索的链接,它们都会工作。当队列中的元素少于工作人员时,工作人员应大声喊叫,但如果找到更多链接,则恢复。
我试过的代码是
const max_workers = 6
// simulating links with int
func crawl(wg *sync.WaitGroup, queue chan int) {
for element := range queue {
wg.Done() // why is defer here causing a deadlock?
fmt.Println("adding 2 new elements ")
if element%2 == 0 {
wg.Add(2)
queue <- (element*100 + 11)
queue <- (element*100 + 33)
}
}
}
func main() {
var wg sync.WaitGroup
queue := make(chan int, 10)
queue <- 0
queue <- 1
queue <- 2
queue <- 3
var min int
if (len(queue) < max_workers) {
min = len(queue)
} else {
min = max_workers
}
for i := 0; i < min; i++ {
wg.Add(1)
go crawl(&wg, queue)
}
wg.Wait()
close(queue)
}
这似乎可行,但有一个问题:我必须在开始时用多个元素填充队列。我希望它从一个(单个)种子页面(在我的示例中为 queue <- 0
)开始,然后动态地扩大/缩小工作池。
我的问题是:
如何获取行为?
为什么延迟
wg.Done()
造成僵局?wg.Done()
正常实际完成时的功能?我认为没有defer
goroutine 不会等待其他部分完成(在解析 HTML 的实际工作示例中这可能需要更长的时间)。
最佳答案
如果您使用自己喜欢的网络搜索“Go 网络爬虫”(或“golang 网络爬虫”) 你会发现很多例子,包括: Go Tour Exercise: Web Crawler . 在 Go 中也有一些关于并发的讨论涵盖了这种事情。
在 Go 中执行此操作的“标准”方法根本不需要涉及 WaitGroup 。
要回答您的一个问题,请使用 defer
排队仅在函数返回时运行。你有一个长时间运行的功能,所以不要使用 defer
在这样一个循环中。
“标准”方式是在自己的 goroutine 中启动任意数量的 worker。 他们都从同一个 channel 读取“工作”,如果/当无事可做时阻塞。 完成后,该 channel 关闭,他们全部退出。
在爬虫之类的情况下, worker 们会发现更多的“工作”要做,并希望将它们排入队列。 你不希望他们写回同一个 channel ,因为它会有一些有限的缓冲(或没有!),你最终会阻止所有试图排队更多工作的 worker !
一个简单的解决方案是使用单独的 channel
(例如每个 worker 都有 in <-chan Job, out chan<- Job
)
以及读取这些请求的单个队列/过滤器 goroutine,
将它们附加到一个 slice 上,它可以让其增长任意大或对其进行一些全局限制,
并且还从 slice 的头部馈送另一个 channel
(即从一个 channel 读取并写入另一个 channel 的简单 for-select 循环)。
此代码通常还负责跟踪已完成的操作
(例如访问过的 URL 的 map )并丢弃传入的重复请求。
队列 goroutine 可能看起来像这样(这里的参数名称过于冗长):
type Job string
func queue(toWorkers chan<- Job, fromWorkers <-chan Job) {
var list []Job
done := make(map[Job]bool)
for {
var send chan<- Job
var item Job
if len(list) > 0 {
send = toWorkers
item = list[0]
}
select {
case send <- item:
// We sent an item, remove it
list = list[1:]
case thing := <-fromWorkers:
// Got a new thing
if !done[thing] {
list = append(list, thing)
done[thing] = true
}
}
}
}
在这个简单的例子中,有几件事被掩盖了。
比如终止。如果“Jobs”是一个更大的结构,你想在其中使用 chan *Job
和 []*Job
反而。
在这种情况下,您还需要将 map 类型更改为您从作业中提取的某个键
(例如 Job.URL
也许)
你会想做 list[0] = nil
之前list = list[1:]
摆脱对 *Job
的引用指针并让垃圾收集器更早地处理它。
编辑:关于干净终止的一些说明。
有几种方法可以干净地终止上述代码。可以使用 WaitGroup ,但需要仔细放置添加/完成调用,您可能需要另一个 goroutine 来执行等待(然后关闭其中一个 channel 以开始关闭)。工作人员不应该关闭他们的输出 channel ,因为有多个工作人员并且您不能多次关闭一个 channel ;队列 goroutine 在不知道工作人员何时完成的情况下无法判断何时关闭它与工作人员的 channel 。
过去,当我使用与上述非常相似的代码时,我在“队列”goroutine 中使用了一个本地“未完成”计数器(这避免了对互斥体的任何需求或 WaitGroup 具有的任何同步开销)。将工作发送给 worker 时,未完成工作的数量会增加。当工作人员说完成时,它又减少了。我的代码碰巧有另一个 channel (我的“队列”除了要排队的其他节点外,还在收集结果)。它在自己的 channel 上可能更干净,但可以使用现有 channel 上的特殊值(例如 nil Job 指针)。无论如何,有了这样一个计数器,本地列表上现有的长度检查只需要在列表为空时看到没有任何未完成的,就该终止了;只需关闭与工作人员的 channel 并返回即可。
例如:
if len(list) > 0 {
send = toWorkers
item = list[0]
} else if outstandingJobs == 0 {
close(toWorkers)
return
}
关于go - Go 中的网络爬虫,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29491795/