javascript - 如何拆分和合并 RxJS 发出的数组值?

标签 javascript angular rxjs binary observable

我有一个 RxJS Observable,它发出 Uint8Array 值类型的二进制数据。但并不是每个发出的值都包含一个完整的可以自行处理的数据对象。

完整数据对象的数据格式由一个起始字节(0xAA)、中间的一些可变长度数据和一个结束字节(0xFF)组成。中间的数据是 BCD 编码的,这意味着它主要不包含开始或结束字节,而只包含从 0x000x99 的二进制值。

这是一个例子:

// This is a mock of the source observable which emits values:
const source$ = from([
  // Case 1: One complete data object with start (0xAA) and end byte (0xFF)
  new Uint8Array([0xAA, 0x01, 0x05, 0x95, 0x51, 0xFF,]),

  // Case 2: Two complete data objects in a single value emit
  new Uint8Array([0xAA, 0x12, 0x76, 0xFF, 0xAA, 0x83, 0x43, 0xFF,]),

  // Case 3: Two uncomplete value emits which form a single data object
  new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67]),
  new Uint8Array([0x82, 0x73, 0x44, 0x28, 0x85, 0xFF]),

  // Case 4: A combination of Cases 2 and 3
  new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67]),
  new Uint8Array([0x55, 0x81, 0xFF, 0xAA, 0x73, 0x96]),
  new Uint8Array([0x72, 0x23, 0x11, 0x95, 0xFF]),
])

source$.subscribe((x) => {
  console.log('Emitted value as Hexdump:')
  console.log(hexdump(x.buffer))
})

Hexdump logs

目标是只接收完整的数据对象。也许作为一个转换后的新 Observable?

上面的例子应该是这样的:

const transformedSource$ = from([
  // Case 1
  new Uint8Array([0xAA, 0x01, 0x05, 0x95, 0x51, 0xFF,]),

  // Case 2
  new Uint8Array([0xAA, 0x12, 0x76, 0xFF,]),
  new Uint8Array([0xAA, 0x83, 0x43, 0xFF,]),

  // Case 3
  new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67, 0x82, 0x73, 0x44, 0x28, 0x85, 0xFF]),

  // Case 4
  new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67, 0x55, 0x81, 0xFF]),
  new Uint8Array([0xAA, 0x73, 0x96, 0x72, 0x23, 0x11, 0x95, 0xFF]),
])
  1. 哪些 RxJS 方法或运算符适用于此?
  2. 我想首先在 0xFF 处进行拆分,然后再进行合并。这个怎么做?非常感谢具有 RxJS 经验的人提出的想法。

最佳答案

您可以尝试拆分 block 以便一个一个地处理字节,将它们添加到缓冲区直到 0xff 字节出现在流中并返回所有缓冲的元素并重置缓冲区对于下一个 block :

let buffer = new Uint8Array();

transformedSource$.pipe(
        mergeAll(), // this splits your array and emits the single bytes into the stream
        mergeMap((next) => {
            buffer = new Uint8Array([...buffer, next]);
            if (next === 0xff) { (
                const result = buffer;
                buffer = new Uint8Array(); // resets the buffer
                return of(result); // will emit a completed chunk
            }
            return EMPTY; // won't emit anything 
        })
    )
    .subscribe(console.log);

如果您不喜欢全局变量,或者您还想为其他流重用该代码,这里有一个带有自定义运算符的替代解决方案:

const mergeChunks = () => {
    let buffer = new Uint8Array();

    return (source$) =>
        source$.pipe(
            mergeMap((next) => {
                buffer = new Uint8Array([...buffer, next]);
                if (next === 0xff) {
                    const result = buffer;
                    buffer = new Uint8Array();
                    return of(result);
                }
                return EMPTY;
            })
        );
} 

transformedSource$.pipe(
        mergeAll(),
        mergeChunks(),
    )
    .subscribe(console.log);

关于javascript - 如何拆分和合并 RxJS 发出的数组值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66796931/

相关文章:

javascript - electron-packager spawn ENOENT

angular - 在 Angular2 中传递服务作为输入

angular - 如何检查 RxJS Observable 是否包含 Angular2 中的字符串?

angular - Angular 6 中的 debounceTime 和 distinctUntilChanged

javascript - 用javascript替换多个字符

javascript - 选择所有以下划线 (_) 开头的对象键

angular - 如何在 Angular 7 中设置超时 KeyUp

angular - 当返回语句在订阅方法中时出现 Ts 错误 : A function whose declared type is neither 'void' nor 'any' must return a value.

javascript - switchMap : Multiple (2) requests getting triggered

javascript - 阴影和圆圈数 Carousel Bootstrap 4