Python。如何从队列/主题 ActiveMQ 中删除任何消息

标签 python python-2.7 queue activemq message-queue

ActiveMQ 我用来在主题 (topic) 和队列 (queue) 中发送消息。

我有两个问题:

  1. 如何删除(取消操作)发送到队列或队列的消息 话题。
  2. 如何完全删除清除所有队列/主题。

使用通过 STOMP 协议(protocol)组织的 AMQ stompy库,但是没有合适的functions

告诉我应该使用哪些库或解决方案本身。

非常感谢。

最佳答案

如何删除我知道的消息,但只是理论上的(通过 WireShark 分析包流量的结果,AMQ 管理页面 ActiveMQ 中的 localhost:8161\admin 页面中的工作浏览器),并且我无法以编程方式删除消息(Python)。

理论上我可以在AMQ中使用参数调用(我在删除时发送到\admin AMQ的packege中看到这个)deleteMessage()[id,secret], 哪里

  • id - 队列\主题中消息的唯一名称
  • secret - 唯一的数字(可能是一些“ token ”),每次更新(例如 F5)\admin\browse 页面时都会发生变化。我不能说它是什么......

请参阅此处的图片:https://ru.stackoverflow.com/q/618697/228254在我的回答下面的帖子中。

示例测试队列:

  • id: ID:######NAME_SERVER######-44458-1485427798954-6:1:1:1:1
  • secret :1dbd2916-337a-48cc-bce7-63b00d38ba3

此时,我的想法是:获取队列中的所有消息并确认我需要丢弃\从队列中删除的消息。

这是我执行此操作的简单客户端代码:

from stompy import stomp
import json


s = stomp.Stomp(amq_ip, amq_port)

try:
    s.connect(username=amq_user, password=amq_pass)
    s.subscribe({'destination': '%s' % amq_queue, 'ack': 'client'})
except Exception as e:
    print "ActiveMQ error\n %s" % e

while True:
    try:
        frame = s.receive_frame()
        body = json.loads(frame.body)

        # это сообщение для меня?
        if body["interested_atr_in_msg"] == "interested_value_of_attr_in_msg":
            print "Its for me. I receive it"
            # Это сообщение для меня. Я его приму и обработаю
            s.ack(frame)
        else:
            # Это сообщение предназначено для кого-то другого и мне не подходит
            print "Its not for me"
    except Exception as e:
        print e

还添加从队列中删除消息的实验测试代码(不起作用,不删除)

# -*- coding: utf-8 -*-
import activemq_api
import urllib3
import json


# Connection to ActiveMQ
BROKER_NAME = "localhost"
AMQ_API_PORT = 8161
AMQ_API_USER = "admin"
AMQ_API_PASS = "admin"
AMQ_API_POSTFIX = "/api/jolokia"
AMQ_TASK_QUEUE_NAME = "test"
BASIC_AUTH ='%s:%s' % (AMQ_API_USER, AMQ_API_PASS)
AMQ_STATUS_QUEUE = "/queue/test"

LOGIN_EXEMPT_URLS = [
    r'admin/'
]

LOGIN_URL = 'url_login'

LOGOUT_REDIRECT_URL = 'url_login'

if __name__ == '__main__':
    user_agent = "curl/7.49.1"
    headers = urllib3.util.make_headers(basic_auth=BASIC_AUTH, user_agent=user_agent)
    addition = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "*/*"
    }
    try:
        headers.update(addition)
        connect = activemq_api.Connection(AMQ_IP, AMQ_API_PORT, BROKER_NAME, headers, AMQ_API_POSTFIX)
        manager = activemq_api.AMQManager(connect)
    except Exception as e:
        print(u'%s: Превышено число максимальных попыток соединения к ActiveMQ' % e.__class__.__name__)
    else:
        print(u'Соединение успешно установлено')

    try:
        id="ID:№№№№№№№№№№№№№№№№№№-54825-1482598606528-3:586:-1:1:1"
        secret="wertrtd-3fdf-4dfd-gr56-dfghdvhshtdfgdw"
        print(manager.removeMsgQueue("test", id))
    except Exception as inst:
        print inst


#!/usr/bin/python2
# -*- coding: utf-8 -*-
import urllib3
import json

class Connection:
    def __init__(self, amq_ip, amq_port, broker, header, postfix):
        self.BROKER_NAME = broker
        self.AMQ_IP = amq_ip
        self.AMQ_PORT = amq_port
        self.HEADERS = header
        self.POSTFIX = postfix

class AMQManager():
    def __init__(self, conn):
        self.QUEUES = {}
        self.QUEUES_COUNT = None
        self.HEAP_MEMORY_USED = None
        self.MEMORY_PERSENT_USED = None
        self.CONNECTION = conn
        self.update()

    def rmQueue(self, queue_names):
        REUQEST = {
            "type": "exec",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
            "operation": "removeQueue(java.lang.String)",
            "arguments": [queue_names]
        }
        return json.dumps(REUQEST)

    def queueList(self):
        REUQEST = {
            "type": "read",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
            "attribute":"Queues"
        }
        return json.dumps(REUQEST)

    def browseQueueSubscribers(self):
        REUQEST = {
            "type": "read",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
            "attribute": "QueueSubscribers"
        }
        return json.dumps(REUQEST)

    def memoryPersentUsed(self):
        REUQEST = {
            "type": "read",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s" % self.CONNECTION.BROKER_NAME,
            "attribute": "MemoryPercentUsage"
        }
        return json.dumps(REUQEST)

    def heapMemoryUsed(self):
        REUQEST = {
            "type": "read",
            "mbean": "java.lang:type=Memory",
            "attribute":"HeapMemoryUsage",
            "path":"used"
        }
        return json.dumps(REUQEST)

    def request(self, name, param):
        http = urllib3.PoolManager()
        body = ''
        if name == "removeQueue":
            body = self.rmQueue(param["QUEUE_NAME"])
        elif name == "queueList":
            body = self.queueList()
        elif name == "browseQueueSubscribers":
            body = self.browseQueueSubscribers()
        elif name == "memoryPersentUsed":
            body = self.memoryPersentUsed()
        elif name == "heapMemoryUsed":
            body = self.heapMemoryUsed()

        url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
        r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
        return r.data

    def updateQueues(self):
        res = json.loads(self.request("queueList", {}))
        # print res
        data = []
        for queue in res["value"]:
            object = {}
            queue["objectName"] = queue["objectName"].split(":")[1]
            for key in queue["objectName"].split(","):
                object.update({key.split("=")[0]: key.split("=")[1]}) 
            data.append(object)
        self.QUEUES_COUNT = 0
        self.QUEUES = {}
        # print data
        for queue in data:
            self.QUEUES.update({queue["destinationName"]: Queue(queue["destinationName"], self.CONNECTION)})
            self.QUEUES_COUNT += 1

    def updateHeapMem(self):
        self.HEAP_MEMORY_USED = json.loads(self.request("heapMemoryUsed", {}))["value"]

    def updatePersMem(self):
        self.MEMORY_PERSENT_USED = json.loads(self.request("memoryPersentUsed", {}))["value"]

Ars, [26.01.17 14:06]
## EXPORTABLE
    def update(self):
        self.updateQueues()
        self.updateHeapMem()
        self.updatePersMem()        
    ## EXPORTABLE
    def getQueues(self):
        self.updateQueues()
        data = []
        for queue in self.QUEUES:
            data.append(self.QUEUES[queue].getInfo())
        return {
            "queues_count": self.QUEUES_COUNT,
            "queues": data
        }
    ## EXPORTABLE
    def getQueueInfo(self, name):
        return self.QUEUES[name].getInfo()
    ## EXPORTABLE
    def browseQueue(self, name):
        return self.QUEUES[name].browse()
    ## EXPORTABLE
    def getMessage(self, name, msg_id):
        return self.QUEUES[name].message(msg_id)
    def getAllQueueMessages(self, name):
        return self.QUEUES[name].messages()
    ## EXPORTABLE
    def removeQueue(self, name):
        param = {
            "QUEUE_NAME": name
        }
        return json.loads(self.request("removeQueue", param))
    ## EXPORTABLE
    def clearQueue(self, name):
        return self.QUEUES[name].clear()
    # ARS
    def removeMsgQueue(self,nameQueue, id):
        return self.QUEUES[nameQueue].delete_msg(id)



class Queue():
    def __init__(self, q_name, conn):
        # научите обращаться к атрибутам суперкласса!
        self.MESSAGES = []
        self.QUEUE_NAME = q_name
        self.ENQUEUE_COUNT = None
        self.DEQUEUE_COUNT = None
        self.CONSUMER_COUNT = None
        self.QUEUE_SIZE = None
        self.CONNECTION = conn
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()

    def queueEnqueueCount(self):
        # MSG_NAMES = ['JMSMessageID="ID:localhost-39797-1466874134889-3:1:-1:1:1"']
        REUQEST = {
            "type": "read",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
                        % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
            "attribute": "EnqueueCount"
        }
        return json.dumps(REUQEST)

    def queueDequeueCount(self):
        REUQEST = {
            "type": "read",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
                        % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
            "attribute": "DequeueCount"
        }
        return json.dumps(REUQEST)

    def queueConsumerCount(self):
        REUQEST = {
            "type": "read",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
                        % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
            "attribute": "ConsumerCount"
        }
        return json.dumps(REUQEST)

    def queueSize(self):
        REUQEST = {
            "type": "read",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
                        % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
            "attribute": "QueueSize"
        }
        return json.dumps(REUQEST)

    def browseMessages(self):
        REUQEST = {
            "type": "exec",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
                        % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
            "operation": "browse()",
            # "arguments": [""]
        }
        return json.dumps(REUQEST)

Ars, [26.01.17 14:06]
def purge(self):
        REUQEST = {
            "type": "exec",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
                        % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
            "operation": "purge()"
        }
        return json.dumps(REUQEST)
    #ARS
    def deleteMsg(self, ID):
        REUQEST = {
            "type": "exec",
            "mbean": "org.apache.activemq:type=Broker,brokerName=%s,destinationName=%s,destinationType=Queue" \
                     % (self.CONNECTION.BROKER_NAME, self.QUEUE_NAME),
            "operation": "deleteMessage()",
            "arguments": [ID, "11111111-1111-1111-1111-111111111111"]
        }
        return json.dumps(REUQEST)

    def request(self, name, param):
        http = urllib3.PoolManager()

        if name == "queueEnqueueCount":
            body = self.queueEnqueueCount()
        elif name == "queueDequeueCount":
            body = self.queueDequeueCount()
        elif name == "queueConsumerCount":
            body = self.queueConsumerCount()
        elif name == "queueSize":
            body = self.queueSize()
        elif name == "browseMessages":
            body = self.browseMessages()
        elif name == "purge":
            body = self.purge()
        elif name == "delete_msg":
            body = self.deleteMsg(param)

        url = "http://%s:%d%s" % (self.CONNECTION.AMQ_IP, self.CONNECTION.AMQ_PORT, self.CONNECTION.POSTFIX)
        r = http.request('POST', url, headers=self.CONNECTION.HEADERS, body=body)
        return r.data

    def updateEnCount(self):
        try:
            self.ENQUEUE_COUNT = json.loads(self.request("queueEnqueueCount", {}))["value"]
        except Exception as inst:
            self.ENQUEUE_COUNT = -1

    def updateDeCount(self):
        try:
            self.DEQUEUE_COUNT = json.loads(self.request("queueDequeueCount", {}))["value"]
        except Exception as inst:
            self.ENQUEUE_COUNT = -1

    def updateCoCount(self):
        try:
            self.CONSUMER_COUNT = json.loads(self.request("queueConsumerCount", {}))["value"]
        except Exception as inst:
            self.ENQUEUE_COUNT = -1

    def updateQuSize(self):
        try:
            self.QUEUE_SIZE = json.loads(self.request("queueSize", {}))["value"]
        except Exception as inst:
            self.ENQUEUE_COUNT = -1

    def updateMessages(self):
        self.MESSAGES = []
        res = json.loads(self.request("browseMessages", {}))["value"]
        for msg in res:
            data = {
                "id": msg["JMSMessageID"],
                "data": msg["Text"],
                "timestamp": msg["JMSTimestamp"],
                "priority": msg["JMSPriority"]                
            }
            self.MESSAGES.append(data)

    def update(self):
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()
        self.updateMessages()

    def getInfo(self):
        self.updateEnCount()
        self.updateDeCount()
        self.updateCoCount()
        self.updateQuSize()
        return {
            "queue_name": self.QUEUE_NAME,
            "enqueue_count": self.ENQUEUE_COUNT,
            "dequeue_count": self.DEQUEUE_COUNT,
            "consumer_count": self.CONSUMER_COUNT,
            "queue_size": self.QUEUE_SIZE
        }

    def browse(self):
        self.updateMessages()
        data = []
        for msg in self.MESSAGES:
            chunk = {
                "id": msg["id"],
                "timestamp": msg["timestamp"],
                "priority": msg["priority"]
            }
            data.append(chunk)
        return data

Ars, [26.01.17 14:06]
def message(self, msg_id):
        self.updateMessages()
        for msg in self.MESSAGES:
            if msg["id"] == msg_id:
                return msg["data"]
    # ARS
    def messages(self):
        self.updateMessages()
        return self.MESSAGES

    # ARS
    def delete_msg(self, id):
        return json.loads(self.request("delete_msg",id))

    def clear(self):
        return json.loads(self.request("purge", {}))

关于Python。如何从队列/主题 ActiveMQ 中删除任何消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41827469/

相关文章:

c++ - 使用 native Windows API 的 win32 线程安全队列实现

python - 在更大的图中嵌入 matplotlib 图

python - 如何优雅地对 Pandas 中的一系列列表进行热编码

Python作业

python - 在 Python 中对混合列表进行排序

python - 基维。文本提供者错误

javascript - 连续运行 jQuery 动画

Java-链​​表存储难点

python - Fabric 密码

python - openCV中的视频捕获使用ffmpeg吗?