MQ 多线程

标签 multithreading perl ibm-mq

我在使用 MQSeries 时遇到问题多线程环境中的 Perl 模块。这是我尝试过的:

  • 使用 $mqMgr = MQSeries::QueueManager->new() 在不同线程中创建两个句柄.我认为这会给我两个不同的 MQ 连接,但我在第二次调用 MQOPEN() 时得到了返回码 2219 ,这可能意味着我通过两次单独调用 new() 方法获得了与 mq 的相同底层连接。
  • 仅将一个 $mqMgr 声明为全局 共享 多变的。但是我不能将 MQSeries::QueueManager 对象的引用分配给 $mqMgr .原因是“threads::shared::share 的 arg 1 类型必须是 [$@%] 之一(不是子例程条目)”
  • 只申报一个$mqMgr作为全局变量。得到相同的 2219 代码。
  • 试图通过MQCNO_HANDLE_SHARE_NO_BLOCK进入 MQSeries::QueueManager->new() ,以便可以跨线程共享单个连接。但是我找不到办法传进去。

  • 我的问题是,使用 Perl 模块 MQSeries
  • 如何/我可以从不同的线程获得到 MQ 队列管理器的单独连接?
  • 如何/我可以跨不同线程共享到 MQ 队列管理器的连接?

  • 我环顾四周,但运气不佳,任何信息将不胜感激。

    相关问题:
  • C++ - MQ RC Code 2219


  • 更新 1:添加一个示例,两个线程中的两个本地 MQSeries::QueueManager 对象导致 MQ 错误代码 2219。
    use threads;
    use Thread::Queue;
    use MQSeries;
    use MQSeries::QueueManager;
    use MQSeries::Queue;
    
    # globals
    our $jobQ = Thread::Queue->new();
    our $resultQ = Thread::Queue->new();
    
    # ----------------------------------------------------------------------------
    # sub routines
    # ----------------------------------------------------------------------------
    
    sub worker {
        # fetch work from $jobQ and put result to $resultQ
        # ...
    }
    
    sub monitor {
        # fetch result from $resultQ and put it onto another MQ queue
        my $mqQMgr = MQSeries::QueueManager->new( ... );
    
        # different queue from the one in main
        # this would cause error with MQ code 2219
        my $mqQ = MQSeries::Queue->new( ... );
    
        while (defined(my $result = $resultQ->dequeue())) {
            # create an mq message and put it into $mqQ
            my $mqMsg = MQSeries::Message->new();
            $mqQ->put($mqMsg);
        }   
    }
    
    # main
    unless (caller()) {
        # create connection to MQ
        my $mqQMgr = MQSeries::QueueManager->new( ... );
        my $mqQ = MQSeries::Queue->new( ... );
    
        # create worker and monitor thread
        my @workers;
        for (1 .. $nThreads) {
            push(@workers, threads->create('worker'));
        }
        my $monitor = threads->create('monitor');
    
        while (True) {
            my $mqMsg = MQSeries::Message->new ();
    
            my $retCode = $mqQ->get(
                Message => $mqMsg,
                GetMsgOptions => $someOption,
                Wait => $sometime
            );
    
            die("error") if ($retCode == 0);
            next if ($retCode == -1); # no message
    
            # not we have some job to do
            $jobQ->enqueue($mqMsg->Data);
        }
    }
    

    最佳答案

    尝试使用模块进行多线程处理时,存在一个非常现实的危险,即该模块不是线程安全的。由于线程的工作方式,有很多事情可能会变得困惑——你克隆当前的进程状态,包括文件句柄、套接字等。

    但是,如果您尝试以异步/线程方式使用它们,它们的行为会非常奇怪,因为这些操作不是(必然)原子的。

    所以虽然我不能直接回答你的问题,因为我没有特定模块的经验:

    除非您另有了解,否则假设您不能在线程之间共享。它可能是线程安全的,也可能不是。如果不是,它可能看起来还不错,直到有一天你因为并发条件中的竞争条件而发现一个非常难以找到的错误。
    threads::shared 中明确描述了共享标量/列表。基本上是安全的(即使那样,如果你没有锁定,你仍然会遇到非原子性问题)。

    因此,我建议您需要做的是:

  • 有一个“comms”线程,它完成与模块相关的所有工作,并使其他线程使用 IPC 与其通信。 Thread::Queue可以很好地解决这个问题。
  • 出于模块的目的,将每个线程视为完全独立的。这包括加载它(使用 requireimport - 不是 use 因为它更早地起作用)和实例化。 (你可能会在线程启动之前“加载”模块,但实例化会做一些事情,比如创建描述符、套接字等。)
  • 当有任何中断原子操作的危险时锁定东西。

  • 以上大部分内容也适用于 fork也有并行性——但与 fork 的方式不同。使“共享”内容变得相当困难,因此您不太可能绊倒它。

    编辑:

    查看您发布的代码,并交叉引用 MQSeries来源:
  • 有一个BEGIN block ,在您 use 的位置设置 MQSeries 的一些内容它。

  • 虽然我不能肯定这是你的问题,但这让我非常警惕 - 因为请记住,当它这样做时,它会设置一些东西 - 然后当你的线程启动时,它们会继承非共享副本在那个“BEGIN” block 期间“不管它做了什么”。

    因此,根据我之前的建议 - 我建议您尝试(因为我不能肯定地说,因为我没有引用实现):
    require MQSeries; 
    MQSeries->import;
    

    将其放入您的代码中 - 代替 use - 线程启动后。例如。在你做 creates 之后并在线程子例程中。

    关于MQ 多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33274838/

    相关文章:

    java - 在 MQ v6 api 中 - 如何停止 MQQueue get() 方法?

    c# - Windows 窗体中的 Gif

    regex - Perl正则表达式匹配包含字母/数字/点的字符串

    php - 处理适用性声明 2 (AS2) 请求

    perl - 确保我对 utf8 的处理是正确的

    multithreading - 如何将函数发送到另一个线程?

    c - WMQ 安全退出 UserID/PWD 传递问题

    ibm-mq - 通过 UFT 12.02 进行 MQ 6.0 测试

    java - JBoss 5.1.0GA : "java.lang.IllegalStateException: Null beanMetaData" and "java.lang.RuntimeException: failed to initialize bean container"

    c - 在线程中休眠(C/POSIX 线程)