我在使用 MQSeries 时遇到问题多线程环境中的 Perl 模块。这是我尝试过的:
$mqMgr = MQSeries::QueueManager->new()
在不同线程中创建两个句柄.我认为这会给我两个不同的 MQ 连接,但我在第二次调用 MQOPEN()
时得到了返回码 2219 ,这可能意味着我通过两次单独调用 new() 方法获得了与 mq 的相同底层连接。 $mqMgr
.原因是“threads::shared::share 的 arg 1 类型必须是 [$@%] 之一(不是子例程条目)”$mqMgr
作为全局变量。得到相同的 2219 代码。 MQCNO_HANDLE_SHARE_NO_BLOCK
进入 MQSeries::QueueManager->new()
,以便可以跨线程共享单个连接。但是我找不到办法传进去。我的问题是,使用 Perl 模块 MQSeries
我环顾四周,但运气不佳,任何信息将不胜感激。
相关问题:
更新 1:添加一个示例,两个线程中的两个本地 MQSeries::QueueManager 对象导致 MQ 错误代码 2219。
use threads;
use Thread::Queue;
use MQSeries;
use MQSeries::QueueManager;
use MQSeries::Queue;
# globals
our $jobQ = Thread::Queue->new();
our $resultQ = Thread::Queue->new();
# ----------------------------------------------------------------------------
# sub routines
# ----------------------------------------------------------------------------
sub worker {
# fetch work from $jobQ and put result to $resultQ
# ...
}
sub monitor {
# fetch result from $resultQ and put it onto another MQ queue
my $mqQMgr = MQSeries::QueueManager->new( ... );
# different queue from the one in main
# this would cause error with MQ code 2219
my $mqQ = MQSeries::Queue->new( ... );
while (defined(my $result = $resultQ->dequeue())) {
# create an mq message and put it into $mqQ
my $mqMsg = MQSeries::Message->new();
$mqQ->put($mqMsg);
}
}
# main
unless (caller()) {
# create connection to MQ
my $mqQMgr = MQSeries::QueueManager->new( ... );
my $mqQ = MQSeries::Queue->new( ... );
# create worker and monitor thread
my @workers;
for (1 .. $nThreads) {
push(@workers, threads->create('worker'));
}
my $monitor = threads->create('monitor');
while (True) {
my $mqMsg = MQSeries::Message->new ();
my $retCode = $mqQ->get(
Message => $mqMsg,
GetMsgOptions => $someOption,
Wait => $sometime
);
die("error") if ($retCode == 0);
next if ($retCode == -1); # no message
# not we have some job to do
$jobQ->enqueue($mqMsg->Data);
}
}
最佳答案
尝试使用模块进行多线程处理时,存在一个非常现实的危险,即该模块不是线程安全的。由于线程的工作方式,有很多事情可能会变得困惑——你克隆当前的进程状态,包括文件句柄、套接字等。
但是,如果您尝试以异步/线程方式使用它们,它们的行为会非常奇怪,因为这些操作不是(必然)原子的。
所以虽然我不能直接回答你的问题,因为我没有特定模块的经验:
除非您另有了解,否则假设您不能在线程之间共享。它可能是线程安全的,也可能不是。如果不是,它可能看起来还不错,直到有一天你因为并发条件中的竞争条件而发现一个非常难以找到的错误。threads::shared
中明确描述了共享标量/列表。基本上是安全的(即使那样,如果你没有锁定,你仍然会遇到非原子性问题)。
因此,我建议您需要做的是:
Thread::Queue
可以很好地解决这个问题。 require
和 import
- 不是 use
因为它更早地起作用)和实例化。 (你可能会在线程启动之前“加载”模块,但实例化会做一些事情,比如创建描述符、套接字等。)以上大部分内容也适用于
fork
也有并行性——但与 fork
的方式不同。使“共享”内容变得相当困难,因此您不太可能绊倒它。编辑:
查看您发布的代码,并交叉引用
MQSeries
来源:BEGIN
block ,在您 use
的位置设置 MQSeries 的一些内容它。 虽然我不能肯定这是你的问题,但这让我非常警惕 - 因为请记住,当它这样做时,它会设置一些东西 - 然后当你的线程启动时,它们会继承非共享副本在那个“BEGIN” block 期间“不管它做了什么”。
因此,根据我之前的建议 - 我建议您尝试(因为我不能肯定地说,因为我没有引用实现):
require MQSeries;
MQSeries->import;
将其放入您的代码中 - 代替
use
- 线程启动后。例如。在你做 creates
之后并在线程子例程中。
关于MQ 多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33274838/