apache-kafka - Kafka time difference between the last two records, KSQL or something else?

Tags: apache-kafka, apache-kafka-streams, ksqldb

So I am evaluating Kafka. In our use case, new topics would have to be created holding the "time elapsed" from one event to the next, essentially because a sensor reports to Kafka that it is "on" or "off". So, given a timestamp, a sensor name, and a state, create new topics holding the durations of the "on" and "off" states.

  • Is this doable in KSQL, and how?
  • Or should consumers or stream processors really figure this out on the fly?

  • My data looks like this:
    { 2019:02:15 00:00:00, sensor1, off}
    { 2019:02:15 00:00:30, sensor1, on} 
    

    and the result I want is
    { 2019:02:15 00:30:00, sensor1, off, 30sec }. 
    

    Essentially, the states of multiple sensors have to be combined to determine the combined state of a machine. There are hundreds, if not thousands, of sensors in a factory.

Best Answer

This is easily possible in Kafka Streams, so I would go with option 2.

First, you have to model the input data correctly. Your example uses local time, which makes it impossible to calculate the duration between two timestamps. Use something like epoch time instead.
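
For example, the wall-clock strings from the question could be converted to an Instant before they reach Kafka. This is only a sketch under the assumption that the sensors report UTC; the class name and formatter pattern are mine, not part of the answer:

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    class SensorTimestamps {
      // Pattern of the sample records; assumes the sensors report UTC wall-clock time
      private static final DateTimeFormatter FORMAT =
          DateTimeFormatter.ofPattern("yyyy:MM:dd HH:mm:ss");

      // Convert the local-time string to an absolute, epoch-based Instant
      static Instant parse(String raw) {
        return LocalDateTime.parse(raw, FORMAT).toInstant(ZoneOffset.UTC);
      }
    }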

Start with a source data model, for example

    interface SensorState {
      String getId();
      Instant getTime();
      State getState();
      enum State {
        OFF,
        ON
      }
    }
    

and a target

    interface SensorStateWithDuration {
      SensorState getEvent();
      Duration getDuration();
    }
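
For a quick experiment, the source interface could be backed by a plain Java record. The sketch below is mine and only illustrates the shape; the project linked at the end ships its own value classes, including the builder that the transformer further down relies on:

    import java.time.Instant;

    // Hand-written value type for illustration only
    record DefaultSensorState(String id, Instant time, SensorState.State state)
        implements SensorState {
      public String getId() { return id; }
      public Instant getTime() { return time; }
      public SensorState.State getState() { return state; }
    }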
    

Now that the input and output streams are defined (but see "Data Types and Serialization"), the values simply have to be transformed (see "Applying processors and transformers") with a ValueTransformer.

It has to do two things:
  • Check the state store for the sensor's historical data and update it with the new data if necessary
  • When historical data is available, calculate the difference between the timestamps and emit the data together with the calculated duration

    class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
      // Name of the state store; also referenced when wiring up the topology below
      static final String SENSOR_STATES = "SensorStates";

      KeyValueStore<String, SensorState> store;

      @SuppressWarnings("unchecked")
      public void init(ProcessorContext context) {
        this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
      }
      }
    
      public SensorStateWithDuration transform(SensorState sensorState) {
        // Nothing to do
        if (sensorState == null) {
          return null;
        }
    
        // Check for the previous state, update if necessary
        var oldState = checkAndUpdateSensorState(sensorState);
    
        // When we have historical data, return duration so far. Otherwise return null
        return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
      }
    
      public void close() {}
    
      /**
       * Checks the state store for historical state based on sensor ID and updates it, if necessary.
       *
       * @param sensorState The new sensor state
       * @return The old sensor state
       */
      Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
        // The Sensor ID is our index
        var index = sensorState.getId();
    
        // Get the historical state (might be null)
        var oldState = store.get(index);
        if (needToUpdate(oldState, sensorState)) {
          // Update the state store to the new state
          store.put(index, sensorState);
        }
        return Optional.ofNullable(oldState);
      }
    
      /**
       * Check if we need to update the state in the state store.
       *
       * <p>Either we have no historical data, or the state has changed.
       *
       * @param oldState The old sensor state
       * @param sensorState The new sensor state
       * @return Flag whether we need to update
       */
      boolean needToUpdate(SensorState oldState, SensorState sensorState) {
        return oldState == null || oldState.getState() != sensorState.getState();
      }
    
      /**
       * Wrap the old state with a duration for how long it lasted.
       *
       * @param oldState The state of the sensor so far
       * @param sensorState The new state of the sensor
       * @return Wrapped old state with duration
       */
      SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
        var duration = Duration.between(oldState.getTime(), sensorState.getTime());
        return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
      }
    }
    

Putting it all together ("Connecting Processors and State Stores") in a simple Topology:

    var builder = new StreamsBuilder();
    
    // Our state store
    var storeBuilder =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("SensorStates"),
            Serdes.String(),
            storeSerde);
    
    // Register the store builder
    builder.addStateStore(storeBuilder);
    
    builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
        .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
        .to("result-topic", Produced.with(Serdes.String(), resultSerde));
    
    var topology = builder.build();
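
To actually run it, the Topology still has to be handed to a KafkaStreams instance; inputSerde, resultSerde, and storeSerde above are serdes for the data model and have to be provided separately (see "Data Types and Serialization"). A minimal sketch using java.util.Properties and org.apache.kafka.streams.KafkaStreams/StreamsConfig, assuming a local broker and an illustrative application id:

    var props = new Properties();
    // Application id and broker address are placeholders for this sketch
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sensor-durations");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    var streams = new KafkaStreams(topology, props);
    // Close the streams application cleanly on JVM shutdown, then start processing
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    streams.start();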
    

The complete application can be found at github.com/melsicon/kafka-sensors .

This question and answer, apache-kafka - Kafka time difference between the last two records, KSQL or something else?, originate from Stack Overflow: https://stackoverflow.com/questions/54713760/
