hadoop - Running the Dataproc BigQuery example on a local machine

Tags: hadoop, google-cloud-dataproc

I'm trying to run the connector example on my local machine, but I keep getting an UnknownHostException. How do I configure access to BigQuery with the Hadoop connector?

package com.mycompany.dataproc;

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;


public class BigQueryAccessExample {
    public static void main(String[] args) throws Exception{

        SparkConf conf = new SparkConf()
                .setAppName("BigQuery Reader").setMaster("local[5]");

        conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());
        JavaSparkContext jsc = new JavaSparkContext(conf);

        String projectId = "mycompany-data";
        String fullyQualifiedInputTableId = "mylogs.display20151030";

        Configuration hadoopConfiguration = jsc.hadoopConfiguration();
        // Set the job-level projectId.
        hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

        // Use the systemBucket for temporary BigQuery export data used by the InputFormat.
        String bucket = "my-spark-test";

        hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket);

        // Configure input and output for BigQuery access.
        BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, fullyQualifiedInputTableId);

        //BigQueryConfiguration.configureBigQueryOutput(conf, fullyQualifiedOutputTableId, outputTableSchema);
        JavaPairRDD<LongWritable, JsonObject> tableData = jsc.newAPIHadoopRDD(hadoopConfiguration, GsonBigQueryInputFormat.class, LongWritable.class,  JsonObject.class);

        //tableData.count();
        JavaRDD<JsonObject> myRdd = tableData.map(new Function<Tuple2<LongWritable, JsonObject>, JsonObject>() {
            public JsonObject call(Tuple2<LongWritable, JsonObject> v1) throws Exception {
                System.out.println(String.format("idx: %s val: %s", v1._1(), v1._2().toString()));
                return v1._2();
            }
        });
        myRdd.take(10);

    }
}

But I get an UnknownHostException:

java.net.UnknownHostException: metadata
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
    at sun.net.www.http.HttpClient.New(HttpClient.java:308)
    at sun.net.www.http.HttpClient.New(HttpClient.java:326)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
    at com.google.cloud.hadoop.util.CredentialFactory$ComputeCredentialWithRetry.executeRefreshToken(CredentialFactory.java:142)
    at com.google.api.client.auth.oauth2.Credential.refreshToken(Credential.java:489)
    at com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:189)
    at com.google.cloud.hadoop.util.CredentialConfiguration.getCredential(CredentialConfiguration.java:71)
    at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.createBigQueryCredential(BigQueryFactory.java:81)
    at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.getBigQuery(BigQueryFactory.java:101)
    at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.getBigQueryHelper(BigQueryFactory.java:89)
    at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.getBigQueryHelper(AbstractBigQueryInputFormat.java:363)
    at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.getSplits(AbstractBigQueryInputFormat.java:102)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:115)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1277)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1272)
    at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:494)
    at org.apache.spark.api.java.AbstractJavaRDDLike.take(JavaRDDLike.scala:47)
    at com.mycompany.dataproc.BigQueryAccessExample.main(BigQueryAccessExample.java:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

It looks like I need to set up access credentials or permissions, but I haven't found any documentation on how to do that.

I downloaded credentials from https://console.developers.google.com/project/ (APIs & auth → Credentials) and set GOOGLE_APPLICATION_CREDENTIALS, but that doesn't seem to work.

Any help would be appreciated.

Best Answer

The UnknownHostException: metadata happens because, with no other credentials configured, the connector falls back to fetching a token from the GCE metadata server (host "metadata"), which only exists on Compute Engine VMs such as Dataproc nodes, not on your local machine. The easiest fix is to create a new service account and download a .p12 keyfile for it (the Hadoop connector does not currently support application default credentials or JSON keyfiles), then point the connector at that keyfile:

String serviceAccount = "foo@bar.gserviceaccount.com";
String localKeyfile = "/path/to/local/keyfile.p12";

hadoopConfiguration.set("google.cloud.auth.service.account.enable", true);
hadoopConfiguration.set("google.cloud.auth.service.account.email", serviceAccount);
hadoopConfiguration.set("google.cloud.auth.service.account.keyfile", localKeyfile);

This question and answer originally appeared on Stack Overflow: https://stackoverflow.com/questions/33509865/
