python - asyncio client/server does not work in Docker

Tags: python python-3.x docker python-3.5 python-asyncio

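I have a client and a server (sending JSON messages over HTTP) that work fine on my standard Ubuntu 16.04.

But when I run both the client and the server inside Docker, or the client outside Docker and the server inside, I get errors.

My docker command:
sudo docker run -p 127.0.0.1:8888:8888 -i -t seo_server

Here are my server and client code and the errors:

Server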

import asyncio
import json
import aiohttp
import re

async def get_cy(domain):
    return {'result': 'ok', 'value': 10}



async def handle(reader, writer):
    # data = await reader.read(1000)

    data = bytearray()
    while not reader.at_eof():
        chunk = await reader.read(2 ** 12)
        data += chunk

    # print(json.loads(data))
    # https://aiomas.readthedocs.io/en/latest/
    message = data.decode()

    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))
    m = json.loads(message)
    f = m['method']
    del m['method']
    r = await globals()[f](**m)
    r = json.dumps(r)
    # await asyncio.sleep(int(m['time']))
    print("Send: %r" % r)
    writer.write(r.encode())
    await writer.drain()

    print("Close the client socket")
    writer.close()



if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle, '127.0.0.1', 8888, loop=loop)
    server = loop.run_until_complete(coro)
    # Serve requests until Ctrl+C is pressed
    print('Serving cy microservice on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

Client

import asyncio
import json
from itertools import zip_longest
import time

def to_json(func):
    def wrap(**kwargs):
        message = kwargs
        message['method'] = func.__name__
        print(message)
        return asyncio.ensure_future(tcp_send(json.dumps(message)))

    return wrap


@to_json
def get_cy(domain):
    pass


async def tcp_send(message):
    loop = asyncio.get_event_loop()
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
                                                   loop=loop)

    print('Send: %r' % message)
    writer.write(message.encode())
    writer.write_eof()

    data = await reader.read()
    data = data.decode()
    print('Received: %r' % data)

    print('Close the socket')
    writer.close()
    return json.loads(data)


def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)


def async_map(loop, f, iterable, chunk_size=2):
    for chunk in grouper(iterable, chunk_size):
        future = asyncio.gather(*(f(param) for param in chunk if param))
        loop.run_until_complete(future)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()


    async def cy(site):
        cy = await get_cy(domain=site)
        print(site + " cy =", cy)
        #update site here

    while True:
        sites = ('site1.ru', 'site2.ru', 'site3.ru', 'site4.ru', 'site5.ru')
        async_map(loop, cy, sites)
        time.sleep(100) #if not sites

Errors when I run both the client and the server inside Docker:

Client error
root@341fdee56d6d:/seo_server# python client.py 
{'domain': 'site1.ru', 'method': 'get_cy'}
{'domain': 'site2.ru', 'method': 'get_cy'}
Send: '{"domain": "site2.ru", "method": "get_cy"}'
Send: '{"domain": "site1.ru", "method": "get_cy"}'
Received: ''
Close the socket
Traceback (most recent call last):
  File "client.py", line 63, in <module>
    async_map(loop, cy, sites)
  File "client.py", line 49, in async_map
    loop.run_until_complete(future)
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 341, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 276, in result
    raise self._exception
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 236, in _step
    result = coro.throw(exc)
  File "client.py", line 57, in cy
    cy = await get_cy(domain=site)
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 387, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 287, in _wakeup
    value = future.result()
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 276, in result
    raise self._exception
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 238, in _step
    result = coro.send(value)
  File "client.py", line 36, in tcp_send
    return json.loads(data)
  File "/usr/local/lib/python3.5/json/__init__.py", line 319, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.5/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.5/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Task was destroyed but it is pending!
task: <Task pending coro=<tcp_send() running at client.py:30> wait_for=<Future pending cb=[Task._wakeup()]> cb=[Task._wakeup()]>

Server error
     sudo docker run -p 127.0.0.1:8888:8888 -i -t seo_server 
Serving cy microservice on ('127.0.0.1', 8888)
Received '{"domain": "site2.ru", "method": "get_cy"}' from ('127.0.0.1', 47768)
http://bar-navig.yandex.ru/u?ver=2&show=31&url=http://site2.ru
Received '{"domain": "site1.ru", "method": "get_cy"}' from ('127.0.0.1', 47770)
http://bar-navig.yandex.ru/u?ver=2&show=31&url=http://site1.ru
Send: '{"result": "ok", "value": "50"}'
Task exception was never retrieved
future: <Task finished coro=<handle() done, defined at seo_server.py:18> exception=ConnectionResetError('Connection lost',)>
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 238, in _step
    result = coro.send(value)
  File "seo_server.py", line 40, in handle
    await writer.drain()
  File "/usr/local/lib/python3.5/asyncio/streams.py", line 304, in drain
    yield from self._protocol._drain_helper()
  File "/usr/local/lib/python3.5/asyncio/streams.py", line 195, in _drain_helper
    raise ConnectionResetError('Connection lost')
ConnectionResetError: Connection lost
Send: '{"result": "ok", "value": "50"}'
Task exception was never retrieved
future: <Task finished coro=<handle() done, defined at seo_server.py:18> exception=ConnectionResetError('Connection lost',)>
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 238, in _step
    result = coro.send(value)
  File "seo_server.py", line 40, in handle
    await writer.drain()
  File "/usr/local/lib/python3.5/asyncio/streams.py", line 304, in drain
    yield from self._protocol._drain_helper()
  File "/usr/local/lib/python3.5/asyncio/streams.py", line 195, in _drain_helper
    raise ConnectionResetError('Connection lost')
ConnectionResetError: Connection lost

If I run the client outside Docker, I get this client error and nothing happens on the server:

Client error outside Docker
/usr/bin/python3.5 /home/se7en/examples/python_3.5/seo_server/client.py
{'method': 'get_cy', 'domain': 'site1.ru'}
{'method': 'get_cy', 'domain': 'site2.ru'}
Send: '{"method": "get_cy", "domain": "site1.ru"}'
Send: '{"method": "get_cy", "domain": "site2.ru"}'
Traceback (most recent call last):
  File "/home/se7en/examples/python_3.5/seo_server/client.py", line 63, in <module>
    async_map(loop, cy, sites)
  File "/home/se7en/examples/python_3.5/seo_server/client.py", line 49, in async_map
    loop.run_until_complete(future)
  File "/usr/lib/python3.5/asyncio/base_events.py", line 373, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 242, in _step
    result = coro.throw(exc)
  File "/home/se7en/examples/python_3.5/seo_server/client.py", line 57, in cy
    cy = await get_cy(domain=site)
  File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.5/asyncio/tasks.py", line 297, in _wakeup
    future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 240, in _step
    result = coro.send(None)
  File "/home/se7en/examples/python_3.5/seo_server/client.py", line 28, in tcp_send
    writer.write_eof()
  File "/usr/lib/python3.5/asyncio/streams.py", line 294, in write_eof
    return self._transport.write_eof()
  File "/usr/lib/python3.5/asyncio/selector_events.py", line 745, in write_eof
    self._sock.shutdown(socket.SHUT_WR)
OSError: [Errno 107] Transport endpoint is not connected

My Dockerfile:
FROM davidjfelix/python3.5
RUN pip3 install aiohttp
ADD . /seo_server
WORKDIR /seo_server
CMD python seo_server.py

Docker version:
$ sudo docker version
Client:
 Version:      1.11.2
 API version:  1.23
 Go version:   go1.5.4
 Git commit:   b9f10c9
 Built:        Wed Jun  1 22:00:43 2016
 OS/Arch:      linux/amd64

Server:
 Version:      1.11.2
 API version:  1.23
 Go version:   go1.5.4
 Git commit:   b9f10c9
 Built:        Wed Jun  1 22:00:43 2016
 OS/Arch:      linux/amd64

Please help me find the problem and fix it.

Best answer

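Because of container isolation, Python threads in different containers cannot see each other. So, for the asyncio loop, you need to start the server and the worker/client in the same container. You can do this by launching a .sh script, as described at https://docs.docker.com/config/containers/multi-service_container/, or do the same with Supervisord.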
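
As a rough illustration of the "start both in the same container" idea, here is a minimal sketch of a launcher written in Python rather than as a .sh script. The function name `run_together`, the `startup_delay` parameter, and the two command constants are assumptions made for this example; only the file names `seo_server.py` and `client.py` come from the tracebacks in the question. The fixed sleep is a crude stand-in for a real readiness check.

```python
import subprocess
import sys
import time

# Assumed commands; the script names are taken from the question's tracebacks.
SERVER_CMD = [sys.executable, "seo_server.py"]
CLIENT_CMD = [sys.executable, "client.py"]


def run_together(server_cmd, client_cmd, startup_delay=1.0):
    """Start the server, run the client to completion, then stop the server.

    Returns the client's exit code.
    """
    server = subprocess.Popen(server_cmd)
    try:
        # Crude wait so the server has a chance to start listening.
        time.sleep(startup_delay)
        if server.poll() is not None:
            raise RuntimeError(
                "server exited early with code %s" % server.returncode)
        # Blocks until the client process finishes.
        return subprocess.call(client_cmd)
    finally:
        # Stop the server if it is still running.
        if server.poll() is None:
            server.terminate()
            server.wait()
```

Saved under a hypothetical name such as `launcher.py`, the container's `CMD` could run it with a final line like `sys.exit(run_together(SERVER_CMD, CLIENT_CMD))`, so that both processes share the container's loopback interface. Supervisord, as mentioned above, would be the more robust choice for long-running services.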

Regarding python - asyncio client/server not working in Docker, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38418543/
