java - AWS Step Functions 如何处理工作线程/Activity 竞争条件?

标签 java amazon-web-services spring-boot aws-step-functions

我尝试将 Java 与 Spring Boot 框架和 AWS Step Functions 结合使用,如下例之一。在这里,我们在服务启动时在 Java 中设置一个 Runnable 线程,并将该线程注册为“工作”线程,它是 AWS Step Function 中的一个 Activity。

这是我的问题:

当注册同一 Activity (同一 ARN)的多个工作程序时,AWS Step Function 如何处理竞争条件,Step Function 如何避免超过 1 个工作程序拾取任务并执行重复的工作?在 AWS Step Functions 配置中,我们是否可以更改任何设置以防止同一可用区、同一区域的 1 个以上工作线程中发生此类竞争情况?


    import com.amazonaws.services.stepfunctions.AWSStepFunctions;
    import com.amazonaws.services.stepfunctions.model.GetActivityTaskRequest;
    import com.amazonaws.services.stepfunctions.model.GetActivityTaskResult;

    @Component
    class MyActivity implements DisposableBean, Runnable {

      private final int WAITS_FOR_ACTIVITY_MILLISECONDS = 500;
    

      public void run() {
        while (shouldRun) {
          GetActivityTaskResult getActivityTaskResult =
              client.getActivityTask(
                  new GetActivityTaskRequest()
                      .withActivityArn(config.getActivityArns().getMyActivity()));
          String taskToken = getActivityTaskResult.getTaskToken();
          if (getActivityTaskResult.getTaskToken() != null) {
            try {

              // Get input
              JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
              ActivityInput input =
                  gson.fromJson(json.toString(), ActivityInput.class);

              // do some work

              
              client.sendTaskSuccess(
                  new SendTaskSuccessRequest()
                      .withOutput(gson.toJson(output))
                      .withTaskToken(taskToken));
            } catch (Exception e) {
              logger.error(e.getMessage());
              e.printStackTrace();
              client.sendTaskFailure(
                  new SendTaskFailureRequest().withTaskToken(taskToken).withError(e.getMessage()));
            }
          } else {
            try {
              Thread.sleep(WAITS_FOR_ACTIVITY_MILLISECONDS);
            } catch (InterruptedException e) {
              logger.error(e.getMessage());
              e.printStackTrace();
            }
          }
        }
      }
  


      @Override
      public void destroy() {
        shouldRun = false;
      }

    }

最佳答案

多个 Activity 工作人员可以轮询同一类型的 Activity 。 Step Functions 管理多个并发执行的状态机和轮询工作的多个 Activity 工作线程之间的多对多关系。

每当 Activity 工作线程成功从感兴趣的 Activity 任务状态中的执行状态机轮询工作时,Step Functions 就会向 Activity 工作线程调度一个唯一的 token 以及任务状态的 JSON 输入。

下一个 Activity 工作人员轮询将不会分配相同的任务。当 Activity 工作线程使用 ActivityTaskSuccess 或 ActivityTaskFailure 调用 Step Functions API 时,它会返回结果和 token 。 Step Function 使用 token 将结果与适当的状态机相匹配。对于大型工作负载,您可以创建 Auto Scaling Activity 工作人员组,并根据需求扩展工作人员。

关于java - AWS Step Functions 如何处理工作线程/Activity 竞争条件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64603834/

相关文章:

amazon-web-services - 通过 git aws.push 和 IAM 用户进行 Elastic beanstalk 部署的权限问题

java - SpringBoot : Failing at serialize Integer class

java - MapReduce 扫描 HBase 时,Reducer 的个数始终为一个

java - 小程序访问JDialog信息

Php ftp_connect 在 AWS 服务器上不起作用

sql-server - 无法删除 Amazon RDS SQL Server 数据库

Java数字签名和C++ CryptSignMessage导致的结果不同

java - 休息;我最后的休息;给我一个 :Unreachable statement error, 任何线索为什么?

java - java -cp 与 java -jar 之间的区别

java - 使用 Spring Boot、Spring Security 和 React 时发生 CORS 错误