java - Phaser 问题(使用 JSR166y 和最新版本 6 的 JDK)

标签 java multithreading concurrency race-condition phaser

所以,我的 Phaser 非常灵活,但似乎我缺少了一些东西。
我已经成功地使用了 CyclicBarrier,但现在我还想要一些更灵活的东西,正如我所说的。所以这是代码:

声明:

private static final CountDownLatch synchronizer = new CountDownLatch(1);
private static AtomicBoolean HAS_TIMED_OUT = new AtomicBoolean(false);


代码:

try {
    logger.INFO("CONNECTED - Peer ID properties: " + SYS_NEWLINE + peerSocket + SYS_NEWLINE + pID, true);

    final int peerKQueries = sp.getInteger(peerSocket);
    peerObjects = new String[peerKQueries];
    peerValues = new BigDecimal[peerKQueries];
    for ( int i = 0; i < peerObjects.length; i++ )
       peerObjects[i] = sp.getString(peerSocket);
    for ( int i = 0; i < peerValues.length; i++ )
       peerValues[i] = sp.getBigDecimal(peerSocket);

    final int phase1a = htPhaser1a.arrive();
    if ( phase1a < 0 ) {
        logger.ERROR("Rejecting Super Peer thread " + THREAD_ID + " because it arrived lately for Phase 1a!", true);
        sp.close(peerSocket);
        throw new IllegalThreadStateException();
    } else {
        logger.INFO(pID + " -> Arrived in HT phase 1a. Total arrivals: "+htPhaser1a.getArrivedParties(), true);
        logger.INFO("Super Peer thread " + THREAD_ID + " will advance to HT Phase 1b/2 (phase number is "+phase1a+").", true);
        // The last peer should also unblock the barrier.
        if ( htPhaser1a.getArrivedParties() == TOTAL_PEERS.get() ) {
          htPhaser1a.arrive();
          synchronizer.countDown();
        }
            htPhaser1a.awaitAdvanceInterruptibly(phase1a, 30, TimeUnit.SECONDS);
    }

} catch (IOException e) {
    logger.ERROR("Super Peer thread " + THREAD_ID + " encountered an I/O error.", true);
    sp.close(peerSocket);
    throw new IllegalThreadStateException();
} catch (TimeoutException e) {
    logger.INFO("Super Peer thread " + THREAD_ID + " timed out but will advance to HT Phase 1b/2.", true);
    if ( HAS_TIMED_OUT.compareAndSet(false, true) ) {
        logger.INFO("Parties NOT arrived in the timeout: "+(htPhaser1a.getUnarrivedParties()-1), true);
        resetCriticalData(htPhaser1a.getArrivedParties());
        htPhaser1a.forceTermination();
        instantiateHTPhase1b();
        instantiateHTPhase2();
        instantiateHTPatch();
        synchronizer.countDown();
    }
} finally {
    logger.INFO("Super Peer thread "+THREAD_ID+" is blocked!", true);
    synchronizer.await();
    logger.INFO("Super Peer thread's "+THREAD_ID+" blocking waived!", true);
}

sp.getSomething(); 是 I/O 调用。
考虑到此代码示例是由多个线程运行的。

这是我的问题:我已确保不超过 MAX_CLIENTS 到达移相器,因此如果 MAX_CLIENTS 到达一切都很好。但是,我遇到了 TimeoutException 的问题。第一个是客户端(例如线程 A)能够到达该阶段的时间窗口(又名竞争条件),然后线程 B 中发生 TimeoutException,我在线程 B 中动态实例化另一个移相器,其中包含到达各方的数量(例如 5),但随后线程 A 已经到达该阶段(又名 Phase1a 未发现 < 0)。我该如何纠正这个问题?我正在考虑使用信号量,但我认为这不值得付出努力,因为那样我可能需要重新考虑我这样做的方式。我还考虑过使用计时器并递增 AtomicInteger 变量,并在计时器到期时动态实例化 Phaser。您对如何解决这个问题有什么想法吗?

编辑:
文档有一个bulkRegister(int party)方法,但措辞有点奇怪:

将给定数量的新未到达方添加到此移相器。如果 onAdvance(int, int) 的持续调用正在进行中,则此方法可能会在返回之前等待其完成。如果该移相器有父移相器,并且给定的参与方数量大于零,并且该移相器之前没有注册参与方,则该子移相器也会向其父移相器注册。如果此移相器终止,则尝试注册无效,并返回负值。

问题:“可能”这个词让我感到困惑! “可能”是“可能”还是“可能”是“意愿”?

编辑:
已解决。检查下面我的答案。

最佳答案

声明:

private static final CountDownLatch PEER = new CountDownLatch(1);
private static AtomicBoolean HAS_TIMED_OUT = new AtomicBoolean(false);
htPeerPhaser = new Phaser();


代码:

...
htPeerPhaser.register(); // Called only once.
...
// Note: Server application has guaranteed that no more than the maximum number of peers will arrive.
try {
    logger.INFO("CONNECTED - Peer ID properties: " + SYS_NEWLINE + peerSocket + SYS_NEWLINE + pID, true);
    final int peerKQueries = sp.getInteger(peerSocket);
    peerObjects = new String[peerKQueries];
    peerValues = new BigDecimal[peerKQueries];
    for ( int i = 0; i < peerObjects.length; i++ )
        peerObjects[i] = sp.getString(peerSocket);
    for ( int i = 0; i < peerValues.length; i++ )
        peerValues[i] = sp.getBigDecimal(peerSocket);
    final int registrationID = htPeerPhaser.bulkRegister(1);
    if ( registrationID < 0 ) {
        logger.ERROR("Rejecting Super Peer thread " + THREAD_ID + " because peer registration has stopped!", true);
        sp.close(peerSocket);
        throw new IllegalThreadStateException();
    }
    logger.INFO(pID + " -> Registered for HT phase 1.", true);
    logger.INFO("Super Peer thread " + THREAD_ID + " will advance to HT Phase 1/2.", true);
    // The last peer should also unblock the barrier.
    if ( htPeerPhaser.getRegisteredParties() == TOTAL_PEERS.get()+1 ) {
        htPeerPhaser.forceTermination();
        PEER.countDown();
    }
    htPeerPhaser.awaitAdvanceInterruptibly(registrationID, 30, TimeUnit.SECONDS);

} catch (IOException e) {
    logger.ERROR("Super Peer thread " + THREAD_ID + " encountered an I/O error.", true);
    sp.close(peerSocket);
    throw new IllegalThreadStateException();
} catch (TimeoutException e) {
    htPeerPhaser.forceTermination();
    logger.INFO("Super Peer thread " + THREAD_ID + " timed out but will advance to HT Phase 1b/2.", true);
    if ( HAS_TIMED_OUT.compareAndSet(false, true) && htPeerPhaser.getRegisteredParties() < TOTAL_PEERS.get()+1 ) {
        final int arrivedPeers = htPeerPhaser.getRegisteredParties()-1;
        logger.INFO("Parties that arrived before timeout: "+arrivedPeers, true);
        final int unarrivedPeers = TOTAL_PEERS.get()-arrivedPeers;
        logger.INFO("Parties NOT arrived due to timeout: "+unarrivedPeers, true);
        resetCriticalData(arrivedPeers);
        instantiateHTPhase1b();
        instantiateHTPhase2();
        instantiateHTPatch();
        PEER.countDown();
        logger.INFO("Super Peer thread " + THREAD_ID + " re-instantiated critical data.", true);
    }
}
logger.INFO("Super Peer thread "+THREAD_ID+" is blocked!", true);
PEER.await();
logger.INFO("Super Peer thread's "+THREAD_ID+" blocking waived!", true);

关于java - Phaser 问题(使用 JSR166y 和最新版本 6 的 JDK),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7651451/

相关文章:

java - 等待并锁定相同资源的 Tomcat 线程

java - 如何从非 Activity 类关闭当前前台 Activity

c# - 如何使用委托(delegate)在线程包装器类中传递方法?

Java - 是否可以以这种方式使用观察者模式和线程?

java - 将 tryLock() 与 wait() 和 notify()/notifyAll() 一起使用

java - 并发线程安全 AtomicInteger

java - 生产者-消费者问题的一个转折点

java - 我应该如何将 Json 转换为 ResponseEntity<String>?

java - 无法从firebase数据库android、回收器 View 中检索数据

Java程序通过调用C代码实现矩阵相乘