我对 GPars Actors 的理解可能有偏差,所以如果我错了请纠正我。我有一个 Groovy 应用程序,可以轮询 Web 服务以查找作业。当找到一个或多个作业时,它会将每个作业发送到我创建的 DynamicDispatchActor
,然后处理该作业。这些作业是完全独立的,不需要向主线程返回任何东西。当多个作业同时进入时,我希望它们被并行处理,但无论我尝试什么配置, Actor 都会先入先出地处理它们。
举个代码例子:
def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))
def actor = poolGroup.messageHandler {
when {Integer msg ->
println("I'm number ${msg} on thread ${Thread.currentThread().name}")
Thread.sleep(1000)
}
}
def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
integers.each {
actor << it
}
打印出来:
I'm number 1 on thread Actor Thread 31 I'm number 2 on thread Actor Thread 31 I'm number 3 on thread Actor Thread 31 I'm number 4 on thread Actor Thread 31 I'm number 5 on thread Actor Thread 31 I'm number 6 on thread Actor Thread 31 I'm number 7 on thread Actor Thread 31 I'm number 8 on thread Actor Thread 31 I'm number 9 on thread Actor Thread 31 I'm number 10 on thread Actor Thread 31
在每次打印输出之间稍作停顿。另请注意,每个打印输出都来自同一个 Actor/线程。
我想在这里看到的是,前 5 个数字会立即打印出来,因为线程池设置为 5,然后当这些线程空闲时,接下来的 5 个数字会被打印出来。我在这里完全偏离基地了吗?
最佳答案
要使其按您预期的方式运行,需要进行一些更改:
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool
def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))
def closure = {
when {Integer msg ->
println("I'm number ${msg} on thread ${Thread.currentThread().name}")
Thread.sleep(1000)
stop()
}
}
def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def actors = integers.collect { poolGroup.messageHandler(closure) << it }
actors*.join()
完整要点文件:https://gist.github.com/wololock/7f1348e04f68710e42d2
那么输出将是:
I'm number 5 on thread Actor Thread 5
I'm number 4 on thread Actor Thread 4
I'm number 1 on thread Actor Thread 1
I'm number 3 on thread Actor Thread 3
I'm number 2 on thread Actor Thread 2
I'm number 6 on thread Actor Thread 3
I'm number 9 on thread Actor Thread 4
I'm number 7 on thread Actor Thread 2
I'm number 8 on thread Actor Thread 5
I'm number 10 on thread Actor Thread 1
现在让我们看看发生了什么变化。首先,在您之前的示例中,您只处理了一个 Actor 。您正确定义了 poolGroup
,但随后您创建了一个单独的 actor 并将计算转移到这个单独的实例。要并行运行这些计算,您必须依赖 poolGroup
并且只向某些消息处理程序发送输入 - 池组将处理 actor 的创建及其生命周期管理。这就是我们所做的:
def actors = integers.collect { poolGroup.messageHandler(closure) << it }
它将创建一个从给定输入开始的 Actor 集合。池组将注意不超过指定的池大小。然后您必须加入
每个 Actor ,这可以通过使用 groovy 的魔法来完成:actors*.join()
。感谢应用程序将等待终止,直到所有参与者停止计算。这就是为什么我们必须将 stop()
方法添加到消息处理程序主体的 when
闭包中 - 没有它,它不会终止,因为 pool group 不知道 actors 是否这样做了工作 - 他们可能会等待,例如其他消息。
替代方案
我们还可以考虑使用 GPars 并行迭代的替代解决方案:
import groovyx.gpars.GParsPool
// This example is dummy, but let's assume that this processor is
// stateless and shared between threads component.
class Processor {
void process(int number) {
println "${Thread.currentThread().name} starting with number ${number}"
Thread.sleep(1000)
}
}
def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Processor processor = new Processor()
GParsPool.withPool 5, {
integers.eachParallel { processor.process(it) }
}
在此示例中,您有一个无状态组件 Processor
和使用具有多个输入值的无状态 Processor
实例的并行计算。
我试图弄清楚您在评论中提到的情况,但我不确定单个参与者是否可以一次处理多条消息。 actor 的无状态仅意味着它在处理消息期间不会更改其内部状态,并且不得在 actor 范围内存储任何其他信息。如果我的推理不正确,如果有人能纠正我,那就太好了:)
希望对您有所帮助。最好!
关于multithreading - 我如何并行化 GPars Actors?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27325006/