java - siddhi - Unable to retrieve event messages from RabbitMQ using Siddhi

Tags: java rabbitmq complex-event-processing siddhi wso2-streaming-integrator

public static void main(String[] args) {
    String siddhiApp = "@App:name('TestExecutionPlan') " 
            + "define stream FooStream (test string);  "
            + "@info(name = 'query1')  " 
            + "@source(type ='rabbitmq', "
            + "uri = 'amqp://test:test@192.168.99.100:5672', " 
            + "exchange.name = 'amq.topic', "
            + "exchange.type = 'topic', "
            + "routing.key= '#', "
            + "queue.name = 'siddhi-queue', "
            + "@map(type='text')) " 
            + "Define stream BarStream (test string); "
            + "from FooStream select test insert into BarStream; ";

    SiddhiManager siddhiManager = new SiddhiManager();

    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

    siddhiAppRuntime.start();

    siddhiAppRuntime.addCallback("FooStream", new StreamCallback() {
        public void receive(Event[] event) {
            EventPrinter.print(event);
        }
    });

}

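This code does not receive any event messages from RabbitMQ.

I can see the connection and the channel in the RabbitMQ dashboard, and every message published to the exchange is delivered to the other queues bound to it.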

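Best answer

You need to register the callback on the stream that carries the source annotation, "BarStream". You can try the following example.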

@Test
public void rabbitmqSourceTest() throws InterruptedException {
    AtomicInteger eventCount = new AtomicInteger(0);
    // Consumer app: the RabbitMQ source is attached to BarStream,
    // so messages read from the queue arrive on BarStream.
    String siddhiApp = "@App:name('TestExecutionPlan') "
            + "define stream FooStream (test string); "
            + "@info(name = 'query1')  "
            + "@source(type ='rabbitmq', "
            + "uri = 'amqp://guest:guest@172.17.0.2:5672', "
            + "exchange.name = 'amq.topic', "
            + "exchange.type = 'topic', "
            + "routing.key= '#',"
            + "queue.name = 'siddhi-queue', "
            + "@map(type='text')) "
            + "define stream BarStream (test string); ";
    SiddhiManager siddhiManager = new SiddhiManager();
    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
    // Register the callback on BarStream (the stream with the @source annotation).
    siddhiAppRuntime.addCallback("BarStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            for (Event event : events) {
                EventPrinter.print(event);
                eventCount.incrementAndGet();
            }
        }
    });
    siddhiAppRuntime.start();
    // Publisher app: events sent to FooStream are forwarded to BarStream,
    // whose RabbitMQ sink publishes them to the exchange.
    SiddhiAppRuntime executionPlanRuntime = siddhiManager.createSiddhiAppRuntime(
            "@App:name('TestExecutionPlan') " +
                    "define stream FooStream (test string); " +
                    "@info(name = 'query1') " +
                    "@sink(type ='rabbitmq', uri = 'amqp://guest:guest@172.17.0.2:5672', " +
                    "exchange.type='topic', " +
                    "exchange.name = 'amq.topic', " +
                    "@map(type='text'))" +
                    "Define stream BarStream (test string);" +
                    "from FooStream select test insert into BarStream;");
    InputHandler fooStream = executionPlanRuntime.getInputHandler("FooStream");
    executionPlanRuntime.start();
    List<Event> arrayList = new ArrayList<Event>();
    arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"WSO2"}));
    arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"IBM"}));
    arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"WSO2"}));
    fooStream.send(arrayList.toArray(new Event[3]));
    // waitTime and timeout are fields of the enclosing test class (not shown here).
    SiddhiTestHelper.waitForEvents(waitTime, 3, eventCount, timeout);
    executionPlanRuntime.shutdown();
    siddhiAppRuntime.shutdown();
}

See the Siddhi Query Guide: Siddhi-source
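
To make the point of the answer concrete: an annotation such as @source applies to the stream definition that follows it, so in the question's app the RabbitMQ messages are delivered to BarStream, while the callback listens on FooStream and therefore never fires. The sketch below is plain Java with no Siddhi dependency. It only assembles the app text and keeps the source stream's name in one constant, so the definition and the callback cannot drift apart. The class name RabbitSourceApp, the build method and the localhost URI are made up for illustration and are not part of Siddhi or of the original post.

```java
// Hypothetical helper (not a Siddhi API): it only builds the SiddhiQL text.
final class RabbitSourceApp {

    // Name of the stream that carries the @source annotation.
    // Any callback that should see RabbitMQ messages must use this name.
    static final String SOURCE_STREAM = "BarStream";

    // Builds an app whose RabbitMQ source sits directly above SOURCE_STREAM.
    static String build(String uri, String exchange, String queue) {
        return String.join("\n",
                "@App:name('TestExecutionPlan')",
                "@source(type='rabbitmq',",
                "        uri='" + uri + "',",
                "        exchange.name='" + exchange + "',",
                "        exchange.type='topic',",
                "        routing.key='#',",
                "        queue.name='" + queue + "',",
                "        @map(type='text'))",
                "define stream " + SOURCE_STREAM + " (test string);");
    }

    public static void main(String[] args) {
        // The broker URI is an assumption; replace it with your own.
        String app = build("amqp://guest:guest@localhost:5672", "amq.topic", "siddhi-queue");
        System.out.println(app);
        // With Siddhi on the classpath, the callback would be registered on the
        // same name, as in the answer:
        //   siddhiAppRuntime.addCallback(RabbitSourceApp.SOURCE_STREAM, callback);
    }
}
```

Applied to the question's main method, the smallest change along these lines would be to pass "BarStream" instead of "FooStream" to addCallback, and to register the callback before calling start(), as the answer's test does.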

Regarding java - siddhi - Unable to retrieve event messages from RabbitMQ using Siddhi, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50806867/
