我正在尝试从队列 (RabbitMQ) 读取 URL 并发出有限数量的并发 HTTP 请求,即有一个由 10 个工作人员组成的池对从队列接收的 URL 发出并发请求(永远)。
到目前为止,我已经按照 RabbitMQ 教程实现了一个消费者: https://www.rabbitmq.com/tutorials/tutorial-one-go.html
并尝试了网上发现的示例中的多种方法,以此处的示例结束: http://jmoiron.net/blog/limiting-concurrency-in-go/
不幸的是,我当前的代码运行大约一分钟,然后无限期卡住。我尝试过添加/移动 Go 例程,但似乎无法让它按预期工作(我对 Go 很陌生)。
当前代码:
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/Xide/bloom"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
var netClient = &http.Client{
Timeout: time.Second * 10,
}
func getRequest(url string) {
//resp, err := http.Get(string(url))
resp, err := netClient.Get(string(url))
if err != nil {
log.Printf("HTTP request error: %s", err)
return
}
fmt.Println("StatusCode:", resp.StatusCode)
fmt.Println(resp.Request.URL)
}
func main() {
bf := bloom.NewDefaultScalable(0.1)
conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"urls", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, //global
)
failOnError(err, "Failed to set Qos")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
concurrency := 10
sem := make(chan bool, concurrency)
go func() {
for d := range msgs {
sem <- true
url := string(d.Body)
if bf.Match(url) == false {
bf.Feed(url)
log.Printf("Not seen: %s", d.Body)
go func(url string) {
defer func() { <-sem }()
getRequest(url)
}(url)
} else {
log.Printf("Already seen: %s", d.Body)
}
d.Ack(false)
}
for i := 0; i < cap(sem); i++ {
sem <- true
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
最佳答案
您没有正确处理 HTTP 响应,导致打开的连接不断增加。试试这个:
func getRequest(url string) {
resp, err := netClient.Get(string(url))
if err != nil {
log.Printf("HTTP request error: %s", err)
return
}
// Add this bit:
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
fmt.Println("StatusCode:", resp.StatusCode)
fmt.Println(resp.Request.URL)
}
在您读完 channel 中的消息后,这似乎是不必要的并且可能存在问题:
for i := 0; i < cap(sem); i++ {
sem <- true
}
为什么在读取队列中的所有消息后还要填充 sem
channel ?您向 channel 添加的消息数量与您期望从中读取的消息数量完全相同,因此这充其量是没有意义的,并且如果您对其余代码进行了错误的更改,则可能会导致问题。
与您的问题无关,但这是多余的:
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
每the documentation ,Fatalf
已经退出,因此 panic
永远不会被调用。如果您想记录并panic
,请尝试 log.Panicf
,就是为此目的而设计的。
关于go - 处理来自 RabbitMQ 的消息时限制并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45858684/