python-3.x - Debugging NATS messages from Kubernetes events

Tags: python-3.x kubernetes nats.io

I have a simple script that watches Kubernetes events and then publishes messages to a NATS server:

#!/usr/bin/env python
import asyncio
import argparse
import json
import logging
import os

from kubernetes import client, config, watch

from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

# monkey patch
from kube import local_load_oid_token
config.kube_config.KubeConfigLoader._load_oid_token = local_load_oid_token

parser = argparse.ArgumentParser()
parser.add_argument('--in-cluster', help="use in cluster kubernetes config", action="store_true")
parser.add_argument('-a', '--nats-address', help="address of nats cluster", default=os.environ.get('NATS_ADDRESS', None))
parser.add_argument('-d', '--debug', help="enable debug logging", action="store_true")
parser.add_argument('-p', '--publish-events', help="publish events to NATS", action="store_true")
parser.add_argument('--output-events', help="output all events to stdout", action="store_true", dest='enable_output')
parser.add_argument('--connect-timeout', help="NATS connect timeout (s)", type=int, default=10, dest='conn_timeout')
parser.add_argument('--max-reconnect-attempts', help="number of times to attempt reconnect", type=int, default=1, dest='conn_attempts')
parser.add_argument('--reconnect-time-wait', help="how long to wait between reconnect attempts", type=int, default=10, dest='conn_wait')
args = parser.parse_args()

logger = logging.getLogger('script')
ch = logging.StreamHandler()
if args.debug:
    logger.setLevel(logging.DEBUG)
    ch.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)
    ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)

if not args.nats_address:
    logger.critical("No NATS cluster specified")
    exit(parser.print_usage())
else:
    logger.debug("Using nats address: %s", args.nats_address)

if args.in_cluster:
    config.load_incluster_config()
else:
    try:
        config.load_kube_config()
    except Exception as e:
        logger.critical("Error creating Kubernetes configuration: %s", e)
        exit(2)

v1 = client.CoreV1Api()


async def run(loop):
    nc = NATS()
    try:
        await nc.connect(args.nats_address, loop=loop, connect_timeout=args.conn_timeout, max_reconnect_attempts=args.conn_attempts, reconnect_time_wait=args.conn_wait)
        logger.info("Connected to NATS at %s..." % (nc.connected_url.netloc))
    except Exception as e:
        exit(e)

    #print("Connected to NATS at {}...".format(nc.connected_url.netloc))

    async def get_node_events():
        w = watch.Watch()
        for event in w.stream(v1.list_node):
            accepted = ["DELETED"]
            if event['type'] in accepted:
                logger.info("Event: %s %s %s" % (event['type'], event['object'].kind, event['object'].metadata.name))
                msg = {'type':event['type'],'object':event['raw_object']}
                logger.debug("Raw Message: %s"  % msg)
                await nc.publish("k8s_events", json.dumps(msg).encode('utf-8'))
                if args.enable_output:
                    print(json.dumps(msg))

    await get_node_events()
    await nc.flush(timeout=3)
    await nc.close()




if __name__ == '__main__':

    loop = asyncio.get_event_loop()
    loop.create_task(run(loop))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info('keyboard shutdown')
        tasks = asyncio.gather(*asyncio.Task.all_tasks(loop=loop), loop=loop, return_exceptions=True)
        tasks.add_done_callback(lambda t: loop.stop())
        tasks.cancel()

        # Keep the event loop running until it is either destroyed or all
        # tasks have really terminated
        while not tasks.done() and not loop.is_closed():
            loop.run_forever()
    finally:
        logger.info('closing event loop')
        loop.close()

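When I run it with event publishing enabled, I can see the event JSON being output.

However, for some reason my receiver never actually receives the NATS messages for the delete events.

  • How can I debug the messages going into the subject? Is there anything I can add to the code to verify that messages are actually being added to the subject?
  • Is my async logic correct here?
  • Why don't the delete events reach the subject with this logic?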

Best answer

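You can verify message delivery through the NATS server logs. Temporarily enable debug and trace logging, either by passing the -DV flag to the NATS server (-D for debug, -V for trace) or by setting the following in the NATS configuration file: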

debug=true
trace=true 

You should see something like this:

[31070] 2019/09/10 13:34:40.426198 [DBG] 127.0.0.1:53203 - cid:6 - Client connection created
[31070] 2019/09/10 13:34:40.426582 [TRC] 127.0.0.1:53203 - cid:6 - <<- [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS Sample Subscriber","lang":"go","version":"1.7.0","protocol":1,"echo":true}]
[31070] 2019/09/10 13:34:40.426614 [TRC] 127.0.0.1:53203 - cid:6 - <<- [PING]
[31070] 2019/09/10 13:34:40.426625 [TRC] 127.0.0.1:53203 - cid:6 - ->> [PONG]
[31070] 2019/09/10 13:34:40.426804 [TRC] 127.0.0.1:53203 - cid:6 - <<- [SUB k8s_events  1]
[31070] 2019/09/10 13:34:40.426821 [TRC] 127.0.0.1:53203 - cid:6 - <<- [PING]
[31070] 2019/09/10 13:34:40.426827 [TRC] 127.0.0.1:53203 - cid:6 - ->> [PONG]
[31070] 2019/09/10 13:34:44.167844 [DBG] ::1:53206 - cid:7 - Client connection created
[31070] 2019/09/10 13:34:44.168352 [TRC] ::1:53206 - cid:7 - <<- [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS Sample Publisher","lang":"go","version":"1.7.2","protocol":1,"echo":true}]
[31070] 2019/09/10 13:34:44.168383 [TRC] ::1:53206 - cid:7 - <<- [PING]
[31070] 2019/09/10 13:34:44.168390 [TRC] ::1:53206 - cid:7 - ->> [PONG]
[31070] 2019/09/10 13:34:44.168594 [TRC] ::1:53206 - cid:7 - <<- [PUB k8s_events 11]
[31070] 2019/09/10 13:34:44.168607 [TRC] ::1:53206 - cid:7 - <<- MSG_PAYLOAD: ["{json data}"]
[31070] 2019/09/10 13:34:44.168623 [TRC] 127.0.0.1:53203 - cid:6 - ->> [MSG k8s_events 1 11]
[31070] 2019/09/10 13:34:44.168648 [TRC] ::1:53206 - cid:7 - <<- [PING]
[31070] 2019/09/10 13:34:44.168653 [TRC] ::1:53206 - cid:7 - ->> [PONG]

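Using the connection IDs (cid), you can see that connection 7 published 11 bytes to k8s_events (the protocol message PUB k8s_events 11, followed by the message payload), and that connection 6, the subscriber, received the message (MSG k8s_events 1 11, where 1 is the subscription ID assigned in the earlier SUB line).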
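To apply the same check to a long trace log, a small script can collect every PUB on a subject and every MSG delivered for it. This is a minimal sketch, assuming the trace line layout shown above (it may differ between nats-server versions); `summarize` is a hypothetical helper, not part of any NATS tooling.

```python
import re

# Matches trace lines such as:
#   ... [TRC] ::1:53206 - cid:7 - <<- [PUB k8s_events 11]
#   ... [TRC] 127.0.0.1:53203 - cid:6 - ->> [MSG k8s_events 1 11]
TRACE_RE = re.compile(
    r"\[TRC\] \S+ - cid:(?P<cid>\d+) - (?:<<-|->>) "
    r"\[(?P<op>PUB|MSG) (?P<args>[^\]]*)\]"
)


def summarize(lines, subject):
    """Return (published, delivered) lists of (cid, nbytes) for `subject`."""
    published, delivered = [], []
    for line in lines:
        m = TRACE_RE.search(line)
        if not m:
            continue  # CONNECT, SUB, PING/PONG, MSG_PAYLOAD, ... are ignored
        args = m.group("args").split()
        if args[0] != subject:
            continue
        # The byte count is always the last argument, with or without reply-to.
        cid, nbytes = int(m.group("cid")), int(args[-1])
        if m.group("op") == "PUB":  # client -> server
            published.append((cid, nbytes))
        else:  # MSG: server -> subscriber
            delivered.append((cid, nbytes))
    return published, delivered
```

A non-empty `published` list with an empty `delivered` list suggests that no subscriber on this server matched the subject; an empty `published` list points back at the publishing client.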

This lets you check both that your client is actually publishing messages and that your subscriber is listening on the right subject.
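A point the answer does not cover, offered as a possible explanation rather than a confirmed diagnosis: `w.stream(v1.list_node)` is a plain synchronous generator, so the `for` loop inside `get_node_events()` never hands control back to the event loop. If the asyncio NATS client only buffers data in `publish()` and writes it from a background flusher task (my understanding of the asyncio nats.py client, stated here as an assumption), that write never gets a chance to run while the watch is blocking. The sketch below reproduces this with stdlib-only stand-ins (`fake_watch_stream` and `BufferedPublisher` are hypothetical, not NATS or Kubernetes APIs) and shows one workaround: fetching each event with `loop.run_in_executor` so the loop stays free between events.

```python
import asyncio
import time


def fake_watch_stream(n=3):
    """Hypothetical stand-in for watch.Watch().stream(...): a synchronous
    generator that blocks while waiting for each event."""
    for i in range(n):
        time.sleep(0.01)  # simulates a blocking HTTP read
        yield {"type": "DELETED", "name": "node-%d" % i}


class BufferedPublisher:
    """Hypothetical stand-in for a buffered async client: publish() only
    queues data; a background flusher task performs the 'write'."""

    def __init__(self):
        self.pending, self.sent = [], []
        self._wakeup = asyncio.Event()
        self._task = asyncio.ensure_future(self._flusher())

    async def publish(self, subject, payload):
        self.pending.append((subject, payload))  # no I/O, never suspends
        self._wakeup.set()

    async def _flusher(self):
        while True:
            await self._wakeup.wait()
            self._wakeup.clear()
            self.sent.extend(self.pending)  # the actual "socket write"
            self.pending.clear()

    async def close(self):
        await asyncio.sleep(0)  # let the flusher run one last time
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass


async def demo(blocking):
    pub = BufferedPublisher()
    seen = []  # messages actually "sent" at the moment of each publish
    if blocking:
        # Pattern from the question: iterating a blocking generator inside
        # a coroutine never yields to the event loop.
        for event in fake_watch_stream():
            await pub.publish("k8s_events", event)
            seen.append(len(pub.sent))
    else:
        # Fetch each event in a worker thread so the loop stays free.
        loop = asyncio.get_running_loop()
        stream = fake_watch_stream()
        while True:
            event = await loop.run_in_executor(None, next, stream, None)
            if event is None:  # generator exhausted
                break
            await pub.publish("k8s_events", event)
            seen.append(len(pub.sent))
    await pub.close()
    return seen, len(pub.sent)
```

`demo(blocking=True)` shows that nothing had been sent at any publish; the messages only go out once the loop is released at the end, and with a watch that never ends that point is never reached. `demo(blocking=False)` shows each earlier message flushed before the next event arrives. Applied to the question's script, this would mean creating `stream = w.stream(v1.list_node)` once and replacing the `for` loop with calls to `await loop.run_in_executor(None, next, stream, None)`.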

For python-3.x - Debugging NATS messages from Kubernetes events, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/53381950/
