go - 处理来自 RabbitMQ 的消息时限制并发

标签 go

我正在尝试从队列 (RabbitMQ) 读取 URL 并发出有限数量的并发 HTTP 请求,即有一个由 10 个工作人员组成的池对从队列接收的 URL 发出并发请求(永远)。

到目前为止,我已经按照 RabbitMQ 教程实现了一个消费者: https://www.rabbitmq.com/tutorials/tutorial-one-go.html

并尝试了网上发现的示例中的多种方法,以此处的示例结束: http://jmoiron.net/blog/limiting-concurrency-in-go/

不幸的是,我当前的代码运行大约一分钟,然后无限期卡住。我尝试过添加/移动 Go 例程,但似乎无法让它按预期工作(我对 Go 很陌生)。

当前代码:

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/Xide/bloom"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

var netClient = &http.Client{
    Timeout: time.Second * 10,
}

func getRequest(url string) {
    //resp, err := http.Get(string(url))
    resp, err := netClient.Get(string(url))
    if err != nil {
        log.Printf("HTTP request error: %s", err)
        return
    }
    fmt.Println("StatusCode:", resp.StatusCode)
    fmt.Println(resp.Request.URL)
}

func main() {
    bf := bloom.NewDefaultScalable(0.1)

    conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "urls",            // name
        true,              // durable
        false,             // delete when unused
        false,             // exclusive
        false,             // no-wait
        nil,               // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, //global
    )
    failOnError(err, "Failed to set Qos")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    concurrency := 10
    sem := make(chan bool, concurrency)
    go func() {
        for d := range msgs {
            sem <- true
            url := string(d.Body)
            if bf.Match(url) == false {
                bf.Feed(url)
                log.Printf("Not seen: %s", d.Body)
                go func(url string) {
                    defer func() { <-sem }()
                    getRequest(url)
                }(url)
            } else {
                log.Printf("Already seen: %s", d.Body)
            }
            d.Ack(false)
        }
        for i := 0; i < cap(sem); i++ {
            sem <- true
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

最佳答案

您没有正确处理 HTTP 响应,导致打开的连接不断增加。试试这个:

func getRequest(url string) {
    resp, err := netClient.Get(string(url))
    if err != nil {
        log.Printf("HTTP request error: %s", err)
        return
    }
    // Add this bit:
    defer func() {
        io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()
    }()
    fmt.Println("StatusCode:", resp.StatusCode)
    fmt.Println(resp.Request.URL)
}

在您读完 channel 中的消息后,这似乎是不必要的并且可能存在问题:

    for i := 0; i < cap(sem); i++ {
        sem <- true
    }

为什么在读取队列中的所有消息后还要填充 sem channel ?您向 channel 添加的消息数量与您期望从中读取的消息数量完全相同,因此这充其量是没有意义的,并且如果您对其余代码进行了错误的更改,则可能会导致问题。

与您的问题无关,但这是多余的:

if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
}

the documentationFatalf 已经退出,因此 panic 永远不会被调用。如果您想记录并panic,请尝试 log.Panicf ,就是为此目的而设计的。

关于go - 处理来自 RabbitMQ 的消息时限制并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45858684/

相关文章:

mongodb - 在使用MongoDB-mgo插入许多文档的同时,如何忽略重复的键错误并继续插入?

go - fork 存储库上的 'go build' 问题

go - 将插件添加到 go 程序

Golang 无法在同一个包中进行测试

go - 修复 GoLand 未找到模块依赖项 ("cannot resolve...")?

multithreading - 我如何将 worker 返回到 Go 中的 worker 池

go - 由于编码,Unmarshal 返回空白对象

ssh - Golang SSH 服务器 : How to handle file transfer with scp?

parsing - yacc shift-reduce 用于不明确的 lambda 语法

go - 即使包存在,Dep init 也会失败