java - 允许所有工作人员使用全局消费率的队列服务器

标签 java algorithm message-queue amqp

我的服务器需要处理许多任务。由于工作人员需要满足的 API 调用速率限制,这些任务必须以特定的给定速率处理。

为了保证这些任务不会以高于 API 速率限制的速率执行,我希望能够配置队列发送消息以进行处理的速率。

此外,该队列必须保持推送消息的顺序并以 FIFO 顺序释放它们以提供公平性。

最后,如果编码明智,这在使用时将是透明的,那么客户端将调用 API 将消息发送到队列,并且同一客户端将在消息被释放后接收回消息,这将是很棒的根据工作进度和相关顺序排队。例如使用 RxJava

waitForMessageToBeReleased(message, queue)
     .subscribe(message -> // do some stuff)  // message received to the same 
client after it was released by the queue according to the defined work rate.

我目前正在使用 Redis 通过创建一个具有特定数量的 TTL 的变量来控制执行速率,其他调用会一直等到该变量过期。但是,它不处理订单,并可能导致客户在高负载的情况下饿死。

最佳答案

Cadence Workflow能够以最小的努力支持您的用例。

这里有一个稻草人设计可以满足你的要求:

  • 使用 userID 作为工作流 ID 向用户工作流发送 signalWithStart 请求。它要么将信号传递给工作流,要么首先启动工作流并向其传递信号。
  • 对该工作流的所有请求都由它缓冲。 Cadence 提供了硬性保证,即只有一个具有给定 ID 的工作流才能处于打开状态。因此,所有信号(事件)都保证在属于用户的工作流中进行缓冲。
  • 内部工作流事件循环逐一分派(dispatch)这些请求。
  • 当缓冲区为空时,工作流可以完成。

下面是用Java实现的工作流代码(也支持Go客户端):

public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

public interface TaskProcessorActivity {
    @ActivityMethod
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

然后是通过信号方法将该任务排入工作流的代码:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(task.getUserId()).build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = cadenceClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = cadenceClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    cadenceClient.signalWithStart(request);
}

与使用队列进行任务处理相比,Cadence 提供了许多其他优势。

  • 构建了具有无限到期间隔的指数重试
  • 故障处理。例如,如果在配置的时间间隔内两次更新均未成功,它允许执行通知另一项服务的任务。
  • 支持长时间运行的心跳操作
  • 能够实现复杂的任务依赖关系。例如,在发生不可恢复的故障时实现调用链或补偿逻辑 (SAGA)
  • 提供对当前更新状态的完整可见性。例如,当使用队列时,您知道队列中是否有一些消息,并且您需要额外的数据库来跟踪整体进度。使用 Cadence,每个事件都会被记录下来。
  • 能够在飞行中取消更新。
  • 分布式 CRON 支持

参见 the presentation这涵盖了 Cadence 编程模型。

关于java - 允许所有工作人员使用全局消费率的队列服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56725284/

相关文章:

c# - 评估两个列表之间顺序差异的算法

java - 如何有效地生成一组具有预定义分布的唯一随机数?

algorithm - 无法在 MATLAB 中分隔标记

java - 使用 if 语句将结果发送回 jsp

java - 如何在 Android 10 中打开 Activity (传入 voip 调用)

c++ - boost::进程间消息队列timed_receive()内部过程

amazon-dynamodb - 使用 SQS 或 DynamoDB 控制订单状态

c - 使用ftok()创建 key 以及如何使用 key ?

Java 网络摄像头捕获和显示

java - "+"和 "-"键似乎可以切换 SWT 按钮,是否可以覆盖此行为?