我尝试将 Java 与 Spring Boot 框架和 AWS Step Functions 结合使用,如下例之一。在这里,我们在服务启动时在 Java 中设置一个 Runnable 线程,并将该线程注册为“工作”线程,它是 AWS Step Function 中的一个 Activity。
这是我的问题:
当注册同一 Activity (同一 ARN)的多个工作程序时,AWS Step Function 如何处理竞争条件,Step Function 如何避免超过 1 个工作程序拾取任务并执行重复的工作?在 AWS Step Functions 配置中,我们是否可以更改任何设置以防止同一可用区、同一区域的 1 个以上工作线程中发生此类竞争情况?
import com.amazonaws.services.stepfunctions.AWSStepFunctions;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskRequest;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskResult;
@Component
class MyActivity implements DisposableBean, Runnable {
private final int WAITS_FOR_ACTIVITY_MILLISECONDS = 500;
public void run() {
while (shouldRun) {
GetActivityTaskResult getActivityTaskResult =
client.getActivityTask(
new GetActivityTaskRequest()
.withActivityArn(config.getActivityArns().getMyActivity()));
String taskToken = getActivityTaskResult.getTaskToken();
if (getActivityTaskResult.getTaskToken() != null) {
try {
// Get input
JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
ActivityInput input =
gson.fromJson(json.toString(), ActivityInput.class);
// do some work
client.sendTaskSuccess(
new SendTaskSuccessRequest()
.withOutput(gson.toJson(output))
.withTaskToken(taskToken));
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
client.sendTaskFailure(
new SendTaskFailureRequest().withTaskToken(taskToken).withError(e.getMessage()));
}
} else {
try {
Thread.sleep(WAITS_FOR_ACTIVITY_MILLISECONDS);
} catch (InterruptedException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
}
}
@Override
public void destroy() {
shouldRun = false;
}
}
最佳答案
多个 Activity 工作人员可以轮询同一类型的 Activity 。 Step Functions 管理多个并发执行的状态机和轮询工作的多个 Activity 工作线程之间的多对多关系。
每当 Activity 工作线程成功从感兴趣的 Activity 任务状态中的执行状态机轮询工作时,Step Functions 就会向 Activity 工作线程调度一个唯一的 token 以及任务状态的 JSON 输入。
下一个 Activity 工作人员轮询将不会分配相同的任务。当 Activity 工作线程使用 ActivityTaskSuccess 或 ActivityTaskFailure 调用 Step Functions API 时,它会返回结果和 token 。 Step Function 使用 token 将结果与适当的状态机相匹配。对于大型工作负载,您可以创建 Auto Scaling Activity 工作人员组,并根据需求扩展工作人员。
关于java - AWS Step Functions 如何处理工作线程/Activity 竞争条件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64603834/