google-cloud-platform - 为什么使用 Dataflow 写入 Bigquery 非常慢?

标签 google-cloud-platform google-bigquery google-cloud-dataflow apache-beam

我可以以每秒大约 10,000 次插入的速度将插入直接流式传输到 BigQuery 中,但是当我尝试使用 Dataflow 插入时,“ToBqRow”步骤(如下所示)非常慢。 每 10 分钟几乎不超过 50 行,而且是在 4 个工作人员的情况下进行的。知道为什么吗?相关代码如下:

PCollection<Status> statuses = p
        .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
        .apply("ExtractData", ParDo.of(new DoFn<String, Status>() {
    @ProcessElement
    public void processElement(DoFn<String, Status>.ProcessContext c) throws Exception {
            String rowJson = c.element();

            try {
                TweetsWriter.LOGGER.debug("ROWJSON = " + rowJson);
                Status status = TwitterObjectFactory.createStatus(rowJson);
                if (status == null) {
                    TweetsWriter.LOGGER.error("Status is null");
                } else {
                    TweetsWriter.LOGGER.debug("Status value: " + status.getText());
                }
                c.output(status);
                TweetsWriter.LOGGER.debug("Status: " + status.getId());
            } catch (Exception var4) {
                TweetsWriter.LOGGER.error("Status creation from JSON failed: " + var4.getMessage());
            }

    }
}));

statuses
        .apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = new TableRow();
                Status status = c.element();
                row.set("Id", status.getId());
                row.set("Text", status.getText());
                row.set("RetweetCount", status.getRetweetCount());
                row.set("FavoriteCount", status.getFavoriteCount());
                row.set("Language", status.getLang());
                row.set("ReceivedAt", (Object)null);
                row.set("UserId", status.getUser().getId());
                row.set("CountryCode", status.getPlace().getCountryCode());
                row.set("Country", status.getPlace().getCountry());
                c.output(row);
        }
    }))
        .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)
                .withSchema(schema)
                .withMethod(Method.STREAMING_INSERTS)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

p.run();

最佳答案

结果数据流下的Bigquery并不慢。问题是,'status.getPlace().getCountryCode()'返回NULL,因此它抛出NullPointerException,我在任何地方都看不到日志!显然,数据流日志记录需要改进。现在运行得非常好。一旦消息进入主题,几乎立即就会被写入 BigQuery!

关于google-cloud-platform - 为什么使用 Dataflow 写入 Bigquery 非常慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51357011/

相关文章:

google-cloud-platform - 有没有办法在 Cloud BigTable 中转储 bool 对象?

java - Dataflow 如何与 BIgQuery 数据集配合使用

c#-4.0 - Google Bigquery 中的季度、周日期函数

google-bigquery - 运行 STRING_AGG 函数时发现错误

python - 添加requirements.txt [Python]时数据流失败

python - App Engine Python,标准环境,使用 time.sleep

google-cloud-platform - 谷歌云权限

maven - 从 maven 构建的 jar 运行 Apache Beam/Google Cloud Dataflow 作业

google-cloud-platform - Airflow 安装故障梁[gcp]

python - 如何在一个 flask 项目中设置两个 os.environ