我在 BigQuery 中有一个表,我想要查询并实现 FPgrowth 算法。 我想首先使用 dataproc 集群的 VM 实例在 pyspark shell 上尝试一下。
我正在寻找一种使用 pyspark 直接查询 BQ 中的表的方法。我想使用查询到的数据来实现FPGrowth(我已经熟悉了)。
最佳答案
Dataproc 已经拥有可用于通过 BigQuery 进行查询所需的连接器,如 docs 中所示。 .
文档中的代码示例:
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
# Input Parameters.
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'publicdata',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
我还建议使用 Jupyter service 创建一个 Dataproc 集群。安装。这将使您可以动态测试如何实现 FPgrowth 或您最终想要尝试的任何其他想法。
事实上,在写这个答案之前,我只是使用我当前的 jupyter 笔记本来查询 BQ,看看它是如何工作的:
关于pyspark - Dataproc 中的 BigQuery 和 Pyspark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47192243/