elasticsearch - 如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引

标签 elasticsearch dataframe pyspark

Elasticsaerch 的文档仅涵盖将完整索引加载到 Spark。

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()

如何执行查询以从 Elasticsearch 索引返回数据并使用 pyspark 将它们作为 DataFrame 加载到 Spark?

最佳答案

下面是我的做法。

一般环境设置和命令:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar

代码:

from pyspark import SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

q ="""{
  "query": {
    "filtered": {
      "filter": {
        "exists": {
          "field": "label"
        }
      },
      "query": {
        "match_all": {}
      }
    }
  }
}"""

es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "titanic/passenger",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()

您还可以定义数据框列。引用Here了解更多信息。

希望对您有所帮助!

关于elasticsearch - 如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38162901/

相关文章:

python - Apache Spark 读取 CSV 文件 - ClassNotFoundException

elasticsearch - Grafana与Elasticsearch的联系

linux - 类似系统的关键字,例如 failure、true、closed、 "unable to"在作为查询值处理时突出显示

python - 将 pandas 数据框转换为嵌套字典

python - 使用 Pandas 使用特定列的权重对 DataFrame 进行采样

linux - 如何从 spark-submit 获取返回码?

elasticsearch - MariaDB和Elasticsearch

mongodb - elasticsearch与mongodb river一起引发异常工作

python - Pandas 2018 世界杯数据集净胜球

apache-spark - PySpark:反序列化包含在 eventhub 捕获 avro 文件中的 Avro 序列化消息