java - Google Pub/Sub Java example

Tags: java google-cloud-pubsub

I can't find a way to read messages from Pub/Sub using Java.

I am using this Maven dependency in my pom:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>0.17.2-alpha</version>
</dependency>

I implemented this main method to create a new topic:

public static void main(String... args) throws Exception {

        // Your Google Cloud Platform project ID
        String projectId = ServiceOptions.getDefaultProjectId();

        // Your topic ID
        String topicId = "my-new-topic-1";
        // Create a new topic
        TopicName topic = TopicName.create(projectId, topicId);
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            topicAdminClient.createTopic(topic); 
        }
}

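The code above works fine; in fact, I can see the newly created topic in the Google Cloud console.

I implemented the following main method to write a message to my topic: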

public static void main(String a[]) throws InterruptedException, ExecutionException{
        String projectId = ServiceOptions.getDefaultProjectId(); 
        String topicId = "my-new-topic-1";

        String payload = "Hellooooo!!!";
        PubsubMessage pubsubMessage =
                  PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

        TopicName topic = TopicName.create(projectId, topicId);

        Publisher publisher;
        try {
            publisher = Publisher.defaultBuilder(
                    topic)
                    .build();
            publisher.publish(pubsubMessage);

            System.out.println("Sent!");
        } catch (IOException e) {
            System.out.println("Not sent!");
            e.printStackTrace();
        }
}

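Now I can't verify whether this message was actually sent. I would like to implement a message reader that uses a subscription to my topic. Could someone show me a correct, working Java example of reading messages from a topic?

Can anyone help me? Thanks in advance!

Best answer

Here is a version using the Google Cloud client library. Note that it relies on classes such as ProjectSubscriptionName and ProjectTopicName, which are not part of the 0.17.2-alpha release used in the question, so it needs a newer google-cloud-pubsub version.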


package com.techm.data.client;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;

/**
 * A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull
 * subscription and asynchronously pull messages from it.
 */
public class CreateSubscriptionAndConsumeMessages {

    private static String projectId = "projectId";
    private static String topicId = "topicName";
    private static String subscriptionId = "subscriptionName";

    public static void createSubscription() throws Exception {
        ProjectTopicName topic = ProjectTopicName.of(projectId, topicId);
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);

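        // Note: createSubscription throws AlreadyExistsException if the subscription
        // already exists, for example when this program is run a second time.
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            // The default PushConfig makes this a pull subscription; an ack deadline
            // of 0 tells the service to use its default (10 seconds).
            subscriptionAdminClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);
        }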
    }

    public static void main(String... args) throws Exception {
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);       

        createSubscription();


        MessageReceiver receiver = new MessageReceiver() {
            @Override
            public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                System.out.println("Received message: " + message.getData().toStringUtf8());
                consumer.ack();
            }
        };
        Subscriber subscriber = null;
        try {
            subscriber = Subscriber.newBuilder(subscription, receiver).build();
            subscriber.addListener(new Subscriber.Listener() {
                @Override
                public void failed(Subscriber.State from, Throwable failure) {
                    // Handle failure. This is called when the Subscriber encounters a fatal
                    // error and is shutting down.
                    System.err.println(failure);
                }
            }, MoreExecutors.directExecutor());
            subscriber.startAsync().awaitRunning();         

            // In this example, we will pull messages for one minute (60,000ms) then stop.
            // In a real application, this sleep-then-stop is not necessary.
            // Simply call stopAsync().awaitTerminated() when the server is shutting down,
            // etc.
            Thread.sleep(60000);
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync().awaitTerminated();
            }
        }
    }
}

This works fine for me.
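
On the question's other point, checking that a message was really sent: the publishing code in the question prints "Sent!" without waiting on the value that `publish` returns. In the client library that value is an `ApiFuture<String>`, which extends `java.util.concurrent.Future` and resolves to the server-assigned message ID, or fails with the publish error. A minimal sketch of waiting on it follows. It uses only the JDK so that it runs without credentials; the helper name `awaitMessageId`, the 30-second timeout, and the `CompletableFuture` stand-ins with their made-up ID are assumptions for illustration, not part of the Pub/Sub API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PublishConfirmation {

    /**
     * Blocks until the publish future completes and returns the message ID.
     * Throws ExecutionException if publishing failed, or TimeoutException
     * if no result arrived within the given number of seconds.
     * (Hypothetical helper; the name is not from the Pub/Sub library.)
     */
    static String awaitMessageId(Future<String> publishFuture, long timeoutSeconds)
            throws InterruptedException, ExecutionException, TimeoutException {
        return publishFuture.get(timeoutSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for publisher.publish(pubsubMessage); the ID is made up.
        Future<String> succeeded = CompletableFuture.completedFuture("example-id-123");
        System.out.println("Sent, message ID: " + awaitMessageId(succeeded, 30));

        // Stand-in for a publish call that fails on the server side.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated publish failure"));
        try {
            awaitMessageId(failed, 30);
        } catch (ExecutionException e) {
            // The original error is available as the cause.
            System.out.println("Not sent: " + e.getCause().getMessage());
        }
    }
}
```

With a real publisher the call would look like `awaitMessageId(publisher.publish(pubsubMessage), 30)`. Calling `publisher.shutdown()` afterwards releases the publisher's resources.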

Source: the original question "java - Google Pub/Sub Java example" on Stack Overflow: https://stackoverflow.com/questions/44485473/
