wso2 - WS02 CEP Siddhi 查询

标签 wso2 complex-event-processing siddhi wso2-cep

Siddhi CEP 新手。除了 WS02 CEP 上的常规文档之外,有人可以指出一个很好的教程。

这是我们的要求。指出一些有关编写此类查询的正确方法的线索。

  • 拥有单个传感器设备通知流(IOT 应用程序)。
  • 流输入通过 REST-JSON 进行,输出也将格式化为 REST-JSON。 (希望这在 WS02 CEP 3.1 上是可能的)

所需的执行计划类型: - 如果设备通知报告传感器 1 的使用情况,则监视以查看 5 分钟内设备通知是否也报告传感器 2 的使用情况。如果找到,则生成输出流,以 REST-JSON 报告复合事件。
- 如果在早上、下午和晚上的某个时间段内未检测到此类复合事件,则在 REST-JSON 上生成警告事件流状态。 (那么如何找到没有及时发生的事件) - 如果在早上、下午和晚上的某些时间段内未找到此类复合事件,则在 REST-JSON 上报告 failure1-event-stream 状态。

这应该日复一日地工作,那么如何在 WSO2 CEP 中删除先前处理的数据。

问候, 阿米特

最佳答案

查询可以如下所示(这些是草稿查询,可能需要稍作修改才能运行)

  1. 要在 5 分钟内检测传感器 1,然后检测传感器 2(假设sensorStram 具有 ID、值),您只需使用如下模式并使用“within”关键字即可:

from e1=sensorStream[sensorId == '1'] -> e2=sensorStream[sensorId == '2']

select 'composite activity detected' as description, e1.value as sensor1Value, e2.value as sensor2Value

within 5 minutes

insert into compositeActivityStream;

  • 要检测未发生的情况(id=1 到达,但 5 分钟内没有 id=2),我们可以进行以下两个查询:
  • 
    from sensorStream[sensorId == '1']#window.time(5 minutes)
    
    select *
    
    insert into delayedSensor1Stream for expired-events;
    
    
    from e1=sensorStream[sensorId == '1'] -> nonOccurringEvent = sensorStream[sensorId == '2'] or delayedEvent=delayedSensor1Stream
    
    select 'id=2 not found' as description, e1.value as id1Value, nonOccurringEvent.sensorId as nonOccurringId
    
    having (not(nonOccurringId instanceof string))
    
    insert into nonOccurrenceStream;
    
    
    

    这将在 id=1 事件到达后 5 分钟结束时立即检测未发生的情况。 有关上述逻辑的解释,请查看 non occurrence sample of cep 4.0.0 (语法有点不同,但思路是一样的)

  • 现在,由于您需要定期生成报告,因此我们需要另一个查询。为了方便起见,我假设您每 6 小时(360 分钟)需要一份报告,并在此处使用时间批处理窗口。或者,使用新的 CEP 4.0.0,您可以使用“Cron 窗口”在特定时间生成此窗口,这更适合您的用例。
  • 
    from nonOccurrenceStream#window.timeBatch(360 minutes)
    
    select count(id1Value) as nonOccurrenceCount
    
    insert into nonOccurrenceReportsStream for expired-events;
    
    
    

    对于此用例,您可以使用 http 输入/输出适配器并使用 json 构建器和格式化程序进行 json 映射。

    关于wso2 - WS02 CEP Siddhi 查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31729990/

    相关文章:

    wso2 - 使用 ESB 和 DSS 集群的动态负载平衡,WSO2

    oauth-2.0 - WSO2 身份服务器 JWT 承载

    java - 对于 "object"属性类型,Siddhi 支持哪种对象?

    java - siddhi - 无法使用 siddhi 从rabbitmq 检索事件消息

    java - 西提 CEP : Aggregate functions with time window don't "remove" values from aggregation

    python - 无法安装 pysiddhi

    java - WSO2 身份服务器的 MEX 端点

    java - WSO2 DSS 对 mongodb 3.x.x 的支持

    Drools 6.1.0.Final CEP 示例 : Unable to create Field Extractor

    oracle - 从哪种 CEP 产品开始?