multithreading - Golang HTTP 请求工作池

标签 multithreading api go goroutine

我正在尝试构建一个系统,工作池/作业队列,以在每个 API 端点上处理尽可能多的 http 请求。我调查了这个 example让它工作得很好,只是我偶然发现了我不明白如何将 pool/jobqueue 扩展到不同端点的问题。

出于场景考虑,让我们绘制一个 Golang http 服务器,它在不同的端点和请求类型 GETPOST ETC 上有百万请求/分钟。

如何扩展这个概念?我应该为每个端点创建不同的工作池和作业吗?或者我可以创建不同的作业并将它们输入同一个队列并让同一个池处理这些作业吗?

我想保持简单性,如果我创建一个新的 API 端点,我就不必创建新的工作线程池,这样我就可以只专注于 API。但性能也非常重要。

我尝试构建的代码取自前面链接的示例,here是具有此代码的其他人的 github“要点”。

最佳答案

首先要说明的是:如果你正在运行一个 HTTP 服务器(无论如何都是 Go 的标准服务器),你无法在不停止并重新启动服务器的情况下控制 goroutines 的数量。每个请求至少启动一个 goroutine,对此您无能为力。好消息是这通常不是问题,因为 goroutines 非常轻量级。但是,您希望控制正在努力工作的 goroutines 的数量是完全合理的。

您可以将任何值放入 channel ,包括函数。因此,如果目标是只需要在 http 处理程序中编写代码,就让作业关闭——工作人员不知道(或不关心)他们在做什么。

package main

import (
    "encoding/json"
    "io/ioutil"
    "net/http"
)

var largePool chan func()
var smallPool chan func()

func main() {
    // Start two different sized worker pools (e.g., for different workloads).
    // Cancelation and graceful shutdown omited for brevity.

    largePool = make(chan func(), 100)
    smallPool = make(chan func(), 10)

    for i := 0; i < 100; i++ {
            go func() {
                    for f := range largePool {
                            f()
                    }
            }()
    }

    for i := 0; i < 10; i++ {
            go func() {
                    for f := range smallPool {
                            f()
                    }
            }()
    }

    http.HandleFunc("/endpoint-1", handler1)
    http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay?

    http.ListenAndServe(":8080", nil)
}

func handler1(w http.ResponseWriter, r *http.Request) {
    // Imagine a JSON body containing a URL that we are expected to fetch.
    // Light work that doesn't consume many of *our* resources and can be done
    // in bulk, so we put in in the large pool.
    var job struct{ URL string }

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
    }

    go func() {
            largePool <- func() {
                    http.Get(job.URL)
                    // Do something with the response
            }
    }()

    w.WriteHeader(http.StatusAccepted)
}

func handler2(w http.ResponseWriter, r *http.Request) {
    // The request body is an image that we want to do some fancy processing
    // on. That's hard work; we don't want to do too many of them at once, so
    // so we put those jobs in the small pool.

    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
    }

    go func() {
            smallPool <- func() {
                    processImage(b)
            }
    }()
    w.WriteHeader(http.StatusAccepted)
}

func processImage(b []byte) {}

这是一个非常简单的例子来说明这一点。如何设置工作池并不重要。你只需要一个聪明的工作定义。在上面的示例中,它是一个闭包,但您也可以定义一个 Job 接口(interface),例如。

type Job interface {
    Do()
}

var largePool chan Job
var smallPool chan Job

现在,我不会将整个工作线程池方法称为“简单”。你说你的目标是限制 goroutines(正在工作)的数量。那根本不需要 worker ;它只需要一个限制器。这是与上面相同的示例,但是使用 channel 作为信号量来限制并发。

package main

import (
    "encoding/json"
    "io/ioutil"
    "net/http"
)

var largePool chan struct{}
var smallPool chan struct{}

func main() {
    largePool = make(chan struct{}, 100)
    smallPool = make(chan struct{}, 10)

    http.HandleFunc("/endpoint-1", handler1)
    http.HandleFunc("/endpoint-2", handler2)

    http.ListenAndServe(":8080", nil)
}

func handler1(w http.ResponseWriter, r *http.Request) {
    var job struct{ URL string }

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
    }

    go func() {
            // Block until there are fewer than cap(largePool) light-work
            // goroutines running.
            largePool <- struct{}{}
            defer func() { <-largePool }() // Let everyone that we are done

            http.Get(job.URL)
    }()

    w.WriteHeader(http.StatusAccepted)
}

func handler2(w http.ResponseWriter, r *http.Request) {
    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
    }

    go func() {
            // Block until there are fewer than cap(smallPool) hard-work
            // goroutines running.
            smallPool <- struct{}{}
            defer func() { <-smallPool }() // Let everyone that we are done

            processImage(b)
    }()

    w.WriteHeader(http.StatusAccepted)
}

func processImage(b []byte) {}

关于multithreading - Golang HTTP 请求工作池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46820427/

相关文章:

Java线程,我怎样才能让一个线程总是先停止或结束?

python - 重写 python 脚本以使用 python3

php - 控制 Magento API 调用的结果数量

go - 记录到标准输出并获取堆栈跟踪的字符串值

go - 如何解析文本并将其格式化以将其作为 json 数据发送

java - 正确放置尝试和捕获

python - 线程给出类型错误

vb.net - 如何调试其他线程上的异常?

node.js - 使用 nodejs 客户端访问 golang websocket 服务器

api - 基于 Cookie 的身份验证和 Web API