multithreading - 在 Lparallel 库 (Common Lisp) 中使用队列

标签 multithreading common-lisp message-queue

lparallel 库中队列的基本讨论 https://z0ltan.wordpress.com/2016/09/09/basic-concurrency-and-parallelism-in-common-lisp-part-4a-parallelism-using-lparallel-fundamentals/#channels说队列“启用工作线程之间的消息传递”。下面的测试使用共享队列来协调主线程和从线程,其中主线程在退出之前只是等待从线程完成:

(defun foo (q)
  (sleep 1)
  (lparallel.queue:pop-queue q))  ;q is now empty

(defun test ()
  (setf lparallel:*kernel* (lparallel:make-kernel 1))
  (let ((c (lparallel:make-channel))
        (q (lparallel.queue:make-queue)))
    (lparallel.queue:push-queue 0 q)
    (lparallel:submit-task c #'foo q)
    (loop do (sleep .2)
             (print (lparallel.queue:peek-queue q))
          when (lparallel.queue:queue-empty-p q)
            do (return)))
  (lparallel:end-kernel :wait t))

这按预期产生输出:

* (test)

0
0
0
0
NIL
(#<SB-THREAD:THREAD "lparallel" FINISHED values: NIL {10068F2B03}>)

我的问题是关于我是否正确或完全使用了 lparallel 的队列功能。队列似乎只是使用全局变量来保存线程共享对象的替代品。使用队列的设计优势是什么?为每个提交的任务分配一个队列通常是好的做法吗(假设任务需要通信)?感谢您提供更深入的见解。

最佳答案

多线程工作是通过管理对可变对象的并发访问来完成的 共享状态,即你有一个公共(public)数据结构的锁, 每个线程读取或写入它。

但是建议尽量减少数据的数量 同时访问。队列是一种将工作人员与每个工作人员解耦的方法 其他,通过让每个线程管理其本地状态并交换数据 仅通过消息;这是线程安全的,因为访问 队列由 locks and condition variables 控制.

您在主线程中所做的是轮询队列何时 是空的;这可能有效,但适得其反,因为队列 被用作同步机制,但在这里你正在做 自己同步。

(ql:quickload :lparallel)
(defpackage :so (:use :cl
                      :lparallel
                      :lparallel.queue
                      :lparallel.kernel-util))
(in-package :so)

让我们改变 foo 让它有两个队列,一个用于传入 请求,一个用于答复。在这里,我们执行一个简单的转换 发送的数据和每个输入消息,只有一个 输出消息,但情况并非总是如此。

(defun foo (in out)
  (push-queue (1+ (pop-queue in)) out))

更改test,使控制流仅基于对队列的读/写:

(defun test ()
  (with-temp-kernel (1)
    (let ((c (make-channel))
          (foo-in (make-queue))
          (foo-out (make-queue)))
      (submit-task c #'foo foo-in foo-out)
      ;; submit data to task (could be blocking)
      (push-queue 0 foo-in)
      ;; wait for message from task (could be blocking too)
      (pop-queue foo-out))))

But how can you can avoid polling in test if there are multiple tasks running? Don’t you need to continuously check when any one of them is done so you can push-queue more work to it?

您可以使用不同的并发机制,类似于 listenpoll/epoll ,你在那里观看多个 事件的来源,并在其中一个准备就绪时使用react。有像 Go ( select ) 和 Erlang ( receive ) 这样的语言,其中 这表达起来很自然。在 Lisp 方面,Calispel库提供了类似的交替机制(pri-altfair-alt)。例如,下面是从Calispel的测试代码中截取的:

(pri-alt ((? control msg)
          (ecase msg
            (:clean-up (setf cleanup? t))
            (:high-speed (setf slow? nil))
            (:low-speed (setf slow? t))))
         ((? channel msg)
          (declare (type fixnum msg))
          (vector-push-extend msg out))
         ((otherwise :timeout (if cleanup? 0 nil))
          (! reader-results out)
          (! thread-expiration (bt:current-thread))
          (return)))

lparallel 的情况下,没有这样的机制,但只要您使用标识符标记消息,您就可以只使用队列。

如果您需要在任务 t1t2 给出结果后立即使用react,那么将这两个任务写入相同的结果 channel :

(let ((t1 (foo :id 1 :in i1 :out res))
      (t2 (bar :id 2 :in i2 :out res)))
   (destructuring-bind (id message) (pop-queue res)
     (case id
       (1 ...)
       (2 ...))))

如果您需要在 t1t2 发出结果时同步代码,让它们写入不同的 channel :

(let ((t1 (foo :id 1 :in i1 :out o1))
      (t2 (bar :id 2 :in i2 :out o2)))
   (list (pop-queue o1)
         (pop-queue o2)))

关于multithreading - 在 Lparallel 库 (Common Lisp) 中使用队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55318645/

相关文章:

c++ - 终止正在运行的 boost 线程

ssl - CL+SSL SSL Error : Unsafe legacy renegotiation disabled. 如何绕过或解决?

lisp - 在列表中搜索整数 (Lisp)

python - Python多重处理启动进程,但只有一个处于事件状态

wpf - DispatcherTimer 在 WPF 应用程序中未触发

c++ - boost::进程间消息队列抛出错误

multithreading - RabbitMQ - 每个路由键单个并发 worker

go - 如何让多个请求爆发等待首先完成(分布式)

c++ - 向 QML 应用程序发送键盘事件

macros - LISP:如何跟踪宏