multithreading - TParallel.为了性能

标签 multithreading delphi parallel-processing delphi-xe7

给出以下在一维数组中查找奇数的简单任务:

begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  for i := 0 to MaxArr-1 do
      if ArrXY[i] mod 2 = 0 then
        Inc(odds);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Serial: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

看起来这将是并行处理的一个很好的候选者。因此,人们可能会想使用以下 TParallel.For 版本:

begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(0,  MaxArr-1, procedure(I:Integer)
  begin
    if ArrXY[i] mod 2 = 0 then
      inc(odds);
  end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel - false odds: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

这种并行计算的结果在两个方面有点令人惊讶:

  1. 计算的赔率错误

  2. 执行时间比串行版本长

1) 是可以解释的,因为我们没有保护并发访问的赔率变量。因此,为了解决这个问题,我们应该使用 TInterlocked.Increment(odds); 来代替。

2) 也是可以解释的:它表现出 false sharing 的效果.

理想情况下,错误共享问题的解决方案是使用局部变量来存储中间结果,并且仅在所有并行任务结束时对这些中间结果进行求和。 这是我真正无法理解的问题:有没有办法将局部变量放入我的匿名方法中?请注意,仅在匿名方法主体中声明局部变量是行不通的,因为每次迭代都会调用匿名方法主体。如果这是可行的,是否有办法在每次任务迭代结束时从匿名方法中获取中间结果?

编辑:实际上我对计算赔率或埃文斯不太感兴趣。我只是用这个来演示效果。

出于完整性原因,这里有一个控制台应用程序演示了效果:

program Project4;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils, System.Threading, System.Classes, System.SyncObjs;

const
  MaxArr = 100000000;

var
  Ticks: Cardinal;
  i: Integer;
  odds: Integer;
  ArrXY: array of Integer;

procedure FillArray;
var
  i: Integer;
  j: Integer;
begin
  SetLength(ArrXY, MaxArr);
  for i := 0 to MaxArr-1 do
      ArrXY[i]:=Random(MaxInt);
end;

procedure Parallel;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(0,  MaxArr-1, procedure(I:Integer)
  begin
    if ArrXY[i] mod 2 = 0 then
      TInterlocked.Increment(odds);
  end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

procedure ParallelFalseResult;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(0,  MaxArr-1, procedure(I:Integer)
  begin
    if ArrXY[i] mod 2 = 0 then
      inc(odds);
  end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel - false odds: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

procedure Serial;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  for i := 0 to MaxArr-1 do
      if ArrXY[i] mod 2 = 0 then
        Inc(odds);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Serial: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

begin
  try
    FillArray;
    Serial;
    ParallelFalseResult;
    Parallel;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
  Readln;
end.

最佳答案

这个问题的关键是正确的分区和尽可能少的共享。

使用此代码,它的运行速度几乎比串行代码快 4 倍。

const 
  WorkerCount = 4;

function GetWorker(index: Integer; const oddsArr: TArray<Integer>): TProc;
var
  min, max: Integer;
begin
  min := MaxArr div WorkerCount * index;
  if index + 1 < WorkerCount then
    max := MaxArr div WorkerCount * (index + 1) - 1
  else
    max := MaxArr - 1;
  Result :=
    procedure
    var
      i: Integer;
      odds: Integer;
    begin
      odds := 0;
      for i := min to max do
        if Odd(ArrXY[i]) then
          Inc(odds);
      oddsArr[index] := odds;
    end;
end;

procedure Parallel;
var
  i: Integer;
  oddsArr: TArray<Integer>;
  workers: TArray<ITask>;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  SetLength(oddsArr, WorkerCount);
  SetLength(workers, WorkerCount);

  for i := 0 to WorkerCount-1 do
    workers[i] := TTask.Run(GetWorker(i, oddsArr));
  TTask.WaitForAll(workers);

  for i := 0 to WorkerCount-1 do
    Inc(odds, oddsArr[i]);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('Parallel: ' + Ticks.ToString + 'ms, odds: ' + odds.ToString);
end;

您可以使用 TParallel.For 编写类似的代码,但它的运行速度仍然比仅使用 TTask 慢一些(例如比串行快 3 倍)。

顺便说一句,我使用该函数返回 worker TProc 以获得正确的索引捕获。如果您在同一例程的循环中运行它,您将捕获循环变量。

2014 年 12 月 19 日更新:

由于我们发现关键是正确分区,因此可以非常轻松地将其放入并行 for 循环中,而无需将其锁定在特定数据结构上:

procedure ParallelFor(lowInclusive, highInclusive: Integer;
  const iteratorRangeEvent: TProc<Integer, Integer>);

  procedure CalcPartBounds(low, high, count, index: Integer;
    out min, max: Integer);
  var
    len: Integer;
  begin
    len := high - low + 1;
    min := (len div count) * index;
    if index + 1 < count then
      max := len div count * (index + 1) - 1
    else
      max := len - 1;
  end;

  function GetWorker(const iteratorRangeEvent: TProc<Integer, Integer>;
    min, max: Integer): ITask;
  begin
    Result := TTask.Run(
      procedure
      begin
        iteratorRangeEvent(min, max);
      end)
  end;

var
  workerCount: Integer;
  workers: TArray<ITask>;
  i, min, max: Integer;
begin
  workerCount := TThread.ProcessorCount;
  SetLength(workers, workerCount);
  for i := 0 to workerCount - 1 do
  begin
    CalcPartBounds(lowInclusive, highInclusive, workerCount, i, min, max);
    workers[i] := GetWorker(iteratorRangeEvent, min, max);
  end;
  TTask.WaitForAll(workers);
end;

procedure Parallel4;
begin
  odds := 0;
  Ticks := TThread.GetTickCount;
  ParallelFor(0, MaxArr-1,
    procedure(min, max: Integer)
    var
      i, n: Integer;
    begin
      n := 0;
      for i := min to max do
        if Odd(ArrXY[i]) then
          Inc(n);
      AtomicIncrement(odds, n);
    end);
  Ticks := TThread.GetTickCount - Ticks;
  writeln('ParallelEx: Stefan Glienke ' + Ticks.ToString + ' ms, odds: ' + odds.ToString);
end;

关键是使用局部变量进行计数,并且仅在最后使用共享变量一次来添加小计。

关于multithreading - TParallel.为了性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27535045/

相关文章:

asp.net-mvc - 如何处理 ASP.NET Web API 中的 OperationCanceledException?

multithreading - 通过分离#omp parallel 和#omp for,减少OpenMP fork/join 开销

parallel-processing - 提早退出线程是否会破坏 block 中CUDA线程之间的同步?

c# - SynchronizationContext.CreateCopy 的目的

delphi - 双舍入

java - 每 x 秒执行一次代码

delphi - 创建指向类的新指针

delphi - 具有匿名方法的 VCL 事件 - 您对此实现有何看法?

java - 多线程允许不同id的多个线程进入临界区

c# - 如何处理多线程中的竞争条件?