ruby - 如何设计一个多线程程序,其中每个线程同时读取来自控制台的输入?

标签 ruby multithreading parallel-processing

这个想法是,输入将由控制台提供,并将使用唯一的“id”作为输入的第一个单词进行标识。当遇到新的 id 时,会生成一个新线程,后续输入为“start”。当具有相同 id 的输入显示“关闭”时,线程应该关闭。语句的顺序是随机的。

最佳答案

这里的技巧是让一个线程(最简单的是主线程)完成所有的读取。该线程将解析命令、启动或停止线程以及向线程分派(dispatch)命令。每个线程都有自己的队列,它从中读取命令并将命令放入其中。让我们看看如何实现。

我们将从一个用于控制命令的小模块开始,以便它们是干的:

module ControlCommands
  START_COMMAND = 'start'
  STOP_COMMAND = 'stop'
end

现在让我们看看“主要”:

class Main

  def initialize
    @workers = Workers.new
    @console = Console.new(@workers)
  end

  def run
    @console.read_and_dispatch
    @workers.join
  end

end

Main.new.run

这里没有发生什么大事。我们制作一个控制台,并让它读取命令并将其发送给工作人员。控制台在输入完毕之前不会返回。对 @workers.join 的调用确保所有工作人员都已完成工作并在程序退出之前正确关闭。

这是控制台类:

class Console

  def initialize(workers)
    @workers = workers
  end

  def read_and_dispatch
    while input = gets
      @workers.dispatch *parse_input(input)
    end
  end

  private

  def parse_input(input)
    input.match(/^(\w+) *(.*)$/)[1..2]
  end

end

read_and_dispatch 是主循环。它负责的只是读取和解析输入。只要有输入,它就会将其拆分为worker名称和命令,然后告诉worker处理命令。

这是 Workers 类:

class Workers

  include ControlCommands

  def initialize
    @workers = {}
  end

  def dispatch(worker_name, command)
    case command
    when START_COMMAND
      start_worker worker_name
    when STOP_COMMAND
      stop_worker worker_name
    else
      dispatch_worker worker_name, command
    end
  end

  def join
    @workers.each do |worker_name, worker|
      worker.stop
      worker.join
    end
  end

  private

  def start_worker(worker_name)
    @workers[worker_name] = Worker.new(worker_name)
  end

  def stop_worker(worker_name)
    @workers[worker_name].stop
  end

  def dispatch_worker(worker_name, command)
    @workers[worker_name].dispatch command
  end

end

这是大部分肉的地方。此类创建工作线程(线程)、停止它们并向它们分派(dispatch)命令。请注意,这里没有错误处理:如果您尝试停止未启动的线程、启动已启动的线程或向不存在的线程分派(dispatch)命令,则程序将崩溃或行为异常。我会将这些情况的处理作为“读者的练习”。

这是 Worker 类:

class Worker

  include ControlCommands

  def initialize(name)
    @name = name
    @commands = Queue.new
    @thread = start_thread
  end

  def dispatch(command)
    @commands.enq command
  end

  def stop
    @commands.enq STOP_COMMAND
  end

  def join
    @thread.join
  end

  private

  def start_thread
    Thread.new do
      loop do
        command = @commands.deq
        break if command == STOP_COMMAND
        process_command command
      end
    end
  end

  def process_command(command)
    print "thread #{@name} received command #{command.inspect}\n"
  end

end

此类包含线程以及用于在主线程(读取控制台的线程)和工作线程之间进行通信的队列。该队列还用于同步停止线程,方法是将 STOP_COMMAND 放入线程通过退出来响应的队列中。如果可以的话,最好同步停止线程而不是异步停止线程。

这是一个简单的输入文件:

foo start
bar start
foo input
bar input
foo stop
bar stop

当程序提供该输入时的输出:

thread bar received command "input"
thread foo received command "input"

关于ruby - 如何设计一个多线程程序,其中每个线程同时读取来自控制台的输入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13840482/

相关文章:

c++ - Clang 替换 -mthreads 选项

c# - 任务层次结构示例未按预期工作

c# - 处理异步并行任务的多个异常

python - MSBUILD : error MSB3428: Could not load the Visual C++ component "VCBuild.exe"

ruby-on-rails - 从 Controller Action 中调用/lib 中的类

ruby-on-rails - 在 Rails 进程结束后终止它

java - 一个键具有多个值的并发映射,并在超时时自动删除

ruby-on-rails - 如何测试一个 before_save 方法,包括与 rspec 的关联

java - 在 Java/Clojure 中杀死无限循环

azure - 在 Azure 中同步并行操作