java - How to implement a condition-based queue in Java using Apache ActiveMQ

Tags: java mysql activemq message-queue

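I have implemented a queue with the help of a MySQL table and a Java program. I would like to implement the following program with Apache ActiveMQ; any suggestions are much appreciated.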

Table name: XXXXX

id | Msg  | key_id | Status
---|------|--------|-------
 1 | Msg1 |   1    |   1
 2 | Msg2 |   2    |   1
 3 | Msg3 |   1    |   0
 4 | Msg4 |   1    |   0
 5 | Msg5 |   4    |   0
 6 | Msg6 |   3    |   0


    while (true) {
        try {
            Thread.sleep(5000);
            // Fetch only one record from table XXXX whose status is 0 and whose
            // key_id is not in the list. Note: the list starts with zero elements.
            // Add the key_id to the list.
            // Update the status of the fetched record to 1.
            // Process the fetched record ... some business logic.
            // Set status to 2 if the business logic works as expected,
            // or to 3 if an exception occurred.
        } catch (Exception ex) {
            // exceptions are ignored here
        }
    }
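
The selection rule in the loop above can be sketched in plain Java, with an in-memory list standing in for the table. The `Row` and `KeyedPicker` names are hypothetical, and the sketch assumes rows are scanned in `id` order, which the pseudocode does not specify. Like the pseudocode, it never removes a key_id from the list.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical in-memory stand-in for one row of the table.
class Row {
    final int id;
    final String msg;
    final int keyId;
    int status; // 0 = new, 1 = picked, 2 = succeeded, 3 = failed

    Row(int id, String msg, int keyId, int status) {
        this.id = id;
        this.msg = msg;
        this.keyId = keyId;
        this.status = status;
    }
}

class KeyedPicker {
    // Returns the first row with status 0 whose key_id has not been claimed yet,
    // records its key_id as claimed and marks the row as picked (status 1).
    static Optional<Row> pickNext(List<Row> table, Set<Integer> claimedKeys) {
        for (Row row : table) {
            if (row.status == 0 && !claimedKeys.contains(row.keyId)) {
                claimedKeys.add(row.keyId);
                row.status = 1;
                return Optional.of(row);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Sample data copied from the table in the question.
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, "Msg1", 1, 1));
        table.add(new Row(2, "Msg2", 2, 1));
        table.add(new Row(3, "Msg3", 1, 0));
        table.add(new Row(4, "Msg4", 1, 0));
        table.add(new Row(5, "Msg5", 4, 0));
        table.add(new Row(6, "Msg6", 3, 0));

        Set<Integer> claimed = new HashSet<>(); // starts empty, as in the question
        Optional<Row> next;
        while ((next = pickNext(table, claimed)).isPresent()) {
            System.out.println("picked id=" + next.get().id + " key_id=" + next.get().keyId);
        }
    }
}
```

With the sample rows this picks ids 3, 5, and 6. Row 4 stays pending because key_id 1 is already in the list, which is worth keeping in mind when moving the same rule to a message broker.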

How can I implement this with Apache ActiveMQ?
What I have tried so far, the sender:

    package com.jms.activemq.examples;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Sender {

        private ConnectionFactory factory = null;
        private Connection connection = null;
        private Session session = null;
        private Destination destination = null;
        private MessageProducer producer = null;

        public Sender() {
        }

        public void sendMessage() {
            try {
                factory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_BROKER_URL);
                connection = factory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                destination = session.createQueue("SAMPLEQUEUE");
                producer = session.createProducer(destination);
                TextMessage message = session.createTextMessage();

                while (true) {
                    try {
                        Thread.sleep(5000);
                        // Fetch only one record from table XXXX whose status is 0 and whose
                        // key_id is not in the list. Note: the list starts with zero elements.
                        // Add the key_id to the list.
                        // Update the status of the fetched record to 1.
                        message.setText("some data from Database");
                        producer.send(message);

                        System.out.println("Sent: " + message.getText());
                    } catch (Exception ex) {
                        // Report the failure instead of silently swallowing it.
                        ex.printStackTrace();
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            Sender sender = new Sender();
            sender.sendMessage();
        }
    }

And the receiver:

    package com.jms.activemq.examples;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Receiver {

        private ConnectionFactory factory = null;
        private Connection connection = null;
        private Session session = null;
        private Destination destination = null;
        private MessageConsumer consumer = null;

        public Receiver() {
        }

        public void receiveMessage() {
            try {
                factory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_BROKER_URL);
                // Create the connection once; a second createConnection() call
                // would leave the first connection open and unused.
                connection = factory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("SAMPLEQUEUE");
                consumer = session.createConsumer(destination);
                // Blocks until one message arrives; only a single message is received.
                Message message = consumer.receive();

                if (message instanceof TextMessage) {
                    TextMessage text = (TextMessage) message;
                    // Process the fetched record ... some business logic.
                    // Set status to 2 if the business logic works as expected,
                    // or to 3 if an exception occurred.
                    System.out.println("Message is : " + text.getText());
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            Receiver receiver = new Receiver();
            receiver.receiveMessage();
        }
    }

Any suggestions are much appreciated.

Best answer

If I understand your requirement correctly, you can use a simple JDBC connector, for example:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class SimpleJDBCConnector {
        static final String JDBC_DRIVER = "org.postgresql.Driver";
        static final String DB_URL = "jdbc:postgresql://localhost:5432/postgres";
        static final String USER = "postgres";
        static final String PASS = "admin";

        Connection conn = null;
        Statement stmt = null;
        ResultSet rs = null;

        // Runs a query on the open connection; call connect() first.
        public ResultSet executeQuery(String sql) throws SQLException {
            stmt = conn.createStatement();
            return stmt.executeQuery(sql);
        }

        public void closeConnection() throws SQLException {
            conn.close();
        }

        // Loads the driver and opens the connection.
        public boolean connect() throws ClassNotFoundException, SQLException {
            Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(DB_URL, USER, PASS);
            return true;
        }
    }

and the following SQL queries:

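    Set<Integer> set = new HashSet<Integer>();
    int status = 0;
    int id = 0;
    int key_id = 0;

    // "NOT IN ()" is not valid SQL, so add the key_id filter only when the set is non-empty.
    String excluded = set.toString().replaceAll("[\\]\\[\\ ]", "");
    String sql1 = "SELECT id,msg,key_id,status FROM xxxx WHERE status = " + status
            + (set.isEmpty() ? "" : " AND key_id NOT IN (" + excluded + ")")
            + " LIMIT 1;";

    String sql2 = "UPDATE xxxx SET status=" + status + " WHERE id=" + id;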
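
As an alternative to concatenating values into the query string, the fetch query can be built with `?` placeholders and run through a `java.sql.PreparedStatement`. The sketch below only builds the SQL text. The `FetchQueryBuilder` class and `buildFetchSql` method are hypothetical names, and the `ORDER BY id` clause is an assumption added so that the oldest pending row is returned first; the original query does not specify an order.

```java
import java.util.Collections;

class FetchQueryBuilder {
    // Builds the fetch query with one "?" for the status and one "?" per excluded key_id.
    // The NOT IN clause is left out entirely when there is nothing to exclude.
    static String buildFetchSql(String table, int excludedKeyCount) {
        StringBuilder sql = new StringBuilder("SELECT id, msg, key_id, status FROM ")
                .append(table)
                .append(" WHERE status = ?");
        if (excludedKeyCount > 0) {
            sql.append(" AND key_id NOT IN (")
               .append(String.join(", ", Collections.nCopies(excludedKeyCount, "?")))
               .append(")");
        }
        // ORDER BY id is an assumption, not part of the original query.
        return sql.append(" ORDER BY id LIMIT 1").toString();
    }

    public static void main(String[] args) {
        System.out.println(buildFetchSql("xxxx", 0));
        System.out.println(buildFetchSql("xxxx", 3));
    }
}
```

The resulting string would be passed to `Connection.prepareStatement`, with the status bound through `setInt(1, status)` and each excluded key_id bound to the following parameter indexes.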

To extract the data from the result, you can do the following:

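    // db is a connected SimpleJDBCConnector instance.
    ResultSet rs = db.executeQuery(sql1);
    // The query uses LIMIT 1, so this loop runs at most once.
    while (rs.next()) {
        id = rs.getInt("id");
        String msg = rs.getString("msg");
        key_id = rs.getInt("key_id");
        status = rs.getInt("status");
    }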
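
The question also asks for the status to become 2 when the business logic succeeds and 3 when it throws. A minimal sketch of that step follows, assuming the business logic is supplied as a `Consumer<String>`; the `StatusTracker` class and its method name are hypothetical. The returned value is what would then be written back with the UPDATE statement.

```java
import java.util.function.Consumer;

class StatusTracker {
    static final int STATUS_DONE = 2;   // business logic worked as expected
    static final int STATUS_FAILED = 3; // business logic threw an exception

    // Runs the business logic on one message and returns the status to store for its row.
    static int processAndGetStatus(String msg, Consumer<String> businessLogic) {
        try {
            businessLogic.accept(msg);
            return STATUS_DONE;
        } catch (RuntimeException ex) {
            System.err.println("Processing failed for '" + msg + "': " + ex.getMessage());
            return STATUS_FAILED;
        }
    }

    public static void main(String[] args) {
        int ok = processAndGetStatus("Msg3", m -> System.out.println("handled " + m));
        int failed = processAndGetStatus("Msg4", m -> {
            throw new IllegalStateException("simulated failure");
        });
        System.out.println("statuses: " + ok + ", " + failed);
    }
}
```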

Regarding java - How to implement a condition-based queue in Java using Apache ActiveMQ, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/28826412/
