multithreading - 线程池、线程安全队列、OOP

标签 multithreading delphi queue threadpool indy

假设以太网上有一些非 PC 设备,根据给定的 API,我可以使用 UDP 与它们进行通信。

我想使用线程池和线程安全队列进行 UDP 通信。每个工作线程都有自己的 indy TIdUDPclient 实例。该通信就像向设备发送一个 UDP 数据报然后等待应答。答案是数据段大于 2 个字节的 UDP 数据报。

设备由 TDevice 类实例表示。

TDevice = class(TObject)
private
  ...
  FStatus: byte;
  ...
public
  ...
  function GetStatus(): integer; //API commandID = 68 (Getting device Status)
end;

thPool 用于创建/管理线程并将作业推送到队列:

TthPool = class(TObject)
private
  FQueue: TThreadedQueue<TrUDPdirectJob*>;
  FthWorkers: TList<TthWorker>;
public
  constructor Create( AthCount, AQueueDepth: integer); //creating the pool here
  function SendCommand( ArSendJob: TrSendJob );
end;

function SendCommand(ArSendJob TrSendJob): integer; 
begin
  ...
  FQueue.PushItem( ArSendJob );
end;

TDevice 的一个功能是获取它所代表的硬件的状态,并根据收到的答案设置其 FStatus 的值:

function TDevice.GetStatus(): integer;  //command byte: 68
const
  PARAMSLENGTH = 4;
var
  rSendJob: TrSendJob*        
begin
  rSendJob.IP := self.FIP;
  rSendJob.port := self.Fport;
  rSendJob.commandID := 68;
  rSendJob.paramslength := PARAMLENGTH ; //need to send the length (API req) 
  SetLength( rSendJob.params, PARAMSLENGTH  );  
  rSendJob.params[0] := $A; //just some parameters along the commandID
  rSendJob.params[1] := $B;
  rSendJob.params[2] := $C;
  rSendJob.params[3] := $D;
  ...
  thPool.SendCommand( rSendJob );   //pushing the job to the queue   
end;

*TrSendJob = record  //Job define
  ip: string;
  port: integer;
  commandID: byte;        
  params: Tparams; //Array of byte
  paramslength: byte;
end;

从工作线程发送UDP数据报:

procedure TthWorker.Execute;
var
  sendBuffer: TIDbytes;
  rBuffer: TIdBytes;
  rSendJob: TrSendJob;
begin
  inherited;
  repeat
    FQueue.PopItem(rSendJob); //Getting the job from the queue
    //Building the sending buffer
    sendBuffer[0] := rSendJob. ... ; 
    ...
    FIdUDPclient.SendBuffer( rSendJob.IP, rSendJob.port, sendBuffer );
    if FIdUDPclient.receiveBuffer(rbuffer, RECTIMEOUT_GLOBAL) >= 1 then
    begin
      //
    end;
  until Terminated;
end;

总而言之,thPoolSendCommand将作业插入队列,工作线程从队列中拉出作业并发送出UDP数据报,然后它的在rbuffer中取回答案字节。但此时,如何将receivebuffer的内容发送回TDevice实例进行处理呢?执行此操作的正确(OOP)方法是什么?我需要在 TDevice 方法内进行处理。

如果我在 TrSendJob 定义一个 plus 方法指针字段,这样工作线程就知道要调用什么方法来处理缓冲区并设置 FState 值,这是一个正确的解决方案吗? TDevice

TProcessReceiveBuffer = function(ArBuffer: TIdBytes): integer;

TrSendJob = record  //Job define
  ip: string;
  port: integer;
  commandID: byte;        
  params: Tparams; //Tparams array of byte
  paramslength: byte;
  *ProcessReceiveBuffer: TProcessReceiveBuffer; //pointer to a TDevice method thats processing the receivebuffer
end;

或者,整个概念都是错误的。

最佳答案

But at this point, how to send the contents of receivebuffer back to TDevice instance for processing?

您可以在 TrSendJob 中包含一个 TDevice 对象指针,然后 TthWorker 将知道要传递哪个 TDevice缓冲到。

或者,您可以将缓冲区和 TEvent 放入 TrSendJob 中,然后让 TthWorker 填充缓冲区并发出事件信号,然后TDevice.GetStatus() 等待事件信号并处理缓冲区。

关于multithreading - 线程池、线程安全队列、OOP,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33434799/

相关文章:

java - 主线程等待两个并行子线程java

delphi - 如何在窗口中找到所有文本字段?

mysql - 哪些可能的原因会导致nodejs中的mysql队列达到限制问题

C++多线程互斥锁 "reset"

.net - Task.Wait 不等待

java - ExecutorService性能问题

java - 我如何确保给定的依赖项包含在我的 Spring 框架应用程序中?

delphi - 如何在 Delphi 代码中将 Speex Decompressor 连接到混音器(Mitov AudioLab 组件)

algorithm - Haskell 中的高效队列

Python 2 维 FIFO