wso2 - WS02 CEP Siddhi 查询

标签 wso2 complex-event-processing siddhi wso2-cep

Siddhi CEP 新手。除了 WS02 CEP 上的常规文档之外,有人可以指出一个很好的教程。

这是我们的要求。指出一些有关编写此类查询的正确方法的线索。

  • 拥有单个传感器设备通知流(IOT 应用程序)。
  • 流输入通过 REST-JSON 进行,输出也将格式化为 REST-JSON。 (希望这在 WS02 CEP 3.1 上是可能的)

所需的执行计划类型: - 如果设备通知报告传感器 1 的使用情况,则监视以查看 5 分钟内设备通知是否也报告传感器 2 的使用情况。如果找到,则生成输出流,以 REST-JSON 报告复合事件。
- 如果在早上、下午和晚上的某个时间段内未检测到此类复合事件,则在 REST-JSON 上生成警告事件流状态。 (那么如何找到没有及时发生的事件) - 如果在早上、下午和晚上的某些时间段内未找到此类复合事件,则在 REST-JSON 上报告 failure1-event-stream 状态。

这应该日复一日地工作,那么如何在 WSO2 CEP 中删除先前处理的数据。

问候, 阿米特

最佳答案

查询可以如下所示(这些是草稿查询,可能需要稍作修改才能运行)

  1. 要在 5 分钟内检测传感器 1,然后检测传感器 2(假设sensorStram 具有 ID、值),您只需使用如下模式并使用“within”关键字即可:

from e1=sensorStream[sensorId == '1'] -> e2=sensorStream[sensorId == '2']

select 'composite activity detected' as description, e1.value as sensor1Value, e2.value as sensor2Value

within 5 minutes

insert into compositeActivityStream;

  • 要检测未发生的情况(id=1 到达,但 5 分钟内没有 id=2),我们可以进行以下两个查询:
  • 
    from sensorStream[sensorId == '1']#window.time(5 minutes)
    
    select *
    
    insert into delayedSensor1Stream for expired-events;
    
    
    from e1=sensorStream[sensorId == '1'] -> nonOccurringEvent = sensorStream[sensorId == '2'] or delayedEvent=delayedSensor1Stream
    
    select 'id=2 not found' as description, e1.value as id1Value, nonOccurringEvent.sensorId as nonOccurringId
    
    having (not(nonOccurringId instanceof string))
    
    insert into nonOccurrenceStream;
    
    
    

    这将在 id=1 事件到达后 5 分钟结束时立即检测未发生的情况。 有关上述逻辑的解释,请查看 non occurrence sample of cep 4.0.0 (语法有点不同,但思路是一样的)

  • 现在,由于您需要定期生成报告,因此我们需要另一个查询。为了方便起见,我假设您每 6 小时(360 分钟)需要一份报告,并在此处使用时间批处理窗口。或者,使用新的 CEP 4.0.0,您可以使用“Cron 窗口”在特定时间生成此窗口,这更适合您的用例。
  • 
    from nonOccurrenceStream#window.timeBatch(360 minutes)
    
    select count(id1Value) as nonOccurrenceCount
    
    insert into nonOccurrenceReportsStream for expired-events;
    
    
    

    对于此用例,您可以使用 http 输入/输出适配器并使用 json 构建器和格式化程序进行 json 映射。

    关于wso2 - WS02 CEP Siddhi 查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31729990/

    相关文章:

    wso2 - 用 Siddhi 编写序列查询

    wso2 - 如何从 BPS 上的 BPEL 流程调用在 ESB 上运行的 REST 代理服务

    apache - 已启用 Commons Fileupload 安全管理器

    fiware - 【使用FiWARE CEP】Kafka broker可以代替Orion Context Broker吗?

    java - 超能力规则语言: fire rule only after condition is fulfilled for a certain time period

    wso2 - 在wso2流处理器中如何动态更改查询

    csv - 在 WSO2 Integration Studio 中读取 .txt 文件

    wso2 - 从迭代器调解器 WSO2 ESB 获取计数器

    java - 监视一系列事件一段时间,然后做出决定