使用 Spring 框架
和 Google Cloud Platform
开发实时跟踪系统。
Spring Cloud GCP
可以轻松编写 GCP PubSub
应用程序 Spring Integration
方式。从他们的 github 页面我能够编写如下应用程序:Github Samples
@Configuration
@Slf4j
public class GCPConfiguration {
/*
* Message sender code
* */
@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
PubSubMessageHandler adapter =
new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
adapter.setPublishCallback(new ListenableFutureCallback<String>() {
@Override
public void onFailure(Throwable ex) {
log.info("There was an error sending the message.");
}
@Override
public void onSuccess(String result) {
log.info("Message was sent successfully.");
}
});
return adapter;
}
@MessagingGateway(defaultRequestChannel = "pubSubOutputChannel")
public interface PubSubOutboundGateway {
void sendToPubSub(String text);
}
/*
* Message receiver code
* */
@Bean
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubOperations pubSubTemplate) {
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@ServiceActivator(inputChannel = "pubsubInputChannel")
public void messageReceiver(String payload, @Header(GcpHeaders.ACKNOWLEDGEMENT) AckReplyConsumer ackReplyConsumer) {
log.info("Message arrived! Payload: " + payload);
ackReplyConsumer.ack();
}
}
跟踪设备将持续向此应用程序公开的 TCP
端口发送数据,该端口需要进行转换,然后持久保存到 BigQuery
和 GC SQL
。从 TCP
端口获取数据并将其发布到 GC PubSub
已经到位。
我不知道如何以及在何处添加来自 GC PubSub
Google Cloud Dataflow
代码
更新
目标是将数据插入到 GC BigQuery
和 GC SQL
,所以回答将导致数据插入到这些服务中是可以的。
最佳答案
您的问题似乎是如何使用 Google Cloud Dataflow 从 Pub/Sub 流式传输到 Bigquery。
Dataflow 站点链接到此 example从它的例子page .请注意,这是使用 1.x 版本的 SDK 而不是 2.x Apache Beam 版本。你可以找到一个类似的例子here .
还有一个谷歌提供的template可以使用并且不需要编码。
编辑:该模板最近是开源的,可在 github 上找到.
关于java - 使用 Spring Integration 的带有 Dataflow 的 Google PubSub,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48461236/