java - Kafka SaslAuthenticationException 在 SASL_SSL 协议(protocol)的临时基础上发生

标签 java spring-security java-8 apache-kafka kerberos

我正在运行一些 java 8 Kafka 应用程序,其中一些是 Kafka 流,其他是普通的生产者/消费者。

对于它们中的每一个,都不存在功能问题,它们在大多数时间都运行良好。

但是,对于它们中的每一个,我都会临时出现 SaslAuthenticationException。由于它们每隔几周左右发生一次,我不确定如何复制/推断根本原因:

错误:

Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login

堆栈跟踪:

Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.firstPrincipal(SaslClientAuthenticator.java:441) ~[kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:135) ~[kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:244) ~[kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:194) ~[kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:289) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:280) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.connect(Selector.java:215) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:64) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1035) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:920) [kafka-clients-2.0.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:508) [kafka-clients-2.0.1.jar!/:na]

这就是我向 Kafka 提供 JAAS Kerberos 身份验证的方式(在我的配置文件中,我提供了 kdc、realm、keytab、principal 等信息):

@Value("${kafka.sasl.kerberos.kdc}")
private String kdc;
@Value("${kafka.sasl.kerberos.realm}")
private String realm;
@Value("${kafka.sasl.kerberos.keytab}")
private Resource keytab;
@Value("${kafka.sasl.kerberos.principal}")
private String principal;

@Bean
public InMemoryConfiguration kafkaOpts() throws IOException {

    System.setProperty("java.security.krb5.kdc", kdc);
    System.setProperty("java.security.krb5.realm", realm);

    Map<String, Object> options = new HashMap<>();
    options.put("keyTab", copyResourceToTempFile(keytab, ".keytab").toString());
    options.put("principal", principal);
    options.put("useKeyTab","true");
    options.put("storeKey","true");
    AppConfigurationEntry kafkaClientConfig = new AppConfigurationEntry(
            "com.sun.security.auth.module.Krb5LoginModule", LoginModuleControlFlag.REQUIRED, options);
    Map<String, AppConfigurationEntry[]> jaasConfigEntries = new HashMap<>();
    jaasConfigEntries.put("KafkaClient", new AppConfigurationEntry[] {kafkaClientConfig});
    InMemoryConfiguration jaasConfig = new InMemoryConfiguration(jaasConfigEntries);
    javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
    return jaasConfig;
}

public static Path copyResourceToTempFile(Resource resource, String extension) {

    try (InputStream in = resource.getInputStream()) {
        Path tempFile = Files.createTempFile("spring-boot-", extension);
        Files.copy(in, tempFile, StandardCopyOption.REPLACE_EXISTING);
        return tempFile;
    } catch (IOException e) {
        log.error("Error creating resource to file",e);
        return null;
    }

}

最佳答案

我们也面临同样的问题。连接 kafka 后,突然失去连接,应用程序重试登录,但失败。 但是我们使用以下配置更新 jaas auth 配置文件:

com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;

它非常适合我。

关于java - Kafka SaslAuthenticationException 在 SASL_SSL 协议(protocol)的临时基础上发生,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55900214/

相关文章:

java - ReplaceAll 和正则表达式组‽

java - 如何让hbox适合列标题?

java - 使用 Spring Social、Spring security 登录后重定向到原始 URL?

java - Spring MVC Hibernate Spring Security 错误

java - jackson VS。 Gson

java - 使用正则表达式分割字符串

java - Spring Security Role Hierarchy 无法使用 Java Config

Java 8 获取列表连续数字的功能方式

java - 流式传输 map 列表并收集特定 key

Java 8 : How to parse DayOfWeek with locale set by device