我正在尝试将一些文档保存到Elasticsearch:
newStream.foreach(rdd => rdd.saveToEs(elasticResource))
使用SparkConf的设置:
val conf = new SparkConf().setMaster("local[*]")
.setAppName("RabbitIngestor")
.set("es.nodes", "localhost")
.set("es.port", "9200")
.set("es.index.auto.create", "true")
.set("es.nodes.discovery", "false")
.set("es.batch.size.entries", "0")
val elasticResource = "data/product"
和上下文:
val ssc = new StreamingContext(conf, Seconds(1))
但是工作无济于事,只是增加了工作却没有开始。
15/08/13 17:43:53 INFO JobScheduler: Added jobs for time 1439484233000 ms
15/08/13 17:43:54 INFO JobScheduler: Added jobs for time 1439484234000 ms
15/08/13 17:43:55 INFO JobScheduler: Added jobs for time 1439484235000 ms
我认为这与批处理大小有关(但是我将其设置为1)。另外,如果不存在索引,则创建索引。
有任何想法吗?
更新:最终我收到此错误,并写入了一个文档:
15/08/13 18:26:03 ERROR NetworkClient: Node [Operation timed out] failed (172.17.0.28:9200); selected next node [localhost:9200]
不过,不知道从哪里获得该地址,而不是正确的地址(本地主机)
最佳答案
最后的问题是我在docker内部运行了Elasticsearch,在通过localhost进行初始握手后,它试图直接与节点通信。因此,这是Docker的端口转发问题。
关于elasticsearch - saveToEs( Elasticsearch Spark )添加作业但未开始,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31994429/