spring-boot - Spring Cloud Messaging Source not sending messages to Kafka broker

Tags: spring-boot kotlin spring-kafka spring-cloud-stream

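I am following the book Spring Microservices in Action, with a slightly different setup from the one the author chose: I use Kotlin and Gradle rather than Java and Maven. Apart from that, I mostly follow the provided code.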

In the chapter on messaging I ran into a problem: I cannot publish messages using the Source class that I autowire into SimpleSourceBean.

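I know the general setup is fine, because the Kafka topic is created and I see the corresponding log messages when the application starts. I tried autowiring the Source explicitly in the class body as well as in the constructor, with no success in either case.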

Application class:

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(Source::class)
@EnableCircuitBreaker
class OrganizationServiceApplication {

    @Bean
    @LoadBalanced
    fun getRestTemplate(): RestTemplate {
        val restTemplate = RestTemplate()
        val interceptors = restTemplate.interceptors

        interceptors.add(UserContextInterceptor())
        restTemplate.interceptors = interceptors

        return restTemplate
    }
}

fun main(args: Array<String>) {
    runApplication<OrganizationServiceApplication>(*args)
}

Here is the SimpleSourceBean implementation:
@Component
class SimpleSourceBean {

    @Autowired
    lateinit var source: Source

    val logger = LoggerFactory.getLogger(this.javaClass)

    fun publishOrgChange(action: String, orgId: String) {
        logger.debug("Sending Kafka message $action for Organization $orgId on source ${source}")
        val change = OrganizationChangeModel(
                OrganizationChangeModel::class.java.typeName,
                action,
                orgId,
                UserContext.correlationId!!)

        logger.debug("change message: $change")

        source.output()
                .send(MessageBuilder
                        .withPayload(change)
                        .build())

        logger.debug("Sent Kafka message $action for Organization $orgId successfully")
    }
}

Here is the service class that uses SimpleSourceBean to send messages to Kafka:
@Component
class OrganizationService {

    @Autowired
    lateinit var organizationRepository: OrganizationRepository

    @Autowired
    lateinit var simpleSourceBean: SimpleSourceBean

    val logger = LoggerFactory.getLogger(OrganizationService::class.java)

    // some omissions for brevity

    @HystrixCommand(
            fallbackMethod = "fallbackUpdate",
            commandKey = "updateOrganizationCommandKey",
            threadPoolKey = "updateOrganizationThreadPool")
    fun updateOrganization(organizationId: String, organization: Organization): Organization {
        val updatedOrg = organizationRepository.save(organization)
        simpleSourceBean.publishOrgChange("UPDATE", organizationId)
        return updatedOrg
    }

    private fun fallbackUpdate(organizationId: String, organization: Organization) =
            Organization(id = "000-000-00", name = "update not saved", contactEmail = "", contactName = "", contactPhone = "")

    @HystrixCommand
    fun saveOrganization(organization: Organization): Organization {
        val orgToSave = organization.copy(id = UUID.randomUUID().toString())
        val savedOrg = organizationRepository.save(orgToSave)
        simpleSourceBean.publishOrgChange("SAVE", savedOrg.id)
        return savedOrg
    }
}

Log messages:
organizationservice_1           | 2019-08-23 23:15:33.939 DEBUG 18 --- [ionThreadPool-2] S.O.events.source.SimpleSourceBean       : Sending Kafka message UPDATE for Organization e254f8c-c442-4ebe-a82a-e2fc1d1ff78a on source null
organizationservice_1           | 2019-08-23 23:15:33.940 DEBUG 18 --- [ionThreadPool-2] S.O.events.source.SimpleSourceBean       : change message: OrganizationChangeModel(type=SpringMicroservicesInAction.OrganizationService.events.source.OrganizationChangeModel, action=UPDATE, organizationId=e254f8c-c442-4ebe-a82a-e2fc1d1ff78a, correlationId=c84d288f-bfd6-4217-9026-8a45eab058e1)
organizationservice_1           | 2019-08-23 23:15:33.941 DEBUG 18 --- [ionThreadPool-2] o.s.c.s.m.DirectWithAttributesChannel    : preSend on channel 'output', message: GenericMessage [payload=OrganizationChangeModel(type=SpringMicroservicesInAction.OrganizationService.events.source.OrganizationChangeModel, action=UPDATE, organizationId=e254f8c-c442-4ebe-a82a-e2fc1d1ff78a, correlationId=c84d288f-bfd6-4217-9026-8a45eab058e1), headers={id=05799740-f8cf-85f8-54f8-74fce2679909, timestamp=1566602133941}]
organizationservice_1           | 2019-08-23 23:15:33.945 DEBUG 18 --- [ionThreadPool-2] tractMessageChannelBinder$SendingHandler : org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@38675bb5 received message: GenericMessage [payload=byte[224], headers={contentType=application/json, id=64e1e8f1-45f4-b5e6-91d7-c2df28b3d6cc, timestamp=1566602133943}]
organizationservice_1           | 2019-08-23 23:15:33.946 DEBUG 18 --- [ionThreadPool-2] nder$ProducerConfigurationMessageHandler : org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@763a88a received message: GenericMessage [payload=byte[224], headers={contentType=application/json, id=7be5d188-5309-cba9-8297-74431c410152, timestamp=1566602133945}]

No further messages are logged. In particular, the final DEBUG log statement in SimpleSourceBean never appears.

Checking inside the Kafka container for messages on the 'orgChangeTopic' topic shows that it is empty:
root@99442804288f:/opt/kafka_2.11-0.10.1.0/bin# ./kafka-console-consumer.sh --from-beginning --topic orgChangeTopic --bootstrap-server 0.0.0.0:9092
Processed a total of 0 messages

Any pointers to where my problem lies would be greatly appreciated.

Edit:

Adding application.yml:
spring:
  cloud:
    stream:
      bindings:
        output:
          destination:  orgChangeTopic
          content-type: application/json
      kafka:
        binder:
          zkNodes: "http://kafkaserver:2181"
          brokers: "http://kafkaserver:9092"

# omitting some irrelevant config

logging:
  level:
    org.apache.kafka: DEBUG
    org.springframework.cloud: DEBUG
    org.springframework.web: WARN
    springmicroservicesinaction.organizationservice: DEBUG

Excerpt from the build.gradle file with the relevant dependencies:
dependencies {
    // kotlin, spring boot, etc
    implementation("org.springframework.cloud:spring-cloud-stream:2.2.0.RELEASE")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:2.2.0.RELEASE")
}

Best answer

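You need to show your application properties as well. Your Kafka version is very old; 0.10.x.x does not support headers. Which version of spring-cloud-stream are you using? Modern versions require a Kafka broker that supports headers (0.11 or later; the current release is 2.3), unless you set headerMode to none.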

That said, I would expect to see an error message if we try to send headers to a version that does not support them.
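For reference, the headerMode setting mentioned above is a per-binding producer property. The following is a minimal sketch, assuming the `output` binding and the destination from the question's application.yml. Whether it is enough to get messages through to a 0.10 broker is not confirmed here.

```yaml
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orgChangeTopic
          content-type: application/json
          producer:
            # Assumption: disables sending message headers, for brokers older than 0.11
            headerMode: none
```

With headerMode set to none, headers such as contentType are not expected to reach the consumer, so the consuming side has to know the payload format itself.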

implementation("org.springframework.cloud:spring-cloud-stream:2.2.0.RELEASE")

Also note that with modern versions you no longer need
zkNodes: "http://kafkaserver:2181"

The kafka-clients version used by 2.2.0 supports topic provisioning directly through the Kafka broker, so we no longer need to connect to ZooKeeper.
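As a sketch of that advice, the binder section of the question's application.yml with the zkNodes entry dropped might look like the following. The brokers value is copied unchanged from the question; only the zkNodes line is removed.

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          # zkNodes removed: per the answer, 2.2.0 provisions topics through the broker
          brokers: "http://kafkaserver:9092"
```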

Regarding spring-boot - Spring Cloud Messaging Source not sending messages to Kafka broker, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57634256/
