javascript - RxJS 中的 bufferReduce

标签 javascript stream rxjs reactive-programming

你好,我想用 RxJS bufferReduce 并想知道这是否可能。 解决方案功能应将大文本拆分为字符总数不超过 100 个字符的单词。结果将如下所示:

[
  ['lorem ipsum whatever less than 100 chars'],
  ['bla bla blub less than 100 chars'],
]

我在纯 js 中的解决方案是:

const calcCharsInWordList = wordList => {
  return wordList.reduce((letterCount, lineWord) => letterCount + lineWord.length, 0) + wordList.length - 1 // add wordlist Length for the sparating spaces
};


const textToLines = (text, maxLineLength) => {
  return text
    .split(' ')
    .reduce((acc, word) => {
      acc.currentLine.push(word);
      if (calcCharsInWordList(acc.currentLine) > maxLineLength) {
        acc.allLines.push(acc.currentLine);
        acc.currentLine = [];
      }
      return acc;
    }, { currentLine: [], allLines: [] })
    .allLines
    .map(line => line.join(' '));
};


console.log(textToLines(`lorem ipsum ....... `, 100));

但如果可能的话,我想用 RxJS 来解决它:

Rx.Observable.from("abc defg hij ...").bufferReduce((acc, letter) => {
   acc = acc.push(letter)
   if(acc.length < 100){ 
     return true; // flush the buffer (I know it should return the acc ;) I just want you to get the idea.)
   }
   return false
})

那么也许嵌套的 Observable 可以帮助解决这个问题? BufferWhen 和 bufferCount 在这里真的没有帮助。 我很高兴能用 RxJS 解决它。

最佳答案

我在 SO 上多次看到类似的问题。基本问题是每次打开新缓冲区/窗口时调用用户函数的运算符(例如 bufferWhenwindowWhen)不会让您对通过的值使用react。如果他们这样做,事情就会变得容易得多。

此解决方案使用 scan 并且还包括可以比 LIMIT 常量短的最后一行。它仍然非常简单,并且不涉及任何 async 调度程序,这在使用 eg 时可能是必需的。 ConnectableObservable(共享源 Observable 并将其有条件地馈送到 buffer())。

const input = 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed non lacinia dui. Pellentesque ullamcorper sed enim et rhoncus. Vestibulum iaculis enim eget consectetur sagittis. Orci varius natoque penatibus et magnis dis parturient montes, nascetur mus.';
const LIMIT = 25;

Observable.of(input)
  .concatMap(str => str) // Emit each character separately
  .scan((line, char) => {
    if (line[line.length -1] === "\n") { // The previous line finished, start new line
      return char;
    } else if (line.length > LIMIT && char === ' ') { // Mark the line as finished with "\n"
      return line + "\n";
    }
    return line + char; // Nothing special, just append the char
  }, '')
  // This is required to include the last line that can be shorter than 100 chars.
  // The `subject` just passes everything through until the source completes.
  // Then it uses `concat` to append itself and replays the last value that is terminated with "\n"
  .multicast(new ReplaySubject(1),
    subject => subject.concat(subject.map(line => line + "\n"))
  )
  // Pass through only finished lines
  .filter(line => line[line.length -1] === "\n")
  // Remove the terminating character
  .map(line => line.trim())
  .subscribe(console.log);

查看现场演示:http://jsbin.com/fotiluk/13/edit?js,console

此代码打印以下输出:

"Lorem ipsum dolor sit amet,"
"consectetur adipiscing elit."
"Sed non lacinia dui. Pellentesque"
"ullamcorper sed enim et rhoncus."
"Vestibulum iaculis enim eget"
"consectetur sagittis. Orci"
"varius natoque penatibus et"
"magnis dis parturient montes,"
"nascetur mus."

我相信可能会有更短的解决方案,但我想始终从 scan 发出一个简单的字符串。如果我传递了一些对象,我可以更容易地检测到我何时要拆分行。

这是一个类似的问题:How to split a data frame from an arrayBuffer with RxJS?

关于javascript - RxJS 中的 bufferReduce,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48526452/

相关文章:

javascript - 使用Jquery选择器选择div的子元素

javascript - Nodejs交互控制台和文件执行运行时 'this'有什么区别?

node.js - Node 的超时功能?

c# - 为什么此代码中的流保持打开状态?

angular - Rxjs 6 : using catchError() gives You provided 'undefined' where a stream was expected. 您可以提供一个 Observable、Promise、Array 或 Iterable

javascript - 在JavaScript中构建一个数组来填充jsTree

javascript - 获取悬停输入类型范围的值

java - 如何(全局)替换Java并行流的公共(public)线程池后端?

angular - NGRX 效果 : Dispatch multiple actions individually

angular - 具有 Angular 6.0.0 和 rxjs 6.1.0 的 Observable<Object> 上不存在 map