java - 根据配置向不同的Kafka主题发送消息

标签 java apache-kafka

我想根据配置将数据发送到不同的 Kafka 消息:

ResponseFactory processingPeply = null;

        switch(endpointType)
        {
            case "email":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-email.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionEmailReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
            case "sms":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sms.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionSmsReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
            case "network":
                ProducerRecord<String, Object> record = new ProducerRecord<>("tp-network.request", tf);
                RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionNetworkReplyKafkaTemplate.sendAndReceive(record);
                SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);

                processingPeply = (ResponseFactory) consumerRecord.value();
              break;
              
            default:
                processingPeply = ResponseFactory.builder().status("error").build();
        } 
我目前得到:
  • 变量“记录”已在范围
  • 中定义
  • 变量“sendResult”已在范围
  • 中定义
  • 变量“consumerRecord”已在范围
  • 中定义

    您知道我如何以更好的方式重新设计代码以便解决问题吗?

    最佳答案

    在这里建议 4 种可能的方法,以避免核心代码中的一些开关块并尊重 DRY 之一。的原则,即避免重复代码。 (DRY 代表了一个比不重复代码更大的概念)。

    1- GeneralHandler 和端点类型子项
    这里有点像分层类的树,不同的端点是抽象/一般父亲的扩展。

                          [GeneralKafkaHandler] - core/common logic
                   _______________ | ________________
                  |                |                |
                  v                v                v
             {SmsHandler}    {EmailHandler}   {NetworkHandler}  -- specific params/methods
    
    例如,getTopic()getFuture()可能是 abstract在父亲身上,由每个 child 用自己的逻辑实现。另一种选择是制作 getKafkaTemplate()另一种抽象方法(在 getFuture()getKafkaTemplate() 之间选择)。这是层次结构的简化,从构造函数中检索主题。
    Abstract father
    abstract class GeneralKafkaHandler 
    {
       public abstract RequestReplyFuture<String, Object, Object> 
                       getFuture(ProducerRecord<>r);
       public abstract String getName();
    
       protected String topic;
       protected int id;
       ResponseFactory processingPeply = null;
    
       public GeneralKafkaHandler(String topic, int id) 
       {
           this.topic = topic; 
           this.id = id;
       }
    
       public void handle(Object tf) //the main/common logic is implemented here
       {
           ProducerRecord<String, Object> record = new ProducerRecord<>(topic, tf);
           RequestReplyFuture<String, Object, Object> rf = getFuture(record);  
           SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
           ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
           processingPeply = (ResponseFactory) consumerRecord.value();
       }
    
       //...
    }
    
    SmsHandler
    class SmsKafkaHandler extends GeneralKafkaHandler 
    {
       //Sms specific variables, methods,..
        
       public SmsKafkaHandler(String topic, int id) 
       {
          super(topic, id);
          //sms code
       }
    
       @Override
       public String getName() 
       {
          return "SMSHandler_"+topic+"_"+id);
       }
    
       @Override
       public RequestReplyFuture<String, Object, Object> getFuture(ProducerRecord<> r)
       {
          //sms code
          return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
       }
    
       //...
    }
    
    Main (只是一个例子)
    Map<String, GeneralKafkaHandler> handlerMap = new HashMap<>();
    handlerMap.put("sms", new SmsKafkaHandler("tp-sms.request",1));
    handlerMap.put("smsplus", new SmsKafkaHandler("tp-sms-plus.request",2));
    handlerMap.put("email", new EmailKafkaHandler("tp-email.request",1));
    //...
    
    handlerMap.get(endpointType.toLowerCase()).handle(tf);
    
    这里有不同的选择;比如sendAndReceive也是所有类型的通用方法,所以getFuture()只需一个 getTemplate() 即可更改方法。这里有很多选择。
    如果您需要/希望更多地管理每个端点,这种方法将是一个好主意;如果您认为不同的管理是值得的,或者将来会值得,您可以考虑;由于核心机制是相同的,不同的扩展可以让你快速实现不同的端点类型。

    2- 自定义实体
    本质上,端点类型只有 2 个不同的元素:
  • Topic
  • ReplyingKafkaTemplate

  • 您可以将它们包装成一个对象。例如:
    public class TopicEntity
    {
      public final String topic;
      public final ReplyingKafkaTemplate<String,Object,Object> template;
    
      public TopicEntity(String topic, ReplyingKafkaTemplate<String,Object,Object> template)
      {
         this.topic = topic;
         this.template = template;
      }    
    }
    
    那么你可以在不修改当前代码的情况下获得这个(这里我假设你的模板已经初始化):
    TopicEntity smsE = new TopicEntity("tp-sms.request",
                                       processingTransactionSmsReplyKafkaTemplate);
    TopicEntity mailE = new TopicEntity("tp-email.request",
                                       processingTransactionEmailReplyKafkaTemplate);
    
    Map<String, TopicEntity> handlerMap = new HashMap<>();
    handlerMap.put("sms", smsE);
    handlerMap.put("email",mailE);
    //...
    
    TopicEntity te = handlerMap.get(endpointType.toLowerCase()); 
    //Based on endpoint
    ProducerRecord<String, Object> record = new ProducerRecord<>(te.topic, tf);
    RequestReplyFuture<String, Object, Object> rf = te.template.sendAndReceive(record);
    //Common regardless of endpoint
    SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
    ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
    processingPeply = (ResponseFactory) consumerRecord.value();
    
    非常简单,也避免了重复代码;该实体还允许您为每个端点定义特定的特征。

    3- setter/getter 方法
    更简单的方法,只是为了让主代码看起来更干净。
    ProducerRecord<String, Object> record = new ProducerRecord<>(getTopic(endpointType),tf);
    RequestReplyFuture<String, Object, Object> replyFuture = getFuture(endpointType,record);
    /*rest of the code here (common regardless type)*/
    
    和 setter/getter :
    String getTopic(String e)
    {
       switch(e.toLowerCase())
       {
          case "email"  : return "tp-email.request"; 
          case "sms"    : return "tp-sms.request";
          case "network": return "tp-network.request";
          default : /*handle error*/ return null; 
                    /*kafka's response - "topic cannot be null");*/
        }
    }
    
    RequestReplyFuture<String, Object, Object> getFuture(String e, ProducerRecord<> r)
    {
      switch(e.toLowerCase())
      {
         case "email": 
              return processingTransactionEmailReplyKafkaTemplate.sendAndReceive(r);
         case "sms" :
               return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
         case "network": 
               return processingTransactionNetworkReplyKafkaTemplate.sendAndReceive(r);
         default : /*handle error*/ return null;
      }            /*this one should never be executed*/
    }
    

    4- 单 Volley
    好吧,也许这是一种更简单的方法……这将是方法 3 和方法 4 之间的斗争。
    ReplyingKafkaTemplate template;
    String topic;
    //...
    
    void setParameters(String e)
    {
      switch(e.toLowerCase())
      {
        case "email"  : 
              topic = "tp-email.request"; 
              template = processingTransactionEmailReplyKafkaTemplate;
              break;         
        case "sms"    :       
              topic = "tp-sms.request"; 
              template = processingTransactionSmsReplyKafkaTemplate;
              break;         
         //...
       }
    }
    //...
    
    setParameters(endpointType);
    
    ProducerRecord<String, Object> r= new ProducerRecord<>(topic,tf);
    RequestReplyFuture<String, Object, Object> replyFuture = template.sendAndReceive(r);
    SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
    ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
    processingPeply = (ResponseFactory) consumerRecord.value();
    

    1.a)- Spring 和 GeneralHandler
    剧透:我不知道 sh#!关于 Spring,所以这可能完全不正确。
    从我读过的here ,抽象类不需要任何注解,只需要 child 可能访问的字段@Autowired .
    abstract class GeneralKafkaHandler 
    {
       public abstract RequestReplyFuture<String, Object, Object> 
                       getFuture(ProducerRecord<>r);
       public abstract String getName();
    
       @Autowired
       protected String topic;
       @Autowired
       protected int id;
    
       ResponseFactory processingPeply = null;
    
       public GeneralKafkaHandler(String topic, int id) 
       {
           this.topic = topic; 
           this.id = id;
       }
    
       public void handle(Object tf) //the main/common logic is implemented here
       {
           ProducerRecord<String, Object> record = new ProducerRecord<>(topic, tf);
           RequestReplyFuture<String, Object, Object> rf = getFuture(record);  
           SendResult<String, Object> sr = rf.getSendFuture().get(10, TimeUnit.SECONDS);
           ConsumerRecord<String, Object> consumerRecord = rf.get(10,TimeUnit.SECONDS);
           processingPeply = (ResponseFactory) consumerRecord.value();
       }
    
       //...
    }
    
    而 children 应该有@Component注释,以及 @Autowired在构造函数中;我不太确定最后一个,因为我看到的示例还包括也在 child.h 中定义的字段。
    @Component
    class SmsKafkaHandler extends GeneralKafkaHandler 
    {
       //Sms specific variables, methods,..
        
       @Autowired  //not sure about this..
       public SmsKafkaHandler(String topic, int id) 
       {
          super(topic, id);
          //sms code
       }
    
       @Override
       public String getName() 
       {
          return "SMSHandler_"+topic+"_"+id);
       }
    
       @Override
       public RequestReplyFuture<String, Object, Object> getFuture(ProducerRecord<> r)
       {
          //sms code
          return processingTransactionSmsReplyKafkaTemplate.sendAndReceive(r);
       }
    
       //...
    }
    
    真的,我不知道我在说什么关于这个 Spring 解决方案,我什至不知道那些注释是什么,看电脑的狗的模因代表了此时的我。所以请慎重对待...

    DRY is for losers

    关于java - 根据配置向不同的Kafka主题发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66129391/

    相关文章:

    java - 如何取回 Kafka 生产者和消费者配置(Java API)?

    apache-kafka - kafka 消息的 Thrift 序列化 - 每个结构的单个主题

    apache-kafka - 为 Kafka 中的消息添加类型?

    java - Lucene Java 在 TextField 上排序 - 按字母顺序

    java - 如何修复我的音频? (优先)

    java - 使用 Spring Data 和 MongoDB 的单元/集成测试无法模拟存储库

    Java:如何编写指定父类(super class)和接口(interface)的强制转换?

    java - 列出当前组配置并更新配置

    java - Elasticsearch 中的精确匹配

    java - Kafka 主题丢失消息