java - Kafka Streams 应用程序在 docker 容器中的奇怪行为

标签 java docker apache-kafka apache-kafka-streams

我正在使用 docker-compose 在 docker 容器中运行 Kafka Streams 应用程序。但是,流应用程序表现异常。所以,我有一个源主题 (topicSource) 和多个目标主题 (topicDestination1 , topicDestination2 ... topicDestination10 ) 我根据某些谓词分支到。

topicSouretopicDestination1 有直接映射,即所有记录都直接进入目标主题,没有任何过滤。

现在,当我在本地或没有容器的服务器上运行应用程序时,所有这一切都工作得很好。

另一方面,当我在容器中运行流应用程序(使用 docker-compose 和使用 kubernetes)时,它不会将所有日志从 topicSoure 转发到 topicDestination1 .事实上,只有少数记录被转发。例如,源主题有 3000 多条记录,而目标主题只有 6 条记录。而这一切真的很奇怪。

这是我的 Dockerfile:

#FROM openjdk:8u151-jdk-alpine3.7
FROM openjdk:8-jdk

COPY /target/streams-examples-0.1.jar /streamsApp/

COPY /target/libs /streamsApp/libs

COPY log4j.properties /

CMD ["java", "-jar", "/streamsApp/streams-examples-0.1.jar"]

注意:我在创建图像之前构建了一个 jar,以便我始终拥有更新的代码。我已经确保这两种代码,一个在没有容器的情况下运行的代码和一个在容器中运行的代码是相同的。

主.java:

从源主题创建源流:

KStream<String, String> source_stream = builder.stream("topicSource");

基于谓词的分支:

KStream<String, String>[] branches_source_topic = source_stream.branch(
                (key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),        // Sharing Set by Date
                (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")), // Added to secure link
                (key, value) -> (value.contains("Operation\":\"AddedToGroup")),                                             // Added to group
                (key, value) -> (value.contains("Operation\":\"Add member to role.") || value.contains("Operation\":\"Remove member from role.")),//Role update by date
                (key, value) -> (value.contains("Operation\":\"FileUploaded") || value.contains("Operation\":\"FileDeleted")
                        || value.contains("Operation\":\"FileRenamed") || value.contains("Operation\":\"FileMoved")),       // Upload file by date
                (key, value) -> (value.contains("Operation\":\"UserLoggedIn")),                                             // User logged in by date
                (key, value) -> (value.contains("Operation\":\"Delete user.") || value.contains("Operation\":\"Add user.")
                        && value.contains("ResultStatus\":\"success")),                                                     // Manage user by date
                (key, value) -> (value.contains("Operation\":\"DLPRuleMatch") && value.contains("Workload\":\"OneDrive"))   // MS DLP
                );

将日志发送到目标主题:

这是直接映射主题,即所有记录都直接进入目标主题,没有任何过滤。

AppUtil.pushToTopic(source_stream, Constant.USER_ACTIVITY_BY_DATE, "topicDestination1");

将日志从分支发送到目标主题:

AppUtil.pushToTopic(branches_source_topic[0], Constant.SHARING_SET_BY_DATE, "topicDestination2");
AppUtil.pushToTopic(branches_source_topic[1], Constant.ADDED_TO_SECURE_LINK_BY_DATE, "topicDestination3");
AppUtil.pushToTopic(branches_source_topic[2], Constant.ADDED_TO_GROUP_BY_DATE, "topicDestination4");
AppUtil.pushToTopic(branches_source_topic[3], Constant.ROLE_UPDATE_BY_DATE, "topicDestination5");
AppUtil.pushToTopic(branches_source_topic[4], Constant.UPLOAD_FILE_BY_DATE, "topicDestination6");
AppUtil.pushToTopic(branches_source_topic[5], Constant.USER_LOGGED_IN_BY_DATE, "topicDestination7");
AppUtil.pushToTopic(branches_source_topic[6], Constant.MANAGE_USER_BY_DATE, "topicDestination8");

AppUtli.java:

public static void pushToTopic(KStream<String, String> sourceTopic, HashMap<String, String> hmap, String destTopicName) {
    sourceTopic.flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            ArrayList<String> keywords = new ArrayList<String>();
            try {
                JSONObject send = new JSONObject();
                JSONObject received = processJSON(new JSONObject(value), destTopicName);

                boolean valid_json = true;
                for(String key: hmap.keySet()) {
                    if (received.has(hmap.get(key))) {
                        send.put(key, received.get(hmap.get(key)));
                    }
                    else {
                        valid_json = false;
                    }
                }   
                if (valid_json) {
                    keywords.add(send.toString());  
                }
            } catch (Exception e) {
                System.err.println("Unable to convert to json");
                e.printStackTrace();
            }

            return keywords;
        }
    }).to(destTopicName);
}

日志来自哪里:

所以日志来自在线连续流。 python 作业获取基本上是 URL 的日志,并将它们发送到 pre-source-topic。然后在流应用程序中,我从该主题创建流并点击那些 URL,然后返回我推送到 topicSource 的 json 日志。

我花了很多时间试图解决这个问题。我不知道出了什么问题,也不知道为什么它不处理所有日志。请帮我解决这个问题。

最佳答案

所以调试了很多才知道自己探索的方向错了,很简单的消费者比生产者慢的情况。生产者继续写关于主题的新记录,并且由于消息在流处理后被消费,消费者显然很慢。只需增加主题分区并启动具有相同应用程序 ID 的多个应用程序实例即可达到目的。

关于java - Kafka Streams 应用程序在 docker 容器中的奇怪行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50604776/

相关文章:

docker - 无法在docker-swarm上 curl traefik仪表板

networking - 从 docker 容器访问主机的虚拟机

java - 频繁出现 "offset out of range"消息,分区被消费者遗弃

python - confluence-python 可以生成 avro 中的值和 string 中的键的数据吗?

python - 由于 KafkaTimeoutError,无法使用 kafka-python 从 django 应用程序向 kafka 发送消息

Java - 验证框架

java - JBehave 子场景?

java - 在 Eclipse Junit 测试运行器中挂起未捕获的运行时异常

java - 如何使 @ManyToOne 实体成为 hibernate 中类的 ID

c++ - 为什么 GDB 没有在断点处停止?