apache-kafka - Kafka Streams - 内部主题的 ACL

标签 apache-kafka apache-kafka-streams

我正在尝试设置一个安全的 Kafka 集群,但在 ACL 方面遇到了一些困难。

Kafka Streams 的 Confluence 安全指南 ( https://docs.confluent.io/current/streams/developer-guide/security.html ) 只是指出必须将 Cluster Create ACL 提供给主体......但它没有说明如何实际处理内部主题。

通过研究和实验,我确定(对于 Kafka 版本 1.0.0):

  1. 通配符不能与 ACL 中的主题名称文本一起使用。例如,由于所有内部主题都以应用程序 id 为前缀,因此我的第一个想法是将 acl 应用于匹配“-*”的主题。这是行不通的。
  2. Streams API 创建的主题不会自动获得创建者的读/写访问权限。

内部主题的确切名称是否可预测且一致?换句话说,如果我在开发服务器上运行应用程序,运行时会在生产服务器上创建完全相同的主题吗?如果是这样,那么我可以在部署之前添加从 dev 派生的 ACL。如果没有,应该如何添加ACL?

最佳答案

Are the exact names of the internal topics predictable and consistent? In other words, if I run my application on a dev server, will the exact same topics be created on the production server when run?

是的,每次运行您都会获得完全相同的主题名称。 DSL 生成处理器名称 with a function看起来像这样:

public String newProcessorName(final String prefix) {
    return prefix + String.format("%010d", index.getAndIncrement());
}

(其中index只是一个递增的整数)。然后,这些处理器名称将用于 create repartition topics使用如下所示的函数(参数 name 是如上生成的处理器名称):

static <K1, V1> String createReparitionedSource(final InternalStreamsBuilder builder,
                                                final Serde<K1> keySerde,
                                                final Serde<V1> valSerde,
                                                final String topicNamePrefix,
                                                final String name) {
    Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
    Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
    Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
    Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
    String baseName = topicNamePrefix != null ? topicNamePrefix : name;

    String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
    String sinkName = builder.newProcessorName(SINK_NAME);
    String filterName = builder.newProcessorName(FILTER_NAME);
    String sourceName = builder.newProcessorName(SOURCE_NAME);

    builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
    builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
        @Override
        public boolean test(final K1 key, final V1 value) {
            return key != null;
        }
    }, false), name);

    builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
        null, filterName);
    builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
        keyDeserializer, valDeserializer, repartitionTopic);

    return sourceName;
}

如果您不更改拓扑(例如,如果不更改其构建顺序等),则无论拓扑在何处构建,您都会得到相同的结果(假设您使用相同的拓扑) Kafka Streams 版本)。

If so, then I can just add ACLs derived from dev before deploying. If not, how should the ACLs be added?

我没有使用过 ACL,但我想既然这些只是常规主题,那么是的,您可以对它们应用 ACL。 security guide确实提到:

When applications are run against a secured Kafka cluster, the principal running the application must have the ACL --cluster --operation Create set so that the application has the permissions to create internal topics.

不过,我自己也一直在想这个问题,所以如果我错了,我猜 Confluence 的某个人会纠正我。

关于apache-kafka - Kafka Streams - 内部主题的 ACL,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48629781/

相关文章:

ubuntu - kafka启动错误ubuntu类加载失败

python - 有时一个新的消费者群体不起作用

apache-kafka - 识别Kafka消费者中哪个主题有记录

apache-kafka - Kafka 高级消费者 error_code=15

java - kafka-streams - TopologyBuilder/KStreamBuilder 对象是否可重用?

java - 有什么办法可以让kafka流暂停一段时间然后再恢复吗?

apache-kafka - 卡夫卡论公共(public)交通

apache-kafka - 在Processor Api中,当 `DefaultStreamPartitioner`函数中未指定分区器时,是否会应用 `addSink`?

java - Apache 卡夫卡流: How to switch from using In-memory Key Value Store to Persistent Key Value Store?

apache-kafka - 如何发现并过滤掉Kafka Streams中的重复记录