c++ - 从编译时依赖图 (DAG) 构建异步 `future` 回调链

标签 c++ multithreading algorithm asynchronous future

我有一个编译时 directed acyclic graph异步任务。 DAG 显示了任务之间的依赖关系:通过分析它,可以了解哪些任务可以并行运行(在单独的线程中)以及哪些任务需要等待其他任务完成才能开始(依赖关系)。

我想从 DAG 生成一个回调链,使用 boost::future.then(...) , when_all(...)延续辅助函数。这一生成的结果将是一个函数,当调用该函数时,将启动回调链并执行 DAG 所描述的任务,并行运行尽可能多的任务。

但是,我很难找到适用于所有情况的通用算法。

我画了几张图,让问题更容易理解。这是一个图例,将向您展示图中符号的含义:

Legend: how to read the images.

让我们从一个简单的线性 DAG 开始:

Example 0: linear DAG.

这个依赖图由三个任务组成( ABC )。 C取决于 B . B取决于 A .这里没有并行的可能性 - 生成算法将构建类似于以下内容的内容:

boost::future<void> A, B, C, end;

A.then([]
    {
        B.then([]
            {
                C.get();
                end.get();
            });
    });

(请注意,所有代码示例都不是 100% 有效 - 我忽略了移动语义、转发和 lambda 捕获。)

有很多方法可以解决这个线性 DAG:无论是从结尾还是从头开始,构建正确的回调链都是微不足道的。

forks and joins 时事情开始变得更加复杂介绍。

这是一个带有 fork/join 的 DAG:

Example 1: DAG with a fork/join.

很难想到匹配这个 DAG 的回调链。如果我尝试倒退,从最后开始,我的推理如下:
  • end取决于 BD . (加入)
  • D取决于 C .
  • BC依赖 A . (叉)

  • 一个可能的链看起来像这样:
    boost::future<void> A, B, C, D, end;
    
    A.then([]
        {
            boost::when_all(B, C.then([]
                                   {
                                       D.get();
                                   }))
                .then([]
                    {
                        end.get();
                    });
        });
    

    我发现手工写这个链很困难,我也怀疑它的正确性。我想不出一个通用的方法来实现一个可以产生这个的算法 - 由于 when_all 的事实,还存在额外的困难。需要将其参数移入其中。

    让我们看看最后一个更复杂的例子:

    Example 2: complex DAG.

    在这里,我们希望尽可能地利用并行性。考虑任务 E :E可以与 [B, C, D] 中的任何一个并行运行.

    这是一个可能的回调链:
    boost::future<void> A, B, C, D, E, F, end;
    
    A.then([]
        {
            boost::when_all(boost::when_all(B, C).then([]
                                {
                                    D.get();
                                }),
                E)
                .then([]
                    {
                        F.then([]
                            {
                                end.get();
                            });
                    });
        });
    

    我试图以多种方式提出通用算法:
  • 从 DAG 开始,尝试使用 .then(...) 建立链延续。这不适用于连接,因为目标连接任务会重复多次。
  • 从 DAG 的末端开始,尝试使用 when_all(...) 生成链延续。这对于 fork 失败了,因为创建 fork 的节点会重复多次。

  • 显然,“广度优先遍历”方法在这里效果不佳。从我手写的代码示例来看,算法需要注意fork和join,需要能够正确混合.then(...)when_all(...)延续。

    这是我最后的问题:
  • 是否总是可以生成 future基于任务依赖关系的 DAG 的回调链,其中每个任务在回调链中只出现一次?
  • 如果是这样,在给定任务依赖 DAG 构建回调链的情况下,如何实现通用算法?


  • 编辑 1:

    Here's an additional approach我正在尝试探索。

    这个想法是生成一个 ([dependencies...] -> [dependents...])从 DAG 映射数据结构,并从该映射生成回调链。

    len(dependencies...) > 1 ,然后 value是一个连接节点。

    len(dependents...) > 1 ,然后 key是一个 fork 节点。

    映射中的所有键值对都可以表示为when_all(keys...).then(values...)延续。

    困难的部分是找出“扩展”(考虑类似于解析器的东西)节点的正确顺序以及如何将 fork/join 延续连接在一起。

    考虑以下由图像 4 生成的 map .
    depenendencies  |  dependents
    ----------------|-------------
    [F]             :  [end]
    [D, E]          :  [F]
    [B, C]          :  [D]
    [A]             :  [E, C, B]
    [begin]         :  [A]
    

    通过应用某种类似解析器的缩减/传递,我们可以获得一个“干净”的回调链:
    // First pass:
    // Convert everything to `when_all(...).then(...)` notation
    when_all(F).then(end)
    when_all(D, E).then(F)
    when_all(B, C).then(D)
    when_all(A).then(E, C, B)
    when_all(begin).then(A)
    
    // Second pass:
    // Solve linear (trivial) transformations
    when_all(D, E).then(
        when_all(F).then(end)
    )
    when_all(B, C).then(D)
    when_all(
        when_all(begin).then(A)
    ).then(E, C, B)
    
    // Third pass:
    // Solve fork/join transformations
    when_all(
        when_all(begin).then(A)
    ).then(
        when_all(
            E, 
            when_all(B, C).then(D)
        ).then(
            when_all(F).then(end)
        )   
    )
    

    第三遍是最重要的一遍,但也是看起来很难设计算法的一遍。

    注意如何[B, C]必须在 [E, C, B] 内找到列表,以及如何,在 [D, E] 中依赖列表,D必须解释为 when_all(B, C).then(D) 的结果并与 E 链接在一起在 when_all(E, when_all(B, C).then(D)) .

    也许整个问题可以简化为:

    给定一个由 [dependencies...] -> [dependents...] 组成的 map 键值对,如何将这些对转换为 when_all(...) 的算法/.then(...)延续链能实现吗?

    编辑2:

    这是一些 pseudocode我想出了上面描述的方法。它似乎适用于我尝试过的 DAG,但我需要花更多的时间在它上面并用其他更棘手的 DAG 配置“精神上”测试它。

    最佳答案

    如果可能出现冗余依赖项,请先删除它们(参见例如 https://mathematica.stackexchange.com/questions/33638/remove-redundant-dependencies-from-a-directed-acyclic-graph )。

    然后执行以下图形转换(在合并节点中构建子表达式),直到您得到单个节点(以类似于计算电阻网络的方式):

    Graph transformations
    * :额外的传入或传出依赖项,取决于位置
    (...) : 单个节点中的表达式

    Java 代码,包括更复杂示例的设置:

    public class DirectedGraph {
      /** Set of all nodes in the graph */
      static Set<Node> allNodes = new LinkedHashSet<>();
    
      static class Node {
        /** Set of all preceeding nodes */
        Set<Node> prev = new LinkedHashSet<>();
    
        /** Set of all following nodes */
        Set<Node> next = new LinkedHashSet<>();
    
        String value;
    
        Node(String value) {
          this.value = value;
          allNodes.add(this);
        }
    
        void addPrev(Node other) {
          prev.add(other);
          other.next.add(this);
        }
    
        /** Returns one of the next nodes */
        Node anyNext() {
          return next.iterator().next();
        }
    
        /** Merges this node with other, then removes other */
        void merge(Node other) {
          prev.addAll(other.prev);
          next.addAll(other.next);
          for (Node on: other.next) {
            on.prev.remove(other);
            on.prev.add(this);
          }
          for (Node op: other.prev) {
            op.next.remove(other);
            op.next.add(this);
          }
          prev.remove(this);
          next.remove(this);
          allNodes.remove(other);
        }
    
        public String toString() {
          return value;
        }
      }
    
      /** 
       * Merges sequential or parallel nodes following the given node.
       * Returns true if any node was merged.
       */
      public static boolean processNode(Node node) {
        // Check if we are the start of a sequence. Merge if so.
        if (node.next.size() == 1 && node.anyNext().prev.size() == 1) {
          Node then = node.anyNext();
          node.value += " then " + then.value;
          node.merge(then);
          return true;
        }
    
        // See if any of the next nodes has a parallel node with
        // the same one level indirect target. 
        for (Node next : node.next) {
    
          // Nodes must have only one in and out connection to be merged.
          if (next.prev.size() == 1 && next.next.size() == 1) {
    
            // Collect all parallel nodes with only one in and out connection 
            // and the same target; the same source is implied by iterating over 
            // node.next again.
            Node target = next.anyNext().next();
            Set<Node> parallel = new LinkedHashSet<Node>();
            for (Node other: node.next) {
              if (other != next && other.prev.size() == 1
                 && other.next.size() == 1 && other.anyNext() == target) {
                parallel.add(other);
              }
            }
    
            // If we have found any "parallel" nodes, merge them
            if (parallel.size() > 0) {
              StringBuilder sb = new StringBuilder("allNodes(");
              sb.append(next.value);
              for (Node other: parallel) {
                sb.append(", ").append(other.value);
                next.merge(other);
              }
              sb.append(")");
              next.value = sb.toString();
              return true;
            }
          }
        }
        return false;
      }
    
      public static void main(String[] args) {
        Node a = new Node("A");
        Node b = new Node("B");
        Node c = new Node("C");
        Node d = new Node("D");
        Node e = new Node("E");
        Node f = new Node("F");
    
        f.addPrev(d);
        f.addPrev(e);
    
        e.addPrev(a);
    
        d.addPrev(b);
        d.addPrev(c);
    
        b.addPrev(a);
        c.addPrev(a);
    
        boolean anyChange;
        do {
          anyChange = false;
          for (Node node: allNodes) {
            if (processNode(node)) {
              anyChange = true;
              // We need to leave the inner loop here because changes
              // invalidate the for iteration. 
              break;
            }
          }
          // We are done if we can't find any node to merge.
        } while (anyChange);
    
        System.out.println(allNodes.toString());
      }
    }
    

    输出:A then all(E, all(B, C) then D) then F

    关于c++ - 从编译时依赖图 (DAG) 构建异步 `future` 回调链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35778864/

    相关文章:

    c++ - C/C++ 工作流引擎

    c++ - 为什么 memory_order_relaxed 在 x86 上使用原子(锁前缀)指令?

    c++ - Qt4.8 : Why enum not seen in qmetaobject? 以及如何工作?

    c++ - 如何访问 C++ Windows 10 应用程序中的默认线程池?

    cocoa - Cocoa 中等效的 C# 'Thread.Join()' 是什么?

    c++ - C++语句 'delete [] Q;'的Big-O是O(1)还是O(n)?

    java - 如何为每个请求项创建多个线程

    algorithm - 如何组合不同的二维码以获得新的二维码

    algorithm - 最小化段落中的总空白空间 - 算法

    c# - 将较小的 n 维数组放入较大数组的最快方法(二维图形类比 : paint rectangle on canvas)