mysql - Connecting to MySQL from Google Dataflow

Tags: mysql, google-cloud-dataflow

I am trying to connect to an AWS RDS MySQL instance from Google Dataflow. I wrote a Java program to build the pipeline. The job is created successfully, but the MySQL connection always fails with the following error:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:338)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:308)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:154)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:308)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:264)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:133)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:113)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:100)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeSetup(Unknown Source)
    at com.google.cloud.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:63)
    at com.google.cloud.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:45)
    at com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:94)
    at com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:415)
    at com.google.cloud.dataflow.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:326)
    ... 14 more
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
Caused by: java.net.SocketTimeoutException: connect timed out

The Java source code is as follows:

public class MySQLToBQ {
    public static void main(String[] args) throws Exception {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("project_name");
        options.setStagingLocation("gs://staging");
        options.setTempLocation("gs://temp");
        options.setRunner(DataflowRunner.class);
        options.setJobName("MySQL-To-BQ-" + new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date()));
        System.out.println("Job Name " + options.getJobName());
        Pipeline p = Pipeline.create(options);

        DataSourceConfiguration mysqlConfig = JdbcIO.DataSourceConfiguration.create(
                "com.mysql.cj.jdbc.Driver", "jdbc:mysql://mysql_host:3306/mysql_database")
                .withUsername("user")
                .withPassword("password");

        p.apply("mysql_source", JdbcIO.<SourceRow>read()
            .withDataSourceConfiguration(mysqlConfig)
            .withQuery("query")
            .withCoder(SerializableCoder.of(SourceRow.class))
            .withRowMapper(new JdbcIO.RowMapper<SourceRow>() {
                    @Override
                    public SourceRow mapRow(ResultSet resultSet) throws Exception {
                        SourceRow datarow = new SourceRow();
                        ResultSetMetaData rsmd = resultSet.getMetaData();
                        for(int i = 1; i <= rsmd.getColumnCount(); i++) {
                            datarow.add(rsmd.getColumnName(i), resultSet.getString(i));
                        }
                        return datarow;
                    }
                }
            )
        )
        .apply(table + "_transform", ParDo.of(new TransformToTableRow()))
        .apply(table + "_destination", BigQueryIO.writeTableRows()
            .to("table_name")
            .withSchema(getSchema())
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        );

        p.run();
    }
}

I was able to create a Compute Engine VM instance and connect to the MySQL database from there without any problem.

Best Answer

With Dataflow you cannot whitelist a fixed IP to give the workers access to a SQL instance. I am not sure about AWS RDS, but for Cloud SQL you should use the JDBC socket factory instead: https://cloud.google.com/sql/docs/mysql/connect-external-app#java
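The linked guide boils down to pointing the JDBC URL at the Cloud SQL socket factory instead of a raw host and port. A minimal sketch of how the question's `DataSourceConfiguration` might look with that approach — the instance connection name and the `com.google.cloud.sql:mysql-socket-factory-connector-j-8` dependency are illustrative assumptions, not taken from the question:

```java
// Hedged sketch, not the answer's exact code: configure JdbcIO to reach a
// Cloud SQL instance through the Cloud SQL JDBC socket factory. Assumes the
// mysql-socket-factory-connector-j-8 artifact is on the classpath and that
// "my-project:us-central1:my-instance" is your instance connection name
// (both placeholders).
JdbcIO.DataSourceConfiguration mysqlConfig = JdbcIO.DataSourceConfiguration.create(
        "com.mysql.cj.jdbc.Driver",
        "jdbc:mysql:///mysql_database"  // no host/port: the socket factory opens the connection
            + "?cloudSqlInstance=my-project:us-central1:my-instance"
            + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory")
        .withUsername("user")
        .withPassword("password");
```

For AWS RDS there is no equivalent Google-provided socket factory; commonly suggested workarounds are exposing the instance publicly (fragile, since Dataflow worker IPs are not fixed) or routing through a VPN or proxy, so treat this sketch as Cloud-SQL-specific.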

For "mysql - Connecting to MySQL from Google Dataflow", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50180241/
