javascript - 如何使用 highland.js fork 流?

标签 javascript typescript stream highland.js

我有一个由 BaseData 对象组成的 sourceStream

我想将此流 fork 为 n 数量的不同流,然后根据自己的喜好过滤和转换每个 BaseData 对象。

最后,我希望有 n 个流仅包含特定类型,并且 fork 流的长度可以有所不同,因为将来可能会删除或添加数据。

我想我可以通过fork来设置它:

import * as _ from 'highland';

interface BaseData {
    id: string;
    data: string;
}

const sourceStream = _([
    {id: 'foo', data: 'poit'},
    {id: 'foo', data: 'fnord'},
    {id: 'bar', data: 'narf'}]);

const partners = [
    'foo',
    'bar',
];

partners.forEach((partner: string) => {
    const partnerStream = sourceStream.fork();

    partnerStream.filter((baseData: BaseData) => {
        return baseData.id === partner;
    });

    partnerStream.each(console.log);
});

我希望现在有两个流,并且 foo-stream 包含两个元素:

{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }

bar-stream 包含一个元素:

{ id: 'bar', data: 'narf' }

但是我收到了一个错误:

/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
        throw new Error(
        ^

Error: Stream already being consumed, you must either fork() or observe()
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
    at Array.forEach (native)
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
    at Module._compile (module.js:570:32)
    at Object.Module._extensions..js (module.js:579:10)
    at Module.load (module.js:487:32)
    at tryModuleLoad (module.js:446:12)

如何将一个流 fork 为多个流?

<小时/>

我也尝试过链接调用,但随后我只得到一个流的结果:

partners.forEach((partner: string) => {
    console.log(partner);
    const partnerStream = sourceStream
        .fork()
        .filter((item: BaseData) => {
            return item.id === partner;
        });

    partnerStream.each((item: BaseData) => {
        console.log(item);
    });
});

仅打印:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar

而不是预期的:

foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}
<小时/>

也可能是我误解了 fork 的全部意义。根据its doc entry :

Stream.fork() Forks a stream, allowing you to add additional consumers with shared back-pressure. A stream forked to multiple consumers will only pull values from its source as fast as the slowest consumer can handle them.

NOTE: Do not depend on a consistent execution order between the forks. This transform only guarantees that all forks will process a value foo before any will process a second value bar. It does not guarantee the order in which the forks process foo.

TIP: Be careful about modifying stream values within the forks (or using a library that does so). Since the same value will be passed to every fork, changes made in one fork will be visible in any fork that executes after it. Add to that the inconsistent execution order, and you can end up with subtle data corruption bugs. If you need to modify any values, you should make a copy and modify the copy instead.

Deprecation warning: It is currently possible to fork a stream after consuming it (e.g., via a transform). This will no longer be possible in the next major release. If you are going to fork a stream, always call fork on it.

因此,而不是“如何 fork 流?”我的实际问题可能是:如何将高地溪流即时复制到不同的溪流中?

最佳答案

partnerStream.filter() 返回一个流。然后,您可以使用 partnerStream.each() 再次使用 partnerStream,而不调用 fork()observe() >。因此,要么链接 partnerStream.filter().each() 调用,要么将 partnerStream.filter() 的返回值分配给变量并调用 .each( ) 对此。

关于javascript - 如何使用 highland.js fork 流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42073925/

相关文章:

json - Node.js - 我可以在 Redis 中将可写流存储为 JSON 吗?

javascript - 使用 Javascript 保存跟踪的 GPS 数据

javascript - 对于奇数测试,按位 AND 运算符总是比模数快(仍然)吗?

php - 如何使用 Typescript 发出 AJAX 请求? (使用 JSON)

javascript - Angular 4 : How to watch an object for changes?

javascript - Typescript 知道 import -> require 但不知道 require -> import?

C语言复制文件函数

javascript - 事件源 XHR header

javascript - 如果从 $http.get() 返回的数据不是 JSON 格式,它是普通文本,如何处理?

javascript - 启动画面后 iPhone ChildBrowser 重定向