java - 在 Jgroups 中使用自定义线程池和执行器

标签 java websphere websphere-8 jgroups

如果我们查看 Jgroups 的 DefaultThreadFactory 内部,会出现以下代码

protected Thread newThread(Runnable r,String name,String addr,String cluster_name) { String thread_name=getNewThreadName(name, addr, cluster_name); Thread retval=new Thread(r, thread_name); retval.setDaemon(createDaemons); 返回retval; }

由于使用了新线程,所以我相信在托管服务器环境中这可能会导致问题并且也不是一个好的做法。

如果我只是用 WebSphere 的托管工厂和执行器替换默认的线程工厂和执行器,Jgroups 的行为是否仍然相同?

任何指针都会有帮助..?

更新

我的意图是将 JGroups 与 WebSphere AS 8.5 一起使用。我渴望没有任何非托管线程。我的主要用例是领导者选举和一些消息传递。它将用于管理 Spring Integration 轮询器并确保只有一个轮询器在集群中运行。

WAS 8.5 仍然使用 CommonJ api 进行工作管理。

我正在使用 Spring 来抽象任务执行器和调度器。

最初很容易用共享执行器 api 的任务执行器替换线程池。

TaskScheduler 必须进行调整才能与您的 TimeScheduler 界面一起使用。 它们非常相似,也许从 ScheduledExecutorService 扩展可能是这里的一个选项。我实现了您的界面并委托(delegate)给了 Springs TaskScheduler。

主要问题出在 ThreadFactory 上。 CommonJ 没有这个概念。为此,我创建了一个 ThreadWrapper,它封装了 Runnable 并在调用“Thread 的”启动方法时委托(delegate)给 TaskExecutor。我忽略了线程重命名功能,因为这不会产生任何影响。

public Thread newThread(Runnable command) {
    log.debug("newThread");
    RunnableWrapper wrappedCommand = new RunnableWrapper(command);
    return new ThreadWrapper(taskExecutor, wrappedCommand);
}

public synchronized void start() {
    try { 
        taskExecutor.execute(runnableWrapper);          
    } catch (Exception e) {
        throw new UnableToStartException(e);
    }
}

这是我遇到问题的地方。问题出在运输上。在许多情况下,一些内部可运行对象的运行方法,例如TP的DiagnosticsHandler、TransferQueueBundler和GMS的ViewHandler都有while语句检查线程。

public class DiagnosticsHandler implements Runnable {
    public void run() {
        byte[] buf;
        DatagramPacket packet;
        while(Thread.currentThread().equals(thread)) {
            //...
        }
    }
}

protected class TransferQueueBundler extends BaseBundler implements Runnable {
    public void run() {
        while(Thread.currentThread() == bundler_thread) {
            //...
        }
    }
}

class ViewHandler implements Runnable {
    public void run() {
        long start_time, wait_time;  // ns
        long timeout=TimeUnit.NANOSECONDS.convert(max_bundling_time, TimeUnit.MILLISECONDS);
        List<Request> requests=new LinkedList<>();
        while(Thread.currentThread().equals(thread) && !suspended) {
            //...
        }
    }
}

这与我们的线程包装不合作。如果可以更改它以便在存储的线程上调用 equals 方法,则可以覆盖它。

正如您从各种片段中看到的那样,有各种不同的实现和保护级别,包、 protected 和公共(public)。这增加了扩展类的难度。

完成所有这些后,它仍然没有完全消除非托管线程的问题。

我正在使用创建协议(protocol)栈的属性文件方法。一旦设置了属性,这将初始化协议(protocol)栈。删除由底层协议(protocol)创建的定时器线程。必须在初始化堆栈之前设置 TimeScheduler。

一旦完成,线程就全部被管理了。

对于如何更轻松地实现这一点,您有什么建议吗?

最佳答案

是的,您可以在线程池中注入(inject),详情请参阅 [1]。

[1] http://www.jgroups.org/manual/index.html#_replacing_the_default_and_oob_thread_pools

关于java - 在 Jgroups 中使用自定义线程池和执行器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33922536/

相关文章:

java - 我们如何检测 main() 线程是否已死亡但所有生成的 thread() 线程都在运行?

java - 将任意文件安装到 Websphere 中所有节点的脚本接口(interface)?

Java Filter 不适用于生产环境 Websphere 8.5

java - 我可以使用比 Websphere 8.0 中捆绑的更新版本的 Jackson 吗?

java-ee-6 - 如何取消或删除 Persistent EJBTimers

java - Spring MVC - 找不到元素 'mvc:annotation-drive' 的声明

eclipse - m2eclipse:Eclipse 在 JRE 中运行,但需要 JDK

memory - 共享库是否减少了 WebSphere AppServer 7 上的内存占用?

rest - WebSphere Application Server/8.0/Uncaught 由 servlet 创建的初始化异常

java - 如何找出导致此 Java FX 应用程序线程异常的原因?