java - 如何在 spring-integration-kafka 2.1.0.RELEASE 和 Kafka 0.10.0 中为不同的主题配置不同的生产者?

标签 java spring spring-integration spring-kafka

将 Kafka 从 0.9.0 升级到 0.10.0 时,在为不同的主题配置不同的生产者时遇到问题。下面给出的 XML 配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

	<int:publish-subscribe-channel id="inputToKafka" />

	<!-- Producer Config -->

	<int-kafka:outbound-channel-adapter
		id="fcmOutboundChannelAdapter" kafka-template="fcmNotificationTemplate" topic="trigger-fcm-notification"
		auto-startup="true" channel="inputToKafka">
		<int-kafka:request-handler-advice-chain>
			<bean
				class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
		</int-kafka:request-handler-advice-chain>
	</int-kafka:outbound-channel-adapter>
	
	<int-kafka:outbound-channel-adapter
		id="masOutboundChannelAdapter" kafka-template="microsoftAccountSyncTemplate" topic="sync-microsoft-account"
		auto-startup="true" channel="inputToKafka">
		<int-kafka:request-handler-advice-chain>
			<bean
				class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
		</int-kafka:request-handler-advice-chain>
	</int-kafka:outbound-channel-adapter>
	
	<bean id="fcmNotificationTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg>
			<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
				<constructor-arg>
					<map>
						<entry key="bootstrap.servers" value="localhost:9092" />
						<entry key="retries" value="0" />
						<entry key="batch.size" value="16384" />						
						<entry key="linger.ms" value="0" />
						<entry key="buffer.memory" value="33554432" />
						<entry key="key.serializer"
							value="org.apache.kafka.common.serialization.StringSerializer" />
						<entry key="value.serializer"
							value="common.serializer.FcmNotificationVoSerializer" />
					</map>
				</constructor-arg>
			</bean>
		</constructor-arg>
	</bean>

	<bean id="microsoftAccountSyncTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg>
			<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
				<constructor-arg>
					<map>
						<entry key="bootstrap.servers" value="localhost:9092" />
						<entry key="retries" value="0" />
						<entry key="batch.size" value="16384" />
						<entry key="buffer.memory" value="33554432" />	
						<entry key="linger.ms" value="0" />
						<entry key="key.serializer"
							value="org.apache.kafka.common.serialization.StringSerializer" />
						<entry key="value.serializer"
							value="common.serializer.MicrosoftAccountSyncRequestVoSerializer" />
					</map>
				</constructor-arg>
			</bean>
		</constructor-arg>
	</bean>

	<int-kafka:message-driven-channel-adapter
		id="kafka-message-channel-adapter-FCM" listener-container="fcmContainer"
		auto-startup="true" phase="100" send-timeout="5000"
		channel="ip-chanel-trigger-fcm-notification" mode="record"
		message-converter="messageConverter" />

	<int-kafka:message-driven-channel-adapter
		id="kafka-message-channel-adapter-SMA" listener-container="microsoftAccountSyncContainer"
		auto-startup="true" phase="100" send-timeout="5000"
		channel="ip-chanel-sync-microsoft-account" mode="record"
		message-converter="messageConverter" />
		
	<bean id="messageConverter"
		class="org.springframework.kafka.support.converter.MessagingMessageConverter" />

	<!-- Consumer Config -->
	<int:service-activator input-channel="ip-chanel-trigger-fcm-notification"
		ref="fcmNotificationConsumer">
	</int:service-activator>
	
	<int:service-activator input-channel="ip-chanel-sync-microsoft-account"
		ref="syncMicrosoftAccountConsumer">
	</int:service-activator>
	
	<bean id="fcmContainer"
		class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
		<constructor-arg>
			<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
				<constructor-arg>
					<map>
						<entry key="bootstrap.servers" value="localhost:9092" />
						<entry key="enable.auto.commit" value="true" />
						<entry key="auto.commit.interval.ms" value="100" />
						<entry key="session.timeout.ms" value="15000" />
						<entry key="group.id" value="trigger-fcm-notification" />
						<entry key="key.deserializer"
							value="org.apache.kafka.common.serialization.StringDeserializer" />
						<entry key="value.deserializer"
							value="common.deserializer.FcmNotificationVoDeserializer" />
					</map>
				</constructor-arg>
			</bean>
		</constructor-arg>
		<constructor-arg>
			<bean class="org.springframework.kafka.listener.config.ContainerProperties">
				<constructor-arg name="topics" value="trigger-fcm-notification" />
			</bean>
		</constructor-arg>
	</bean>
		
	<bean id="microsoftAccountSyncContainer"
		class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
		<constructor-arg>
			<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
				<constructor-arg>
					<map>
						<entry key="bootstrap.servers" value="localhost:9092" />
						<entry key="enable.auto.commit" value="true" />
						<entry key="auto.commit.interval.ms" value="100" />
						<entry key="session.timeout.ms" value="15000" />
						<entry key="group.id" value="sync-microsoft-account" />
						<entry key="key.deserializer"
							value="org.apache.kafka.common.serialization.StringDeserializer" />
						<entry key="value.deserializer"
							value="common.deserializer.MicrosoftAccountSyncRequestVoDeserializer" />
					</map>
				</constructor-arg>
			</bean>
		</constructor-arg>
		<constructor-arg>
			<bean class="org.springframework.kafka.listener.config.ContainerProperties">
				<constructor-arg name="topics" value="sync-microsoft-account" />
			</bean>
		</constructor-arg>
	</bean>

</beans>

分别发布 2 个主题时出现错误。堆栈轨迹如下

(java.lang.String,java.lang.String,java.lang.String,java.util.Locale,org.springframework.ui.Model,java.security.Principal)]: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
2016-10-20 18:12:53,849 [http-nio-8080-exec-4] DEBUG org.springframework.web.servlet.DispatcherServlet - Could not complete request
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$1.execute(AbstractRequestHandlerAdvice.java:75)
	at org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice.doInvoke(RequestHandlerCircuitBreakerAdvice.java:62)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
	at com.sun.proxy.$Proxy52.handleMessage(Unknown Source)
	at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236)
	at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
	at service.impl.AdminOperationsServiceImpl.publishToQueue(AdminOperationsServiceImpl.java:1191)
	at service.impl.AdminOperationsServiceImpl.update(AdminOperationsServiceImpl.java:1366)
	at service.TenantDocumentsController.update(TenantDocumentsController.java:277)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)

定义了两个独立的 Serializer 和 Deserializer 类。但是它如何在内部引用其他类? 我是否遗漏了任何配置?

最佳答案

既然你发给了Kafka,就没有Deserializer的主题。根据您的 StackTrace,您执行了一些 REST 服务,该服务将 GcmNotificationVo 对象发送到 inputToKafka

此时,第二个订阅者无法使用 common.serializer.MicrosoftAccountSyncRequestVoSerializer 执行该对象的 Kafka 序列化。

也许您的想法是使用 masOutboundChannelAdapter 进行不同的操作?因此,一个新的单独的 channel

关于java - 如何在 spring-integration-kafka 2.1.0.RELEASE 和 Kafka 0.10.0 中为不同的主题配置不同的生产者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40157795/

相关文章:

java - 使私有(private)字段在继承的子类中可见

java - 清除 ArrayList 是否保留其容量

java - 运行单元测试时奇怪的 java 外键约束失败

java - spring 集成中的运行时可配置动态路由

Spring 集成: stop a flow based on a given a condition

java - 如何将 Java 应用程序从 Tomcat 7/8/9 迁移到 Tomcat 10

java - Java中的String to Int - 可能是坏数据,需要避免异常

java - Spring - 是否有可能获取所有包,并在@ComponentScan 中注册?

DelegatingFilterProxy 上的 Spring 和 @Autowired

使用注释的 Spring Integration 和 Spring Batch [Spring-Batch-Integration]