R 连接到 EC2 实例以进行并行处理

标签 r foreach parallel-processing amazon-ec2 snowfall

我在初始化从 R 到 AWS EC2 实例的连接时遇到问题,因为我似乎不断收到错误:Permission denied (publickey)我目前使用 Mac OS X 10.6.8 作为我的操作系统

我尝试在终端 ($) 和 R (>) 中运行的代码如下:

$ R --vanilla
> require(snowfall)
> sfInit(parallel=TRUE,socketHosts =list("ec2-xx-xxx-xx-xx.zone.compute.amazonaws.com"))
Permission denied (publickey)

但奇怪的是,当尝试通过 ssh 进入实例时,我不需要密码,因为我已经在初始化时将公钥导入到实例中,(我认为)

所以从我的普通终端......运行时
$ ssh ubuntu@ec2-xx-xxx-xx-xx.zone.compute.amazonaws.com

它会自动连接...(所以我不能 100% 确定它是否是像 Using snow (and snowfall) with AWS for parallel processing in R 那样的无密码问题)

我曾尝试查看大量有关 key 等的 Material ,但似乎没有任何区别。还有我的 ~/.ssh/authorized_keys由于某种原因是文件夹而不是文件,即使尝试 sudo cd .ssh/authorized_keys 我也无法访问它...在权限方面它有drw-------
最终目标是连接到大量 ec2 实例并使用 foreach进行一些并行处理......但现在连接到一个会很好......我也想使用我自己的ami所以星团并不是我真正想要的......(除非我是能够使用私有(private) amis 并私下运行所有​​命令......)

此外,如果 doRedis 比有人可以告诉我如何从本地机器连接到 ec2 实例更好,那也很好......

编辑

我设法使用 parallel 处理了 ssh 无密码登录。包裹makePSOCKclusterR and makePSOCKcluter EC2 socketConnection所示...但现在遇到socketConnection问题如链接中的问题所示...

任何想法如何连接到它?

也证明一切正常,我想这意味着以下命令/函数可以用于获取所有不同的 IP 地址
d <- parLapply(cl1, 1:length(cl1),function(x)system("ifconfig",intern=T)[2])

哪里cl1make*cluster 的输出功能

注意 因为赏金实际上是针对链接中的问题......我不介意你发布答案的问题......但是只要在这个问题上写了一些东西将它链接到正确的答案链接的问题,然后我会相应地奖励积分......

最佳答案

在尝试将主节点保持在本地时,我在并行 EC2 设置方面也遇到了很多问题。使用 StarCluster设置池有很大帮助,但真正的改进来自使用 StarCluster 并在 EC2 私有(private) ip 池中拥有主节点。

StarCluster 为所有节点以及使用的任何挂载设置所有 key 处理。动态节点分配是不可行的,但除非要长期使用现货实例并且您的出价策略不会“保留”您的实例,否则动态分配应该是一个问题。

其他一些经验教训:

  • 创建一个包含私有(private) IP 的变量以传递给 createCluster 并将其导出,因此当您需要使用相同的节点重新启动时,它会更容易。
  • 让主节点运行 byobu 并将其设置为 R session 日志记录。
  • 在主节点上运行 RStudio 服务器有时会非常有帮助,但应该是与从节点不同的 AMI。 :)
  • 让控制脚本将数据 rda 文件卸载到远程监控新文件的路径并自动下载它们。
  • 使用 htop 监控从站,以便您可以轻松查看实例并确定脚本要求(内存/cpu/可扩展性)。
  • 利用处理器超线程启用/禁用脚本。

  • 我在从属连接和序列化/反序列化方面遇到了很多问题,发现其中一件事是连接限制,连接限制需要减少节点数;当控制脚本停止时,最简单的清理方法是重新启动主 R session ,并使用脚本杀死从进程而不是等待超时。

    设置确实需要一些工作,但希望这些想法有助于...

    尽管 8 个月前 StarCluster 和 R 都发生了变化,但这里有一些它的设置方式……您可以在 StarCluster 文档中找到其中的 90%。
  • 根据来自 AWS 控制台的安全信息设置 .starcluster/config AWS 和 key 对部分。
  • 定义 [smallcluster]
  • 键名
  • 可用区
  • 定义扩展 [smallcluster] 的集群模板。使用基于 StarCluster 64 位 HVM AMI 的 AMI。我没有创建新的公共(public) AMI 实例,而是保存了一个已配置的实例(带有我需要的所有工具)并将其用作 AMI。

  • 这是一个例子...
    [cluster Rnodes2]
    EXTENDS=smallcluster
    MASTER_INSTANCE_TYPE = cc1.4xlarge
    MASTER_IMAGE_ID= ami-7621f91f
    NODE_INSTANCE_TYPE = cc2.8xlarge
    NODE_IMAGE_ID= ami-7621f91f
    CLUSTER_SIZE= 8
    VOLUMES= rdata
    PLUGINS= pkginstaller
    SPOT_BID= 1.00
    
  • 设置共享卷,这是屏幕/byoubu 日志、主要 .R 脚本检查点输出、共享 R 数据和生产包的源。它在名为 export 的子路径中监视新文件,因此如果集群或控制脚本死亡/异常终止,最大数量的记录将全部丢失并需要重新计算。

  • 创建共享卷后,定义很简单:
    [volume rdata]
    VOLUME_ID = vol-1145497c
    MOUNT_PATH = /rdata
    

    确保所有节点上最新(和相同)R 版本的包安装程序。
    [plugin pkginstaller]
    setup_class = starcluster.plugins.pkginstaller.PackageInstaller
    packages = r-base, r-base-dev, r-recommended
    

    最后,ssh 和 RStudio 服务器的访问权限。通过代理的 Https 会更安全,但由于 RStudio 仅用于控制脚本设置......
    [permission ssh]
    # protocol can be: tcp, udp, or icmp
    protocol = tcp
    from_port = 22
    to_port = 22
    
    # [permission http]
    protocol = tcp
    from_port = 8787
    to_port = 8787
    

    然后使用 StarCluster 界面启动一个集群。它处理所有访问控制、系统名称、共享等......集群运行后,我从本地系统运行一个 ssh session 到每个 session ,并运行一个脚本来停止超线程:
    #!/bin/sh
    
    # disable hyperthreading
    for cpunum in $(
        cat /sys/devices/system/cpu/cpu*/topology/thread_siblings_list | 
        cut -s -d, -f2- | tr ',' '\n' | sort -un); do
            echo 0 > /sys/devices/system/cpu/cpu$cpunum/online
    done
    

    然后在每个 session 上启动一个 htop session ,以根据导出的检查点日志监控可伸缩性。

    然后,登录到主服务器,启动一个屏幕 session (我从那时起就首选 byobu)并从 StarCluster 安装的卷中启动 R。这样当集群由于某种原因停止时,我可以通过启动 R 轻松地重新设置。一旦进入 R,第一件事就是创建一个 workers.list变量使用 nodeXXX名称,这只是以下内容:
    cluster.nodes <- c("localhost", paste("node00", 1:7, sep='' ) )
    workers.list <- rep( cluster.nodes, 8 )
    

    然后我加载了控制脚本,退出并保存了工作区。控制脚本处理用于导出和检查点的所有表输出以及对生产包的 par 包装调用。脚本的主要功能也带了一个cpus参数是放置 worker 列表的位置,然后作为 cores 传递。到集群初始值设定项。
    initialize.cluster <- function( cores )
    {
      if( exists( 'cl' ) ) stopCluster( cl )
    
      print("Creating Cluster")
      cl <- makePSOCKcluster( cores )    
      print("Cluster created.")
      assign( 'cl', cl, envir=.GlobalEnv )
      print( cl )
    
      # All workers need to have the bounds generator functions...
      clusterEvalQ( cl, require('scoreTarget') )
      # All workers need to have the production script and package.
      clusterExport( cl, varlist=list('RScoreTarget', 'scoreTarget'))
      return ( cl )
    }
    

    一旦 R session 重新启动(在最初创建 worker.list 之后),控制脚本就被提供了,并且主函数被调用。就是这样。有了这个设置,如果集群停止了,我就会退出主主机上的 rsession;通过 htop 在每个从站上停止从站进程并再次启动。

    这是它的一个例子:
    R
    
    R version 2.15.0 (2012-03-30)
    Copyright (C) 2012 The R Foundation for Statistical Computing
    ISBN 3-900051-07-0
    Platform: x86_64-pc-linux-gnu (64-bit)
    
    R is free software and comes with ABSOLUTELY NO WARRANTY.
    You are welcome to redistribute it under certain conditions.
    Type 'license()' or 'licence()' for distribution details.
    
      Natural language support but running in an English locale
    
    R is a collaborative project with many contributors.
    Type 'contributors()' for more information and
    'citation()' on how to cite R or R packages in publications.
    
    Type 'demo()' for some demos, 'help()' for on-line help, or
    'help.start()' for an HTML browser interface to help.
    Type 'q()' to quit R.
    
    [Previously saved workspace restored]
    
    > source('/rdata/buildSatisfactionRangeTable.R')
    Loading required package: data.table
    data.table 1.7.7  For help type: help("data.table")
    Loading required package: parallel
    Loading required package: scoreTarget
    Loading required package: Rcpp
    > ls()
     [1] "build.satisfaction.range.table" "initialize.cluster"            
     [3] "initialize.table"               "parallel.choices.threshold"    
     [5] "rolled.lower"                   "rolled.upper"                  
     [7] "RScoreTarget"                   "satisfaction.range.table"      
     [9] "satisfaction.search.targets"    "search.range.bound.offsets"    
    [11] "search.range.bounds"            "search.range.center"           
    [13] "Search.Satisfaction.Range"      "update.bound.offset"           
    [15] "workers.list"                  
    > workers.list
      [1] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"
      [7] "localhost" "localhost" "node001"   "node002"   "node003"   "node004"  
     [13] "node005"   "node006"   "node007"   "node001"   "node002"   "node003"  
     [19] "node004"   "node005"   "node006"   "node007"   "node001"   "node002"  
     [25] "node003"   "node004"   "node005"   "node006"   "node007"   "node001"  
     [31] "node002"   "node003"   "node004"   "node005"   "node006"   "node007"  
     [37] "node001"   "node002"   "node003"   "node004"   "node005"   "node006"  
     [43] "node007"   "node001"   "node002"   "node003"   "node004"   "node005"  
     [49] "node006"   "node007"   "node001"   "node002"   "node003"   "node004"  
     [55] "node005"   "node006"   "node007"   "node001"   "node002"   "node003"  
     [61] "node004"   "node005"   "node006"   "node007"   "node001"   "node002"  
     [67] "node003"   "node004"   "node005"   "node006"   "node007"   "node001"  
     [73] "node002"   "node003"   "node004"   "node005"   "node006"   "node007"  
     [79] "node001"   "node002"   "node003"   "node004"   "node005"   "node006"  
     [85] "node007"   "node001"   "node002"   "node003"   "node004"   "node005"  
     [91] "node006"   "node007"   "node001"   "node002"   "node003"   "node004"  
     [97] "node005"   "node006"   "node007"   "node001"   "node002"   "node003"  
    [103] "node004"   "node005"   "node006"   "node007"   "node001"   "node002"  
    [109] "node003"   "node004"   "node005"   "node006"   "node007"   "node001"  
    [115] "node002"   "node003"   "node004"   "node005"   "node006"   "node007"  
    > build.satisfaction.range.table(500000, FALSE, workers.list )
    [1] "Creating Cluster"
    [1] "Cluster created."
    socket cluster with 120 nodes on hosts ‘localhost’, ‘node001’, ‘node002’, ‘node003’, ‘node004’, ‘node005’, ‘node006’, ‘node007’
    Parallel threshold set to: 11000 
    Starting at: 2 running to: 5e+05 :: Sat Apr 14 22:21:05 2012 
    

    如果您已经阅读到这里,那么您可能有兴趣知道我测试了我可以测试的每个集群设置(包括 openMPI)并发现没有速度差异,也许那是因为我的计算如此受 CPU 限制,也许不是.

    此外,即使开始使用 HPC 可能会很痛苦,也不要放弃。这完全值得。如果我在商用工作站上坚持使用 base-R 中的幼稚实现,我仍然会等待完成我正在运行的计算的前 100,000 次迭代(嗯,不是真的,因为我永远不会坚持使用 R :D )。使用集群,在一周内完成了 384,000 次迭代。完全值得花时间(并且花了很多时间)进行设置。

    关于R 连接到 EC2 实例以进行并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13520932/

    相关文章:

    r - ggplot facet_grid 中分面的单独调色板

    r - 使用 R 更新 map() 循环外的向量

    c# - 将每一行保存在手动构建的 gridview 中

    r - 启动 Amazon EC2 集群以用作 foreach 后端的最简单方法

    scala - 应用仿函数如何与并行算法联系起来? (斯卡拉和斯卡拉兹)

    r - 在 CSV 文件中附加一个向量作为一行

    linux - 无法获得语法突出显示以在 vim 中使用 R 代码

    linq - LINQ 与 foreach 迭代器 block 的 C# 性能

    java - OpenCL中如何知道内存映射成功

    visual-c++ - 为什么 Visual Studio 2012 Express 的自动并行化报告功能仅适用于 Win32?