go - 每个用户处理一条消息

标签 go redis queue message-queue

我在 redis 中有一个列表,用作队列。我在左侧推送元素并从右侧弹出。来自不同用户的请求被推送到队列中。我有一个 goroutine 池,它们从队列(POP)中读取请求并处理它们。我希望一次只能处理每个 userId 的一个请求。我有一个永远运行的 ReadRequest() 函数,它会弹出一个具有 userId 的请求。我需要按用户进入的顺序处理每个用户的请求。我不确定如何实现这一点。我需要每个 userId 的 redis 列表吗?如果是这样,我将如何遍历处理其中请求的所有列表?

for i:=0; i< 5; i++{
  wg.Add(1)
  go ReadRequest(&wg)

}


func ReadRequest(){

   for{

      //redis pop request off list
       request:=MyRedisPop()
       fmt.Println(request.UserId)

      // only call Process if no other goroutine is processing a request for this user
      Process(request)



 time.sleep(100000)
     }

wg.Done()

}

最佳答案

以下是无需创建多个 Redis 列表即可使用的伪代码:

// maintain a global map for all users
// if you see a new user, call NewPerUser() and add it to the list
// Then, send the request to the corresponding channel for processing
var userMap map[string]PerUser 

type PerUser struct {
    chan<- redis.Request // Whatever is the request type
    semaphore *semaphore.Weighted // Semaphore to limit concurrent processing
}

func NewPerUser() *PerUser {
    ch := make(chan redis.Request)
    s := semaphore.NewWeighted(1) // One 1 concurrent request is allowed
    go func(){
        for req := range ch {
            s.Acquire(context.Background(), 1)
            defer s.Release(1)
            // Process the request here
        }
    }()
}

请注意,这只是一个伪代码,我还没有测试它是否有效。

关于go - 每个用户处理一条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61799886/

相关文章:

node.js - 如何为即时消息系统设计 redis pub/sub?

c# - 你将如何获得队列中的第一个和最后一个项目?

java - 复制构造函数断言错误

go - 在 golang 中使用 css 选择器解析 html 页面并获取值

go - 如何覆盖现有 go 包的功能?

redis - 如何设置Redis的dump.rdb文件权限?

mongodb - 多次调用 redis 或单次调用 Mongo 或其他数据库引擎

java - Java中的并发和阻塞队列

go - 如何使用方法编写延迟 block 会返回错误

go - 多 Go-Routine 循环未按预期执行