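So I'm evaluating Kafka. In our use case we need to create a new topic containing the "elapsed time" from one event to the next, because the sensors report "on" or "off" to Kafka. Given the timestamp, sensor name and state, we want a new topic with the duration of each "on" and "off" state.
My data looks like this: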
{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on}
and the result should be:
{ 2019:02:15 00:30:00, sensor1, off, 30sec }.
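Ultimately, the states of multiple sensors have to be combined to determine the overall state of a machine. There are hundreds, possibly thousands, of sensors in the factory.
Best answer
This is easy in Kafka Streams, so I'd go with option 2.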
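First, you have to model your input data properly. Your example uses local time, so you can't reliably compute the duration between two timestamps (for example across a daylight-saving change). Use something like epoch time.
Start with a source data model, such as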
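The question's timestamps (e.g. 2019:02:15 00:00:30) carry no zone or offset. A minimal sketch of turning them into absolute instants, assuming the sensors report wall-clock time in a zone you know (the zone is an assumption; the question doesn't state it). SensorTimestamps is a hypothetical helper; in practice it is simpler to have the producers send epoch milliseconds directly.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: converts the question's local timestamps to epoch-based instants. */
final class SensorTimestamps {
    // Pattern matching the question's format, e.g. "2019:02:15 00:00:30"
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy:MM:dd HH:mm:ss");

    private SensorTimestamps() {}

    /** Interprets a local timestamp in the given (assumed) zone and returns an absolute Instant. */
    static Instant toInstant(String localTimestamp, ZoneId zone) {
        return LocalDateTime.parse(localTimestamp, FORMAT).atZone(zone).toInstant();
    }

    /** Elapsed time between two local timestamps, computed on instants, not wall-clock values. */
    static Duration between(String from, String to, ZoneId zone) {
        return Duration.between(toInstant(from, zone), toInstant(to, zone));
    }
}
```

With zone Europe/Berlin, 2019:03:31 01:59:00 to 2019:03:31 03:01:00 looks like 62 minutes on the wall clock but is only 2 minutes of real time, because clocks jump forward at 02:00 that night. That is exactly the ambiguity epoch time avoids.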
import java.time.Instant;

interface SensorState {
    String getId();
    Instant getTime();
    State getState();

    enum State {
        OFF,
        ON
    }
}
and a target model:
import java.time.Duration;

interface SensorStateWithDuration {
    SensorState getEvent();
    Duration getDuration();
}
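The interfaces don't say how values are created, yet the transformer further down calls SensorStateWithDuration.builder().setEvent(...).setDuration(...).build(). The linked repository presumably generates such a builder; as a stand-in, here is a minimal hand-written sketch (Java 16+ assumed) with a hypothetical SensorReading record implementing SensorState. The interfaces are repeated only so the block compiles on its own.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

// Repeated from above so this sketch is self-contained.
interface SensorState {
    String getId();
    Instant getTime();
    State getState();
    enum State { OFF, ON }
}

/** Target type, extended (hypothetically) with the builder the transformer calls. */
interface SensorStateWithDuration {
    SensorState getEvent();
    Duration getDuration();

    static Builder builder() { return new Builder(); }

    /** Hand-written stand-in for a generated builder; method names mirror addDuration's calls. */
    final class Builder {
        private SensorState event;
        private Duration duration;

        public Builder setEvent(SensorState event) { this.event = event; return this; }
        public Builder setDuration(Duration duration) { this.duration = duration; return this; }

        /** Builds an immutable value; fails fast if a field was not set. */
        public SensorStateWithDuration build() {
            var e = Objects.requireNonNull(event, "event");
            var d = Objects.requireNonNull(duration, "duration");
            return new SensorStateWithDuration() {
                @Override public SensorState getEvent() { return e; }
                @Override public Duration getDuration() { return d; }
            };
        }
    }
}

/** Hypothetical immutable SensorState implementation. */
record SensorReading(String id, Instant time, SensorState.State state) implements SensorState {
    @Override public String getId() { return id; }
    @Override public Instant getTime() { return time; }
    @Override public State getState() { return state; }
}
```

A record keeps the state-store values immutable, which matters because the same object is read back from the store and wrapped in the output.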
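Now that you have defined your input and output streams (but see "Data Types and Serialization"), you just need to transform the values ("Applying processors and transformers") by defining a ValueTransformer. It has to do two things:
1. Check the state store for the sensor's previous state and update the store if there is none yet or the state has changed.
2. If a previous state exists, return it together with the duration it has lasted; otherwise return null.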
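Stripped of Kafka, the transformer's job is to look up the sensor's previous state and, if there is one, report how long it has lasted. A minimal plain-Java sketch of that logic, assuming a HashMap is an acceptable stand-in for the "SensorStates" store (it is not fault-tolerant or partitioned like a Kafka state store). DurationSketch, Reading and StateSpan are hypothetical names for this illustration only.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Kafka-free sketch of the transformer's two steps; a HashMap stands in for the state store. */
final class DurationSketch {
    record Reading(String sensorId, Instant time, boolean on) {}

    /** A previous reading and how long its state lasted until the current reading. */
    record StateSpan(Reading start, Duration duration) {}

    private final Map<String, Reading> lastStateBySensor = new HashMap<>();

    Optional<StateSpan> onReading(Reading reading) {
        Reading previous = lastStateBySensor.get(reading.sensorId());
        // Step 1: store the reading if it's the sensor's first one or its state changed
        if (previous == null || previous.on() != reading.on()) {
            lastStateBySensor.put(reading.sensorId(), reading);
        }
        // Step 2: if there was an earlier reading, report how long its state has lasted
        return Optional.ofNullable(previous)
                .map(p -> new StateSpan(p, Duration.between(p.time(), reading.time())));
    }
}
```

Feeding the question's two records (off at 00:00:00, on at 00:00:30) yields nothing for the first and a 30-second span for the off state on the second. As in the transformer, a repeated on does not overwrite the stored reading, so it reports the duration of the current state so far.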
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
    // Name of the state store; must match the name used when registering the store
    static final String SENSOR_STATES = "SensorStates";

    KeyValueStore<String, SensorState> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
    }

    @Override
    public SensorStateWithDuration transform(SensorState sensorState) {
        // Nothing to do
        if (sensorState == null) {
            return null;
        }
        // Check for the previous state, update if necessary
        var oldState = checkAndUpdateSensorState(sensorState);
        // When we have historical data, return duration so far. Otherwise return null
        return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
    }

    @Override
    public void close() {}

    /**
     * Checks the state store for historical state based on sensor ID and updates it, if necessary.
     *
     * @param sensorState The new sensor state
     * @return The old sensor state
     */
    Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
        // The Sensor ID is our index
        var index = sensorState.getId();
        // Get the historical state (might be null)
        var oldState = store.get(index);
        if (needToUpdate(oldState, sensorState)) {
            // Update the state store to the new state
            store.put(index, sensorState);
        }
        return Optional.ofNullable(oldState);
    }

    /**
     * Check if we need to update the state in the state store.
     *
     * <p>Either we have no historical data, or the state has changed.
     *
     * @param oldState The old sensor state
     * @param sensorState The new sensor state
     * @return Flag whether we need to update
     */
    boolean needToUpdate(SensorState oldState, SensorState sensorState) {
        return oldState == null || oldState.getState() != sensorState.getState();
    }

    /**
     * Wrap the old state with the duration of how long it lasted.
     *
     * @param oldState The state of the sensor so far
     * @param sensorState The new state of the sensor
     * @return Wrapped old state with duration
     */
    SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
        var duration = Duration.between(oldState.getTime(), sensorState.getTime());
        // Assumes SensorStateWithDuration provides a builder (e.g. generated by an annotation processor)
        return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
    }
}
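Put everything together ("Connecting Processors and State Stores") in a simple Topology:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

var builder = new StreamsBuilder();
// Our state store. inputSerde, storeSerde and resultSerde are Serdes for the custom
// types and must be defined elsewhere (see "Data Types and Serialization")
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("SensorStates"),
        Serdes.String(),
        storeSerde);
// Register the store builder
builder.addStateStore(storeBuilder);
// input-topic should be keyed by sensor ID, so all records of a sensor reach the same store
builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, "SensorStates")
    // The first record of each sensor yields null; don't forward it to the result topic
    .filter((key, value) -> value != null)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));
var topology = builder.build();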
The complete application is available at github.com/melsicon/kafka-sensors.
A similar Stack Overflow question, "Kafka time difference last two records, KSQL or other?" (apache-kafka), can be found at https://stackoverflow.com/questions/54713760/