java - 如何使用 Apache Ignite/GridGain 在任何节点上重试失败的作业

标签 java distributed-computing gridgain ignite

我正在尝试 fault tolerance在 Apache Ignite 中。

我不知道如何在任何节点上重试失败的作业。我有一个用例,我的工作将通过进程构建器调用第三方工具作为系统进程来进行一些计算。在某些情况下,该工具可能会失败,但在大多数情况下,可以在任何节点上重试该作业 - 包括之前失败的节点。

目前,Ignite 似乎将作业重新路由到之前没有此作业的另一个节点。因此,过了一会儿所有节点都消失了,任务就会失败。

我正在寻找的是如何在任何节点上重试作业。

这是一个测试来演示我的问题。

这是我随机失败的工作:

public static class RandomlyFailingComputeJob implements ComputeJob {
    private static final long serialVersionUID = -8351095134107406874L;
    private final String data;

    public RandomlyFailingComputeJob(String data) {
        Validate.notNull(data);
        this.data = data;
    }

    public void cancel() {
    }

    public Object execute() throws IgniteException {
        final double random = Math.random();
        if (random > 0.5) {
            throw new IgniteException();
        } else {
            return StringUtils.reverse(data);
        }
    }
}

下面是任务:

public static class RandomlyFailingComputeTask extends
        ComputeTaskSplitAdapter<String, String> {
    private static final long serialVersionUID = 6756691331287458885L;

    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res,
            List<ComputeJobResult> rcvd) throws IgniteException {
        if (res.getException() != null) {
            return ComputeJobResultPolicy.FAILOVER;
        }
        return ComputeJobResultPolicy.WAIT;
    }

    public String reduce(List<ComputeJobResult> results)
            throws IgniteException {
        final Collection<String> reducedResults = new ArrayList<String>(
                results.size());
        for (ComputeJobResult result : results) {
            reducedResults.add(result.<String> getData());
        }
        return StringUtils.join(reducedResults, ' ');
    }

    @Override
    protected Collection<? extends ComputeJob> split(int gridSize,
            String arg) throws IgniteException {
        final String[] args = StringUtils.split(arg, ' ');
        final Collection<ComputeJob> computeJobs = new ArrayList<ComputeJob>(
                args.length);
        for (String data : args) {
            computeJobs.add(new RandomlyFailingComputeJob(data));
        }
        return computeJobs;
    }

}

测试代码:

    final Ignite ignite = Ignition.start();
    final String original = "The quick brown fox jumps over the lazy dog";

    final String reversed = StringUtils.join(
            ignite.compute().execute(new RandomlyFailingComputeTask(),
                    original), ' ');

如您所见,应该始终进行故障转移。由于失败的概率!= 1,我预计任务会在某个时刻成功终止。

当概率阈值为 0.5 且总共有 3 个节点时,这种情况几乎不会发生。我收到类似 class org.apache.ignite.cluster.ClusterTopologyException: 无法将作业故障转移到另一个节点(故障转移 SPI 返回 null) 的异常。经过一些调试后,我发现这是因为我最终用完了节点。全部都消失了。

我知道我可以编写自己的 FailoverSpi 来处理这个问题。

但这感觉不太对劲。

首先,这样做似乎有些过分了。
但 SPI 是一种全局性的东西。我想决定每个作业是否应该重试或故障转移。例如,这可能取决于我调用的第三方工具的退出代码。因此通过全局 SPI 配置故障转移是不正确的。

最佳答案

如果 AlwaysFailoverSpi 的当前实现(默认实现)已经尝试了特定作业的所有节点,则不会进行故障转移。我相信它可以是一个配置选项,但现在您必须实现自己的故障转移 SPI(它应该非常简单 - 每次作业尝试故障转移时只需从拓扑中选择一个随机节点)。

关于SPI的全局性质,你是对的,但是它的failover()需要FailoverContext,其中包含有关失败作业的信息(任务名称、属性、异常等),因此你可以根据此做出决定信息。

关于java - 如何使用 Apache Ignite/GridGain 在任何节点上重试失败的作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31124341/

相关文章:

java - 如何将 Char 数组的子字符串传递给 Java 的另一个函数?

java - JViewport 不会为 JPanel 的派生类生成视口(viewport)

java - 如何检查随机生成的数组中的每个 int 是否偶数,如果不是,则让它创建另一个随机数组?

python - 是否可以在 sklearn 中组合多个部分拟合估计量?

java - 了解IgniteDataStreamer : ordering and buffering

java - 旋转外壳 SWT

c - 如何在 OS X C 代码中创建异步定时器?

java - #commit 或 #rollback 之后不调用 org.apache.ignite.transactions.Transaction#close 会导致泄漏/问题吗?

gridgain - 如何将 gridgain/ignite 统计数据输出到文件

python - 在 Spark 中将简单的单行字符串转换为 RDD