python-3.x - 容器化后,Python Websocket Server不起作用( docker )

标签 python-3.x docker websocket

我有以下代码用于全双工python websocket服务器。

import asyncio
import websockets
import os
import json
import time
import socket
import threading

def get_config_key(key):
        value = os.environ.get(key)
        print("[Agent] get_config_key. Key: '{0}'. Value: '{1}'".format(key, value))
        if value:
            return value
        else:
            return ""

    class Agent_Server_Test:

        def __init__(self):
            print("[Agent_Server_Test] Starting Agent '{}'".format(self))

            self.producer_message = ""
            self.producer_flag = threading.Event()

            agent_worker_thread = threading.Thread(target=self.run_worker_websocket)
            agent_worker_thread.start()
            test_loop = threading.Thread(target=self.test_periodic)
            test_loop.start()

        def run_worker_websocket(self):
            agent_url = self.get_agent_url()
            print("[Agent_Server_Test] run_worker_websocket started for agent_url '{}'".format(agent_url))
            agent_worker_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(agent_worker_loop)
            self.agent_websocket = websockets.serve(self.worker_connection_handler, host = agent_url["host"], port = agent_url["port"])
            agent_worker_loop.run_until_complete(self.agent_websocket)
            agent_worker_loop.run_forever()

        def get_agent_url(self):
            agent_url = { "is_agent_url": True, "host" : "0.0.0.0", "port" : 8889}
            print("[Agent_Server_Test] Returning agent url '{}'.".format(agent_url))
            return agent_url

        async def worker_connection_handler(self, websocket, path):
            print("[Agent_Server_Test] worker_connection_handler activated.")
            worker_recv_task = asyncio.create_task(self.worker_recv_handler(websocket, path))
            worker_send_task = asyncio.create_task(self.worker_send_handler(websocket, path))
            done, pending = await asyncio.wait([worker_recv_task, worker_send_task], return_when=asyncio.FIRST_COMPLETED,)
            for task in pending:
                task.cancel()

        async def worker_recv_handler(self, websocket, path):
            print("[Agent_Server_Test] worker_recv_handler start.")
            async for message in websocket:
                print("[Agent_Server_Test] worker_recv_handler. Received message from Worker: '{}'".format(message))
                self.producer_message = message
                self.producer_flag = True

        async def worker_send_handler(self, websocket, path):
            print("[Agent_Server_Test] worker_send_handler start.")
            while True:
                message = await self.producer()
                print("[Agent_Server_Test] worker_send_handler. Sending message '{}'".format(message))
                await websocket.send(json.dumps(message))

        async def producer(self):
            print("[Agent_Server_Test] Waiting for producer_flag.")
            self.producer_flag.wait()
            print("[Agent_Server_Test] producer flag set to true. Activating producer.")
            self.producer_flag.clear()
            return self.producer_message[::-1]

        def test_periodic(self):
            print("[Agent_Server_Test] test_periodic started.")
            test_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(test_loop)
            count = 0
            while True:
                test_loop.run_until_complete(asyncio.sleep(30)) 
                print("[Agent_Server_Test] test_periodic run '{}', every 30sec.".format(count))
                self.producer_message = "Test_Periodic_Message" + str(count)
                self.producer_flag.set()
                count = count+1

    if __name__ == "__main__":
        try:
            #app.run(host="0.0.0.0", port=int("5000"), debug=True)
            agent = Agent_Server_Test()

        except Exception as e:
            print("[Agent_Server_Test] !!!ERROR!!! EXCEPTION OCCURED IN PROXY TOP LEVEL:'{}'. Exit.".format(e))

我有以下代码可连接以测试连接并向其发送消息。当我将服务器作为EC2实例上的docker容器部署时,remote_ip进行了硬编码。
    import asyncio
import websockets
import os
import json
import time
import socket
import threading

class Agent_Server_Client:

        #agent_server_uri = "ws://localhost:8889"    #local
        agent_server_uri = "ws://<REMOTE_IP>.:8889"    #remote

        def __init__(self):
            asyncio.get_event_loop().run_until_complete(self.test())

        async def test(self):
            print("[Test_Agent_Server_Client] Starting test")
            async with websockets.connect(self.agent_server_uri) as websocket:
                print("[Test_Agent_Server_Client] Sending Test Msg")
                await websocket.send("Test Message")
                response = await websocket.recv()
                print("[Test_Agent_Server_Client] Received response: {}".format(response))
                return response

    if __name__ == "__main__":
        try:
            #app.run(host="0.0.0.0", port=int("5000"), debug=True)
            agent = Agent_Server_Client()

        except Exception as e:
            print("[Test_Agent_Server_Client] !!!ERROR!!! EXCEPTION OCCURED IN PROXY TOP LEVEL:'{}'. Exit.".format(e))

When testing the local version, I get the expected response and print outs from the agent.

    [Test_Agent_Server_Client] Starting test
    [Test_Agent_Server_Client] Sending Test Msg
    [Test_Agent_Server_Client] Received response: "65egasseM_cidoireP_tseT"

    [debug] # [Agent] worker_connection_handler activated.
    [debug] # [Agent] worker_connection_handler tasks created.
    [debug] # [Agent] worker_recv_handler start.
    [debug] # [Agent] worker_send_handler start.
    [debug] # [Agent] Waiting for producer_flag.
    [debug] # [Agent] producer flag set to true. Activating producer.
    [debug] # [Agent] worker_send_handler. Sending message: '46egasseM_cidoireP_tseT'
    [debug] # [Agent] Waiting for producer_flag.

当测试远程版本时,我得到以下响应并且只有一张打印纸:
[Test_Agent_Server_Client] Starting test
[Test_Agent_Server_Client] Sending Test Msg
[Agent_Server_Client] !!!ERROR!!! EXCEPTION OCCURED IN PROXY TOP LEVEL:'code = 1011 (unexpected error), no reason'. Exit.

[[[21-08-19 09:55:49] Agent [debug] # [Agent] worker_connection_handler activated.   │

我认为Docker容器与此有关。
Dockerfile是
FROM python:3.6-alpine3.8
RUN pip3 install websockets
COPY . /agent
WORKDIR /agent
EXPOSE 8889
CMD ["python", "agent.py"]

然后使用Ansible将其部署到EC2实例,如下所示:
---
- hosts: swarm_manager_main
  become: true
  gather_facts: no
  vars_files:
    - "vars/{{ env }}.yml"
  tasks:
    - name: get public_ip4 output
      shell: curl http://EC2_IP/latest/meta-data/public-ipv4 
      register: public_ipv4
    - debug: 
        var: public_ipv4.stdout
    - name: Create docker_pull
      template:
        src: templates/docker_pull_agent.j2
        dest: /root/pull_agent.sh
        owner: root
        group: root
        mode: 0644
    - name: Pull containers
      command: "sh /root/pull_agent.sh"
    - name: (re)-create the agent
      docker_container:
        name: agent
        image: registry.gitlab.com/core/v2/agent
        state: started
        exposed_ports: 8889
        published_ports: 8889:8889
        recreate: yes
        env:
          HOST_MACHINE: "{{ public_ipv4.stdout }}"

我不完全确定为什么它不起作用。我认为由于docker容器,连接已启动但失败了。我的错误在哪里?

最佳答案

我发现了问题:
docker容器使用Python 3.6。我的本地环境Python 3.7。
Asyncio在3.7中仅支持asyncio.create_task。
独立的解决方案将代码更改为:

loop = asyncio.get_event_loop()
worker_recv_task = loop.create_task(self.worker_recv_handler(websocket, path))
worker_send_task = loop.create_task(self.worker_send_handler(websocket, path))

关于python-3.x - 容器化后,Python Websocket Server不起作用( docker ),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57589201/

相关文章:

php - 我正在尝试将脉冲实现到 PHP Ratchet Websocket 应用程序中

c++ - Websocketpp 设置消息处理程序

python - 我在 python 中提取 rarfile 时遇到错误(找不到工作工具)

docker - 如何从我的 Dockerfile 运行 Mysql 和 Apache?

docker - 调用 puppeteer.connect() 时出现协议(protocol)错误

docker - 如何连接通过 LAN 连接的两个不同主机上的容器

angular - 在 Angular 中调用 API 后初始化套接字时 Websocket 不工作

顶级模块代码中的 Python @ (at) 前缀 - 它代表什么?

python - Pyserial - RS232 9600,8,N,1 发送和接收数据

c++ - 在 cython 中为 cpp 类使用访问器语法