spring - Error handling in Spring Cloud Stream - Kafka Binder

Tags: spring error-handling spring-cloud-stream

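I am trying to implement error handling for Spring Cloud Stream (SCS) with the Kafka binder, and I am currently stuck on getting failed messages into an error topic.

1) Is there anything specific that has to be declared for errors in the yml file, such as a group or a content type?

2) How do I retry a message once it has flowed into the Kafka topic?

Thanks.

Details follow:

1) The producer generates a JSON message every few seconds: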

@EnableBinding(Source.class)
public class LoggingProducer {

    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "7000", maxMessagesPerPoll = "1"))
    public LoggingObject pumpSource() {

        LoggingObject loggingObject = new LoggingObject();

        String loggingNumber = UUID.randomUUID().toString().toUpperCase().replaceAll("-", "");

        System.out.println(loggingNumber);

        loggingObject.setLoggingId(loggingNumber);
        Random rand = new Random();
        int randint = rand.nextInt(100000);

        if (randint % 3 == 0) {
            loggingObject.setLoggingMessageStatus("SENT");
        }
        else if (randint % 4 == 0) {
            loggingObject.setLoggingMessageStatus("REVIEW");
        }
        else {
            loggingObject.setLoggingMessageStatus("ERROR");
        }

        System.out.println(loggingObject.toString());

        return loggingObject;
    }
}

2) Producer application.yml:
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: Processortopic
          group: myGroup
          producer:
            header-mode: embeddedHeaders
          content-type: application/json

3) Consumer application:
@EnableBinding(Sink.class)
@Configuration
public class LoggingObjectProcessor {


    @StreamListener(Sink.INPUT) // destination name 'input.myGroup'
    public void handle(LoggingObject loggingObject) {
        System.out.println("In the Consumer---->>>>><<<<<<");
        throw new RuntimeException("BOOM!");
    }

    /*@ServiceActivator(inputChannel = "Sourcetopic.myGroup.errors")
    public void error(Message<?> message) {
        System.out.println("Handling ERROR: " + message);
    }*/

    @StreamListener("errorChannel")
    public void errorGlobal(Message<?> message) {
        System.out.println("Handling ERROR: " + message);
    }


}

4) Consumer application.yml:
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: Processortopic
          group: myGroup
          consumer:
            header-mode: embeddedHeaders
          content-type: application/json
        error:
          destination: myErrors
          content-type: application/json

5) LoggingObject POJO:
public class LoggingObject {

    private String loggingId;
    private String loggingMessageStatus;

    public LoggingObject() {
    }

    public String getLoggingId() {
        return loggingId;
    }

    public void setLoggingId(String loggingId) {
        this.loggingId = loggingId;
    }

    public String getLoggingMessageStatus() {
        return loggingMessageStatus;
    }

    public void setLoggingMessageStatus(String loggingMessageStatus) {
        this.loggingMessageStatus = loggingMessageStatus;
    }
}

6) The POM:
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

Updated consumer application and logs:
@ServiceActivator(inputChannel = "Processortopic.myGroup.errors")
public void error(Message<?> message) {
    System.out.println("Handling ERROR: " + message);
}

Logs:
2018-05-23 10:21:36.178  INFO 76939 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
Handling ERROR: ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.LoggingObjectProcessor#handle[1 args]; nested exception is java.lang.RuntimeException: BOOM!, failedMessage=GenericMessage [payload=byte[191], headers={kafka_offset=4721, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@29a9cb9d, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, id=feda8595-5ef6-35dd-b43f-4940a90017ba, kafka_receivedPartitionId=0, contentType=application/json;charset=UTF-8, kafka_receivedTopic=Processortopic, kafka_receivedTimestamp=1526991171269, timestamp=1527088896173}], headers={kafka_data=ConsumerRecord(topic = Processortopic, partition = 0, offset = 4721, CreateTime = 1526991171269, serialized key size = -1, serialized value size = 277, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@538f1f04), id=37e72560-5e63-db90-27f3-2ff2e04e1778, timestamp=1527088896174}] for original GenericMessage [payload=byte[277], headers={kafka_offset=4721, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@29a9cb9d, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=Processortopic, kafka_receivedTimestamp=1526991171269}]
In the Consumer---->>>>><<<<<<
In the Consumer---->>>>><<<<<<
In the Consumer---->>>>><<<<<<
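
Note on the log: the `deliveryAttempt=3` header and the three `In the Consumer` lines are consistent with the binder's default of three delivery attempts per record (consumer property `maxAttempts`) before the failure is handed to the error channel. The following is a plain-Java sketch of that behaviour only, with no Spring involved; `RetrySketch` and `deliver` are hypothetical names made up for illustration and are not part of any Spring API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetrySketch {

    /**
     * Hypothetical stand-in for a retrying message dispatcher.
     * Calls the handler up to maxAttempts times; if every attempt throws,
     * the last exception is passed to the error handler.
     * Returns the number of attempts that were made.
     */
    public static <T> int deliver(T payload,
                                  int maxAttempts,
                                  Consumer<T> handler,
                                  Consumer<RuntimeException> errorHandler) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(payload);
                return attempt;          // success: no further retries
            } catch (RuntimeException e) {
                last = e;                // remember the failure and try again
            }
        }
        errorHandler.accept(last);       // retries exhausted
        return maxAttempts;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        deliver("payload", 3,
                p -> { log.add("In the Consumer---->>>>><<<<<<"); throw new RuntimeException("BOOM!"); },
                e -> log.add("Handling ERROR: " + e.getMessage()));
        log.forEach(System.out::println);
    }
}
```

Running `main` prints the consumer line three times followed by `Handling ERROR: BOOM!`, which mirrors one failed record in the setup above.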

Best answer

One problem I see right away is that you are using @StreamListener for the global "errorChannel". That will not work. As the documentation states:
The use of @StreamListener annotation is intended specifically to define bindings that bridge internal channels and external destinations. Given that the destination specific error channel does NOT have an associated external destination, such channel is a prerogative of Spring Integration (SI). This means that the handler for such destination must be defined using one of the SI handler annotations (i.e., @ServiceActivator, @Transformer etc.).
Please fix that and let us know.
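
On the second question (retry and the error topic): the consumer binding retries a failing handler on its own, and the Kafka binder can publish a record to a dead-letter topic once the attempts are exhausted. The fragment below is a sketch of the consumer yml under those assumptions and has not been tested against this project. The back-off values shown are the documented defaults, and reusing `myErrors` as the dead-letter topic name is an assumption taken from the question.

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: Processortopic
          group: myGroup                     # a group is required when the DLQ is enabled
          content-type: application/json
          consumer:
            max-attempts: 3                  # total delivery attempts per record (default 3)
            back-off-initial-interval: 1000  # ms to wait before the first retry
            back-off-multiplier: 2.0         # each further wait is multiplied by this
            back-off-max-interval: 10000     # upper bound for the wait, in ms
      kafka:
        bindings:
          input:
            consumer:
              enable-dlq: true               # publish exhausted records to a dead-letter topic
              dlq-name: myErrors             # assumption: topic name reused from the question
```

Setting `max-attempts` to 1 turns retry off. If `dlq-name` is left out, the binder falls back to a topic named `error.<destination>.<group>`.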

Regarding spring - Error handling in Spring Cloud Stream - Kafka Binder, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50478744/
