multithreading - 我如何并行化 GPars Actors?

标签 multithreading groovy actor gpars

我对 GPars Actors 的理解可能有偏差,所以如果我错了请纠正我。我有一个 Groovy 应用程序,可以轮询 Web 服务以查找作业。当找到一个或多个作业时,它会将每个作业发送到我创建的 DynamicDispatchActor,然后处理该作业。这些作业是完全独立的,不需要向主线程返回任何东西。当多个作业同时进入时,我希望它们被并行处理,但无论我尝试什么配置, Actor 都会先入先出地处理它们。

举个代码例子:

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

def actor = poolGroup.messageHandler {
    when {Integer msg -> 
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

integers.each {
    actor << it
}

打印出来:

I'm number 1 on thread Actor Thread 31
I'm number 2 on thread Actor Thread 31
I'm number 3 on thread Actor Thread 31
I'm number 4 on thread Actor Thread 31
I'm number 5 on thread Actor Thread 31
I'm number 6 on thread Actor Thread 31
I'm number 7 on thread Actor Thread 31
I'm number 8 on thread Actor Thread 31
I'm number 9 on thread Actor Thread 31
I'm number 10 on thread Actor Thread 31

在每次打印输出之间稍作停顿。另请注意,每个打印输出都来自同一个 Actor/线程。

我想在这里看到的是,前 5 个数字会立即打印出来,因为线程池设置为 5,然后当这些线程空闲时,接下来的 5 个数字会被打印出来。我在这里完全偏离基地了吗?

最佳答案

要使其按您预期的方式运行,需要进行一些更改:

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

def closure = {
    when {Integer msg ->
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
        stop()
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def actors = integers.collect { poolGroup.messageHandler(closure) << it }
actors*.join()

完整要点文件:https://gist.github.com/wololock/7f1348e04f68710e42d2

那么输出将是:

I'm number 5 on thread Actor Thread 5
I'm number 4 on thread Actor Thread 4
I'm number 1 on thread Actor Thread 1
I'm number 3 on thread Actor Thread 3
I'm number 2 on thread Actor Thread 2
I'm number 6 on thread Actor Thread 3
I'm number 9 on thread Actor Thread 4
I'm number 7 on thread Actor Thread 2
I'm number 8 on thread Actor Thread 5
I'm number 10 on thread Actor Thread 1

现在让我们看看发生了什么变化。首先,在您之前的示例中,您只处理了一个 Actor 。您正确定义了 poolGroup,但随后您创建了一个单独的 actor 并将计算转移到这个单独的实例。要并行运行这些计算,您必须依赖 poolGroup 并且只向某些消息处理程序发送输入 - 池组将处理 actor 的创建及其生命周期管理。这就是我们所做的:

def actors = integers.collect { poolGroup.messageHandler(closure) << it }

它将创建一个从给定输入开始的 Actor 集合。池组将注意不超过指定的池大小。然后您必须加入每个 Actor ,这可以通过使用 groovy 的魔法来完成:actors*.join()。感谢应用程序将等待终止,直到所有参与者停止计算。这就是为什么我们必须将 stop() 方法添加到消息处理程序主体的 when 闭包中 - 没有它,它不会终止,因为 pool group 不知道 actors 是否这样做了工作 - 他们可能会等待,例如其他消息。

替代方案

我们还可以考虑使用 GPars 并行迭代的替代解决方案:

import groovyx.gpars.GParsPool

// This example is dummy, but let's assume that this processor is
// stateless and shared between threads component.
class Processor {
    void process(int number) {
        println "${Thread.currentThread().name} starting with number ${number}"
        Thread.sleep(1000)
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Processor processor = new Processor()

GParsPool.withPool 5, {
    integers.eachParallel { processor.process(it) }
}

在此示例中,您有一个无状态组件 Processor 和使用具有多个输入值的无状态 Processor 实例的并行计算。

我试图弄清楚您在评论中提到的情况,但我不确定单个参与者是否可以一次处理多条消息。 actor 的无状态仅意味着它在处理消息期间不会更改其内部状态,并且不得在 actor 范围内存储任何其他信息。如果我的推理不正确,如果有人能纠正我,那就太好了:)

希望对您有所帮助。最好!

关于multithreading - 我如何并行化 GPars Actors?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27325006/

相关文章:

python - QThread 不使用事件更新 View

scala - Play Websocket示例-仅一位Akka Actor ?

python - 为什么只有一个工作线程的 ThreadPoolExecutor 仍然比正常执行速度更快?

javascript - 如何在发送 JSON 时格式化 Base64 编码的字符串

java - Groovy 依赖注入(inject)

java - 运行页面刷新后发生的线程(失控线程)

serialization - 无法反序列化 ActorRef 以将结果发送到不同的 Actor

exception - Service Fabric Actor 远程调用导致 "Specified cast is not valid."异常

JavaFX:用线程移动一个圆圈

android - java.lang.RuntimeException : Can't create handler inside thread that has not called Looper. 准备();