pyspark - Dataproc 中的 BigQuery 和 Pyspark

标签 pyspark google-bigquery google-cloud-dataproc

我在 BigQuery 中有一个表,我想要查询并实现 FPgrowth 算法。 我想首先使用 dataproc 集群的 VM 实例在 pyspark shell 上尝试一下。

我正在寻找一种使用 pyspark 直接查询 BQ 中的表的方法。我想使用查询到的数据来实现FPGrowth(我已经熟悉了)。

最佳答案

Dataproc 已经拥有可用于通过 BigQuery 进行查询所需的连接器,如 docs 中所示。 .

文档中的代码示例:

import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

我还建议使用 Jupyter service 创建一个 Dataproc 集群。安装。这将使您可以动态测试如何实现 FPgrowth 或您最终想要尝试的任何其他想法。

事实上,在写这个答案之前,我只是使用我当前的 jupyter 笔记本来查询 BQ,看看它是如何工作的:

enter image description here

关于pyspark - Dataproc 中的 BigQuery 和 Pyspark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47192243/

相关文章:

machine-learning - LogesticRegression fit() 函数抛出此错误

python-2.7 - GAE - 无法从 BigQuery API 获取 URL

apache-spark - 如何在 Dataproc 上调试 Spark 作业?

opencv - 谷歌云 Dataproc 安装 libopencv

python-3.x - 如何在aws Glue中将json写回s3?

apache-spark - pyspark function.lag 条件

google-cloud-platform - BigQuery 数据集已完全删除/消失

google-bigquery - Bigquery 免费试用版限制

apache-spark - 监控 Dataproc 集群上的 Spark-Shell 或 PySpark-Shell session

python - 如何在 pyspark 操作中轻松使用自定义类方法?