node.js - 如何链接以前在 forkjoin() 操作中的一组可观察量

标签 node.js express asynchronous rxjs

我在mysql中有一个订单表,每个订单都有许多与之关联的文档,无论它们是报价单、发票等。因此有第二个表称为“文档”,它有一个“document_id”主键和一个“order_id”外键;以类似的方式,我有另一个案例,用于技术人员对每辆车进行的不同检查,然后是另一个用于车辆图片的表格。我正在使用 Node 和 Express 创建一个 Web 服务,需要返回类似于此的 json...

[
  {
    "order_id": 1003,
    "customer_id": 8000,
    "csi": 90,
    "date_admitted": "2016-10-28T05:00:00.000Z",
    "plates": "YZG-5125",
    ...
    documents: {
       "type": "invoice",
       "number": "1234",
       ...
    },
    checks: {
       "scanner": "good",
       "battery": "average",
       ...
    },
    vehicle_pictures: {
       "title": "a title...",
       "path": "the file path"
       ...
    }
  },
  {
    ...
  },
  ...
]

如您所见,需要对每个订单执行三次查询,一次查询支票,另一次查询文档,第三次查询图片,然后我需要将这些子结果添加到订单中,最终返回数组中的数组回应。

在同步编程的旧世界中,这将是一项非常容易完成的任务,但是由于 mysql 库的连接对象中的 query() 方法的异步性质,这有可能成为真正的 hell 。

在我必须处理单个订单的情况下,在服务器上使用 RxJS 库和 forkJoin() 足以一次处理所有三个结果,我不确定如何“链接”每个订单(使用 forkJoin 来管理 3 个查询),因此一切都会得到处理,最后我可以调用 res.json(result) 来将所有内容整齐地组装起来。

注意:我想用 RxJS 来解决这个问题,而不是使用像 node-mysql-libmysqlclient 这样的同步库包。原因基本上是,在像 Node JS 这样的异步语言中执行此操作的“正确”方法是异步。另外,我想使用 RxJS 而不是 async、q Promise 或任何其他库,因为 Observables 似乎是异步解决方案竞赛中的绝对赢家,并且也希望在我开发的所有解决方案中保持一致,所以这个问题主要面向RxJS 大师。

此外,我发现的与此类似的每个问题都有经典的“纯粹主义者”答复,说如果您使用 Node,您“应该”使用异步,而不是考虑同步解决方案。所以这对于那些捍卫这一立场的人来说是一个挑战,因为(我认为)这是 Node 中同步有意义的情况之一,但是我真的想学习如何使用 RxJS 来做到这一点,而不是认为这是不可能的,我确信不是。

最佳答案

如果我理解正确的话,您有一些数据想要用来通过异步操作从数据库收集其他数据。您想要构建一个由原始数据和后续查询返回的附加信息组成的组合数据集。

正如您所提到的,您可以使用 forkJoin 等待多个操作完成,然后再继续。您必须对数据序列中的每个项目执行此操作,然后使用 switchMap 将结果合并回原始流。

看看 following example jsbin演示了如何做到这一点:

const data = [
  { id: 1, init: 'a' },
  { id: 2, init: 'b' },
  { id: 3, init: 'c' }
]

function getA(id) {
  return Rx.Observable.timer(1000)
    .map(() => {
      return { id, a: 'abc' }
    })
    .toPromise();
}

function getB(id) {
  return Rx.Observable.timer(1500)
    .map(() => {
      return { id, b: 'def' }
    })
    .toPromise();
}

Rx.Observable.interval(5000)
  .take(data.length)
  .map(id => data[id])
  .do(data => { console.log(`query id ${data.id}`)})
  .switchMap((data) => {
    return Rx.Observable.forkJoin(getA(data.id), getB(data.id), (a, b) => {
      console.log(`got results for id ${data.id}`);
      return Object.assign({}, data, a, b);
    });
  })
  .subscribe(x => console.log(x));

关于node.js - 如何链接以前在 forkjoin() 操作中的一组可观察量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40937118/

相关文章:

mysql - 如何在sequelize中使用mysql的curdate()

mysql - babel 为何在 Sequelize 中不起作用

java - 如何在F.Promise.get(timeout)中检测和处理超时

c# - 使用 WinForms ProgressBar 异步/等待

node.js - Express JS - 想要缓存静态资源但不渲染 HTML

asynchronous - Blazor InvokeAsync 与 await InvokeAsync

object - 用户模块-node js

javascript - Firebase Cloud Functions 有时直到函数结束才执行

javascript - Node.Js - Jade 表单不发布或不执行任何操作?

node.js - 使用 Nodemon 每次保存时都会生成 EADDRINUSE