c++ - RxCPP 的行为不同于 Rx.Net

标签 c++ rxcpp

我正在使用 RxCPP,但很难理解它的行为。

这里有两个程序,一个在 Rx.Net 中,另一个在 RxCPP 中。 他们应该输出相同的打印件,但实际上并没有。
该程序从鼠标流中获取点,并计算点之间的增量流。
鼠标是点流的流,每个笔划 - 从下往上按是一股流。鼠标一个接一个地给出这样的流。

在这些测试中,预期的输出是:
0 号增量为:0,0
Delta 1 是:5,0
Delta 2 为:0,5
三角洲 3 号是:2,3
这就是 Rx.Net 输出的内容。
Rx.Cpp 仅输出第一行:Delta no 0 is: 0,0

有什么想法吗?

Rx.Cpp 示例:

  #include <rx.hpp>
  namespace rx = rxcpp;
  namespace rxsub = rxcpp::subjects;
  using rxob = rx::observable<>;

    struct Point
    {
        Point(int x, int y) : x(x), y(y) {}

        int x = 0, y = 0;
        Point operator-() const { return {-x, -y}; }
        Point operator+(const Point& other) const { return Point{x + other.x, y + other.y}; }
        Point operator-(const Point& other) const { return operator+(-other); }
    };

    std::ostream& operator<<(std::ostream& o, const Point& p)
    {
        return o << "(" << p.x << ", " << p.y << ")";
    }

    void TestRxCPP()
    {
      using RxPoint = rx::observable<Point>;
      using Strokes = rx::observable<RxPoint>;
      using StrokesSubject = rxsub::subject<RxPoint>;

      StrokesSubject mouseSource;
      auto strokes = mouseSource.get_observable();

      auto deltaVectors = [](Strokes strokes) {
        auto deltas = strokes.flat_map([=](RxPoint stroke) {
            auto points = stroke;
            // create stream of delta vectors from start point
            auto firstPoint = points.take(1);
            auto delta =
                points.combine_latest([](Point v0, Point v1) { return v0 - v1; }, firstPoint);
            return delta;
        });

        return deltas;
      };

      auto delta = deltaVectors(strokes);
      int n = 0;
      delta.subscribe(
        [&](const Point& d) { std::cout << "Delta no. " << n++ << " is: " << d << std::endl; });

      auto testMouse = rxob::from(Point{3 + 0, 4 + 0}, Point{3 + 5, 4 + 0}, Point{3 + 0, 4 + 5}, Point{3 + 2, 4 + 3});
      mouseSource.get_subscriber().on_next(testMouse);
    }

Rx.Net 示例:

    void RxNET()
    {
        var strokesS = new Subject<IObservable<Point>>();

        Func<IObservable<IObservable<Point>>, IObservable<Point>> 
        deltaVectors = strokes =>
        {
            var deltas = strokes.SelectMany(stroke =>
            {
                var points = stroke;
                // create stream of delta vectors from start point
                var firstPoint = points.Take(1);
                var deltaP =
                    points.CombineLatest(firstPoint, (v0, v1) => new Point(v0.X - v1.X, v0.Y - v1.Y));
                return deltaP;
            });

            return deltas;
        };

        var delta = deltaVectors(strokesS);
        var n = 0;
        delta.Subscribe(d => { Console.WriteLine($"Delta no {n++} is: {d}\n"); });

        var testMouse = new List<Point>
        {
            new Point(3 + 0, 4 + 0),
            new Point(3 + 5, 4 + 0),
            new Point(3 + 0, 4 + 5),
            new Point(3 + 2, 4 + 3)
        }.ToObservable();
        strokesS.OnNext(testMouse);
    }

最佳答案

Thanks to @Kirk Shoop at the rxcpp github :-)
这是 HOTvCOLD 行为。

这些笔画是冷的并且正在共享并且只使用一个线程。 points.combine_latest(..., firstPoint) 意味着在订阅 firstPoint 之前发送所有点。因此只有最后一个增量被发出。

如果您反转 combine_latest

,冷源和热源将起作用
auto delta =
    firstPoint.combine_latest([](Point v0, Point v1) { return v1 - v0; }, points);

关于c++ - RxCPP 的行为不同于 Rx.Net,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45662139/

相关文章:

c++ - 将 2d 屏幕位置投影到 3d 世界空间

c++ - rxcpp:嵌套的 while 循环或类似的 "classic"程序命令结构

c++ - RxCpp:如果使用 observe_on(rxcpp::observe_on_new_thread()),观察者的生命周期

c++ - RXCPP:做一个不关心可观察对象输入类型的扩展

c++ - 使用Nsight调试导出DLL的非启动项目中的CUDA代码

c++ - C++中的BIOS信息

c++ - 为什么我新分配的指针会在程序退出时自动删除?

c++ - opencv 与 netbeans C++ 链接器错误

c++ - 传递给 lambda 函数的空指针不再为空

c++ - 如何在可观察对象列表上使用RxCpp运算符?