node.js - Highland.js 中的嵌套流操作

标签 node.js stream highland.js

我有来自 readdirp 模块的目录流。

我想要:-

  • 在每个目录中使用正则表达式(例如 README.*)搜索文件
  • 读取该文件中不以 # 开头的第一行
  • 打印出每个目录以及该目录中自述文件的第一个非标题行。

我正在尝试使用流和highland.js来做到这一点.

我在尝试处理每个目录中所有文件的流时遇到困难。

h = require 'highland'

dirStream = readdirp root: root, depth: 0, entryType: 'directories'

dirStream = h(dirStream)
  .filter (entry) -> entry.stat.isDirectory()
  .map (entry) ->

    # Search all files in the directory for README.
    fileStream = readdirp root: entry.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store'
    fileStream = h(fileStream).filter (entry) -> /README\..*/.test entry.name
    fileStream.each (file) ->
      readmeStream = fs.createReadStream file
      _(readmeStream)
        .split()
        .takeUntil (line) -> not line.startsWith '#' and line isnt ''
        .last(1)
        .toArray (comment) ->
          # TODO: How do I access `comment` asynchronously to include in the return value of the map?

    return {name: entry.name, comment: comment}

最佳答案

最好将高地流视为不可变的,并且像filter这样的操作和map返回依赖于旧流的新流,而不是对旧流的修改。

此外,Highland 方法很懒:您应该只调用 eachtoArray当您现在绝对需要数据时。

异步映射流的标准方法是 flatMap 。就像map ,但是你给它的函数应该返回一个流。您从 flatMap 获得的流是所有返回流的串联。由于新流依序依赖于所有旧流,因此可用于对异步过程进行排序。

我会将您的示例修改为以下内容(澄清了一些变量名称):

h = require 'highland'

readmeStream = h(readdirp root: root, depth: 0, entryType: 'directories')
  .filter (dir) -> dir.stat.isDirectory()
  .flatMap (dir) ->
    # Search all files in the directory for README.
    h(readdirp root: dir.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store')
    .filter (file) -> /README\..*/.test file.name
    .flatMap (file) ->
      h(fs.createReadStream file.name)
        .split()
        .takeUntil (line) -> not line.startsWith '#' and line isnt ''
        .last(1)
        .map (comment) -> {name: file.name, comment}

让我们看一下这段代码中的类型。首先,请注意flatMap具有类型(以 Haskellish 表示法)Stream a → (a → Stream b) → Stream b ,即它需要一个包含 a 类型的内容的流,以及一个需要 a 类型的函数并返回包含 b 的流s,并返回包含 b 的流s。集合类型(例如流和数组)实现 flatMap 的标准作为连接返回的集合。

h(readdirp root: root, depth: 0, entryType: 'directories')

假设它的类型为 Stream Directoryfilter不会更改类型,因此 flatMap将是Stream Directory → (Directory → Stream b) → Stream b 。我们将看看该函数返回什么:

h(readdirp root: dir.fullPath, depth: 0, entryType: 'files', fileFilter: '!.DS_Store')

将此称为 Stream File ,所以第二个flatMapStream File → (File → Stream b) → Stream b .

h(fs.createReadStream file.name)

这是一个Stream Stringsplit , takeUntillast不要改变它,那么 map 会做什么?做? mapflatMap 非常相似:其类型为Stream a → (a → b) → Stream b 。在这种情况下aStringb是一个对象类型 {name : String, comment : String} 。然后map返回该对象的流,这就是总体 flatMap函数返回。站起来,b在第二个flatMap是对象,所以第一个 flatMap的函数还返回对象的流,因此整个流是 Stream {name : String, comment : String} .

请注意,由于 Highland 的惰性,这实际上不会启动任何流或处理。您需要使用eachtoArray导致thunk并启动管道。在 each ,将使用您的对象调用回调。根据您想要对评论执行的操作,最好是 flatMap更多(例如,如果您将它们写入文件)。

呃,我不是故意写文章的。希望这会有所帮助。

关于node.js - Highland.js 中的嵌套流操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27721268/

相关文章:

javascript - 如何通过高地从远程 JSON 数组创建对象流?

java - exec-maven-plugin 说不能运行指定的程序,即使它在 PATH 上

node.js - Windows Azure Node.js 内部服务器错误

node.js - lerna 导入总是返回 EDESTDIR

javascript - 将多个文件传输到一个响应

ios - 如何以字典流形式获取输出 - 套接字

c++ - 使用 Qt 录制网络广播流

javascript - 如何使用 oboe 迭代对象数组?

javascript - 从大文件流式传输并创建数组

node.js - 在 Mongoose 中调用 modal.countDocuments() 和 modal.count() 时抛出错误