java - 使用 Spring Integration 的带有 Dataflow 的 Google PubSub

标签 java spring-integration spring-cloud google-cloud-dataflow google-cloud-pubsub

使用 Spring 框架Google Cloud Platform 开发实时跟踪系统。 Spring Cloud GCP 可以轻松编写 GCP PubSub 应用程序 Spring Integration 方式。从他们的 github 页面我能够编写如下应用程序:Github Samples

@Configuration
@Slf4j
public class GCPConfiguration {
    /*
    *   Message sender code
    * */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
        PubSubMessageHandler adapter =
                new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        adapter.setPublishCallback(new ListenableFutureCallback<String>() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("There was an error sending the message.");
            }

            @Override
            public void onSuccess(String result) {
                log.info("Message was sent successfully.");
            }
        });

        return adapter;
    }

    @MessagingGateway(defaultRequestChannel = "pubSubOutputChannel")
    public interface PubSubOutboundGateway {
        void sendToPubSub(String text);
    }

    /*
    *   Message receiver code
    * */
    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubOperations pubSubTemplate) {
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void messageReceiver(String payload, @Header(GcpHeaders.ACKNOWLEDGEMENT) AckReplyConsumer ackReplyConsumer) {
        log.info("Message arrived! Payload: " + payload);

        ackReplyConsumer.ack();
    }
}

跟踪设备将持续向此应用程序公开的 TCP 端口发送数据,该端口需要进行转换,然后持久保存到 BigQueryGC SQL。从 TCP 端口获取数据并将其发布到 GC PubSub 已经到位。 我不知道如何以及在何处添加来自 GC PubSub

Google Cloud Dataflow 代码

更新

目标是将数据插入到 GC BigQueryGC SQL,所以回答将导致数据插入到这些服务中是可以的。

最佳答案

您的问题似乎是如何使用 Google Cloud Dataflow 从 Pub/Sub 流式传输到 Bigquery。

Dataflow 站点链接到此 example从它的例子page .请注意,这是使用 1.x 版本的 SDK 而不是 2.x Apache Beam 版本。你可以找到一个类似的例子here .

还有一个谷歌提供的template可以使用并且不需要编码。

编辑:该模板最近是开源的,可在 github 上找到.

关于java - 使用 Spring Integration 的带有 Dataflow 的 Google PubSub,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48461236/

相关文章:

java - 我的 java 代码中调用 pop 失败

java - JDBC 库在 Android Studio 中不工作

java - Spring 集成 JMS 在入站和出站中发布订阅

java - 如何使用 Spring Cloud Contracts 将 Stub 中的 NULL 值设置为 clientValue

java - 我们是否需要在Spring云合约中 stub 其他微服务

java - 是否有可能使函数像 for 循环一样工作?

java - 为什么 char[] 优于 String 作为密码?

java - 尚未为 channel 适配器定义轮询器

java - Spring Integration 中的 Http Outbound Gateway 错误处理

spring - 无法使用Spring云数据流将流数据写入接收器文件