concurrency - erlang流程和消息传递体系结构

标签 concurrency process erlang messages

我手头的任务是读取大文件的行,对其进行处理并返回有序的结果。

我的算法是:

  • 从主进程开始,它将评估工作负载(写在文件的第一行)
  • 生成工作进程:每个工作进程将使用pread / 3读取文件的一部分,进行处理,然后将结果发送给主
  • master接收所有子结果,排序和返回
    因此基本上不需要工人之间的沟通。

  • 我的问题:
  • 如何在erlang进程数和内核数之间找到最佳平衡?因此,如果我为每个处理器核心生成一个进程,那将无法利用我的CPU?
  • pread / 3如何到达指定的行;它会遍历文件中的所有行吗? pread / 3是并行文件读取的好计划吗?
  • 从进程A向B发送一条大消息还是向N发送小消息更好?我已经在下面的链接中找到了答案的一部分,但我希望进一步阐述
    erlang message passing architecture
  • 最佳答案

  • Erlang进程很便宜。您可以自由使用(并鼓励使用)更多的内核。解决您的问题的实际方法可能会有一个上限(根据行的大小,每行在一个进程中加载​​1TB数据要花很多钱)。

    当您不知道时,最简单的方法是让用户决定。这意味着您可以决定生成N工人,并在他们之间分配工作,等待回音。如果您不喜欢它的运行方式,请在更改N的同时重新运行该程序。

    比较棘手的方法是对大量时间进行基准测试,选择您认为有意义的最大值,然后将其粘贴到池库中(如果您愿意;某些池用于预分配的资源,有些用于可调整的量),并寻求一种“一刀切”的解决方案。

    但是实际上,没有简单的“最佳内核数量”。您可以在50个进程上运行它,也可以在65,000个进程上运行它。如果任务令人尴尬地是并行的,则VM应该能够充分利用其中的大部分资源,并且无论如何都要饱和内核。

  • --
  • 并行文件读取是一个有趣的问题。它可能会更快,也可能不会更快(如直接评论所提到的),并且仅当每行的工作量最小而导致读取文件的成本最高时,它才可能表示速度提高。

    棘手的是,实际上像pread/2-3这样的功能需要字节偏移量。您的问题的措词使您担心文件的行数。因此,您移交给工作人员的字节偏移量可能最终会跨越一行。如果您的代码块以my中的this is my line\nhere it goes\n单词结尾,则一个工作人员将看到自己的行不完整,而另一个工作人员将仅报告my line\n,而缺少先前的this is

    通常,这种烦人的东西会导致您让第一个进程拥有文件并进行筛选,而只是将一些文本交予工作人员处理;然后,该过程将充当某种协调者。

    该策略的一个不错的方面是,如果主进程知道作为消息发送的所有内容,那么它也知道何时收到了所有响应,因此很容易知道何时返回结果。如果一切都不相交,则您必须信任初学者和工人,以告诉您“我们都失业了”,作为一组独特的独立消息要知道。

    在实践中,您可能会发现,最有用的是知道有关文件操作的操作,这些操作可以帮助您的硬件生存,而不是“有多少人可以一次读取文件”。只有一个硬盘(或SSD),无论如何所有数据都必须经过它。并行性最终可能会限制在那里的访问。

  • --
  • 使用对您的程序有意义的消息。性能最高的程序将有很多进程能够执行工作,而无需传递消息,进行通信或获取锁。

    一个更实际的,性能卓越的程序将使用很少的,很小的消息。

    有趣的是,您的问题本质上是基于数据的。因此,您可以执行以下操作:
  • 确保您以二进制格式阅读文本;将大型二进制文件(> 64b)分配到全局二进制堆上,并与引用计数
  • 共享并进行GC
  • 提交有关需要完成的信息,而不是执行该操作所需的数据;这需要进行测量,但是引导过程可以遍历文件,注意行尾,仅将字节偏移量移交给工作人员,以便他们可以自己读取文件。请注意,您最终将读取文件两次,因此,如果内存分配不是您的主要开销,那么
  • 可能会更慢
  • 确保以rawram模式读取文件。其他模式使用中间人进程来读取和转发数据(如果在群集的Erlang节点中通过网络读取文件,这将很有用); rawram模式将文件描述符直接提供给调用过程,并且速度更快。
  • 首先要担心编写清晰,可读和正确的程序。仅当它太慢时,才应尝试对其进行重构和优化。您可能会在第一次尝试中发现它已经足够好了。

  • 我希望这有帮助。

    附言您可以首先尝试真正简单的东西:
  • 之一:
  • 使用{ok, Bin} = file:read_file(Path)一次读取整个文件,并使用
  • 分割行(使用binary:split(Bin, <<"\n">>, [global]))
  • 使用{ok, Io} = file:open(File, [read,ram]),然后在文件描述符上反复使用file:read_line(Io)
  • 使用{ok, Io} = file:open(File, [read,raw,{read_ahead,BlockSize}]),然后在文件描述符上反复使用file:read_line(Io)
  • 调用rpc:pmap({?MODULE, Function}, ExtraArgs, Lines)以自动并行运行所有内容(它将每行生成一个进程)
  • 在结果上调用lists:sort/1

  • 然后,如果您认为有问题,则可以从那里优化每个步骤。

    关于concurrency - erlang流程和消息传递体系结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30754165/

    相关文章:

    concurrency - 并发编程和不确定性

    java - 如何理解 "add up how much of that resource each task requires and divide that into the total quantity available"决定线程池大小

    c - Erlang,是否可以在不重新启动 shell 的情况下重新加载或升级 nif 库?

    cocoa - NSTableView保证数据显示

    php - ignore_user_abort 并在 php 中重定向?

    database - Ejabberd 的名册管理

    erlang - 钢筋3 : dependency is not reachable

    java - Java中的并发访问稀疏矩阵

    java - 应该如何处理 javax.persistence.OptimisticLockException?

    在c中使用fork和pipe后,无法在终端中看到程序的输出