假设以太网上有一些非 PC 设备,根据给定的 API,我可以使用 UDP 与它们进行通信。
我想使用线程池和线程安全队列进行 UDP 通信。每个工作线程都有自己的 indy TIdUDPclient 实例。该通信就像向设备发送一个 UDP 数据报然后等待应答。答案是数据段大于 2 个字节的 UDP 数据报。
设备由 TDevice 类实例表示。
TDevice = class(TObject)
private
...
FStatus: byte;
...
public
...
function GetStatus(): integer; //API commandID = 68 (Getting device Status)
end;
thPool 用于创建/管理线程并将作业推送到队列:
TthPool = class(TObject)
private
FQueue: TThreadedQueue<TrUDPdirectJob*>;
FthWorkers: TList<TthWorker>;
public
constructor Create( AthCount, AQueueDepth: integer); //creating the pool here
function SendCommand( ArSendJob: TrSendJob );
end;
function SendCommand(ArSendJob TrSendJob): integer;
begin
...
FQueue.PushItem( ArSendJob );
end;
TDevice 的一个功能是获取它所代表的硬件的状态,并根据收到的答案设置其 FStatus 的值:
function TDevice.GetStatus(): integer; //command byte: 68
const
PARAMSLENGTH = 4;
var
rSendJob: TrSendJob*
begin
rSendJob.IP := self.FIP;
rSendJob.port := self.Fport;
rSendJob.commandID := 68;
rSendJob.paramslength := PARAMLENGTH ; //need to send the length (API req)
SetLength( rSendJob.params, PARAMSLENGTH );
rSendJob.params[0] := $A; //just some parameters along the commandID
rSendJob.params[1] := $B;
rSendJob.params[2] := $C;
rSendJob.params[3] := $D;
...
thPool.SendCommand( rSendJob ); //pushing the job to the queue
end;
*TrSendJob = record //Job define
ip: string;
port: integer;
commandID: byte;
params: Tparams; //Array of byte
paramslength: byte;
end;
从工作线程发送UDP数据报:
procedure TthWorker.Execute;
var
sendBuffer: TIDbytes;
rBuffer: TIdBytes;
rSendJob: TrSendJob;
begin
inherited;
repeat
FQueue.PopItem(rSendJob); //Getting the job from the queue
//Building the sending buffer
sendBuffer[0] := rSendJob. ... ;
...
FIdUDPclient.SendBuffer( rSendJob.IP, rSendJob.port, sendBuffer );
if FIdUDPclient.receiveBuffer(rbuffer, RECTIMEOUT_GLOBAL) >= 1 then
begin
//
end;
until Terminated;
end;
总而言之,thPool的SendCommand将作业插入队列,工作线程从队列中拉出作业并发送出UDP数据报,然后它的在rbuffer中取回答案字节。但此时,如何将receivebuffer的内容发送回TDevice实例进行处理呢?执行此操作的正确(OOP)方法是什么?我需要在 TDevice 方法内进行处理。
如果我在 TrSendJob 定义一个 plus 方法指针字段,这样工作线程就知道要调用什么方法来处理缓冲区并设置 FState 值,这是一个正确的解决方案吗? TDevice?
TProcessReceiveBuffer = function(ArBuffer: TIdBytes): integer;
TrSendJob = record //Job define
ip: string;
port: integer;
commandID: byte;
params: Tparams; //Tparams array of byte
paramslength: byte;
*ProcessReceiveBuffer: TProcessReceiveBuffer; //pointer to a TDevice method thats processing the receivebuffer
end;
或者,整个概念都是错误的。
最佳答案
But at this point, how to send the contents of receivebuffer back to TDevice instance for processing?
您可以在 TrSendJob
中包含一个 TDevice
对象指针,然后 TthWorker
将知道要传递哪个 TDevice
缓冲到。
或者,您可以将缓冲区和 TEvent
放入 TrSendJob
中,然后让 TthWorker
填充缓冲区并发出事件信号,然后TDevice.GetStatus()
等待事件信号并处理缓冲区。
关于multithreading - 线程池、线程安全队列、OOP,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33434799/