mysql - 带 mysql 和 redis 的 Tornado websockets

标签 mysql asynchronous redis tornado

我有一个 Tornado 服务器,它将与客户端建立网络套接字连接。每当客户端请求某些数据时,我需要从 Redis 或 MySQL-DB 获取它。除此之外,我需要收听来自 Tornado 服务器的广播,接收网络数据包并将它们发送给订阅了数据的客户端。向客户端发送广播数据包取决于数据包中的 token 。如果客户订阅了 token ,我们应该将数据包发送给他。

请求率:

  1. 5000 个事件的网络套接字连接(可以增加)
  2. 每秒每个套接字连接 1-DB 请求,因此总共 5000 个 DB-请求/秒
  3. 每秒每个套接字连接有 1 个 Redis 请求,因此总共有 5000 个 Redis 请求/秒。
  4. 在广播中我应该监听 1000 个数据包/秒并检查是否有任何用户订阅了 token 。

我知道 Redis 是单线程的并且异步工作(如果我错了请纠正我)。对于我正在使用的 MySQL-DB 异步驱动程序是 `tormysql`(我对 DB 的大部分调用都是选择查询,没有复杂的 DB 操作。)。

我的问题:

  1. 对 MySQL-DB 的调用会阻塞吗?如果他们阻止调用,我可以只为数据库查询运行不同的线程/进程吗?
  2. tornado 是否有可能在广播中丢包?
  3. 我可以在我的资源前面有一个负载平衡器并为它们提供服务器,但我是否可以使用具有 2 核 8 GB RAM 的单个 CPU?

更新 1:
我已经为 MySQL 编写了代码:
Server.py

import logging
import tornado.escape
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.websocket
import os.path
import uuid
import sys
from tornado import gen

from tornado.options import define, options

import tormysql ## MySQL async driver
pool = tormysql.ConnectionPool(
    max_connections = 20, #max open connections
    idle_seconds = 7200, #conntion idle timeout time, 0 is not timeout
    wait_connection_timeout = 3, #wait connection timeout
    host = "127.0.0.1",
    user = "test",
    passwd = "",
    db = "testdb",
    charset = "utf8"
)


define("port", default=8000, help="run on the given port", type=int)

class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/", MainHandler),
            (r"/dataSock", ChatSocketHandler),
        ]
        settings = dict(
            cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
            template_path=os.path.join(os.path.dirname(__file__), "templates"),
            static_path=os.path.join(os.path.dirname(__file__), "static"),
            xsrf_cookies=True,
        )
        super(Application, self).__init__(handlers, **settings)

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("indexWS.html")

class ChatSocketHandler(tornado.websocket.WebSocketHandler):
    openConnCount = 0
    def get_compression_options(self):
        # Non-None enables compression with default options.
        return {}

    def open(self):
        # print("Socket open:%s"%(self))
        ChatSocketHandler.openConnCount += 1
        None
    def on_close(self):
        # print("Socket closed:%s"%(self))
        ChatSocketHandler.openConnCount -= 1
        None

    async def on_message(self, message):
        logging.info("open conn count %r", ChatSocketHandler.openConnCount)
        returnDB = await getDataFromDB()
        self.write_message("server got this from you:%s"%(str(returnDB)))

@gen.coroutine
def getDataFromDB():
    with (yield pool.Connection()) as conn:
        try:
            with conn.cursor() as cursor:
                yield cursor.execute("SHOW TABLES;")
                # print(cursor.fetchall())
        except:
            yield conn.rollback()
        else:
            yield conn.commit()

        with conn.cursor() as cursor:
            yield cursor.execute("SELECT * FROM theme;")
            datas = cursor.fetchall()

    return datas

    # yield pool.close()
def main():
    tornado.options.parse_command_line()
    app = Application()
    # print "options:", options
    # sys.exit()
    app.listen(options.port)
    print("PORT:%s"%(options.port))
    tornado.ioloop.IOLoop.current().start()

if __name__ == "__main__":
    main()

现在当我用这个测试这段代码时:
客户端.py

import asyncio
import websockets

async def hello(i):
    async with websockets.connect('ws://localhost:8000/dataSock') as websocket:
        name = 'A'#input("What's your name? ")
        print("******************************%s******************************"%(i))
        for j in range(100):
            await websocket.send(name)
            # print("> {}".format(name))

            greeting = await websocket.recv()
            print("{}: {}".format(i, len(greeting)))
            asyncio.sleep(10)

async def start():
    for i in range(10):
        await hello(i)
    print("end")
    asyncio.sleep(20)
asyncio.get_event_loop().run_until_complete(start())
# asyncio.get_event_loop().run_forever()

如果我运行代码的单个实例,一切都运行良好。当我将客户端数量增加到 70(客户端的 70 个实例)时,会出现 延迟我得到的响应。

第二题解释:
Tornado 服务器必须监听某个端口,我将在该端口接收网络数据包,如果他们订阅了,我必须将其发送给客户端。那么这些数据包是否有可能被丢弃?

最佳答案

  1. Will the call to MySQL-DB is blocking? If they are blocking call can I run different thread/process just for DB queries?

正如您提到的,您正在使用 tormysql 作为驱动程序,所以不,调用不会阻塞,因为 tormysql 是异步的。

  1. Is there a chance that tornado drop packets on the broadcast?

我不太明白你的意思。但是由于 websocket 协议(protocol)是建立在 TCP 之上的,所以所有数据包的传递都是有保证的。 TCP 负责这一点。

  1. I can have a load-balancer in front of my resources and server them but is it possible that I can use a single CPU with 2-cores 8-GB RAM?

是的。


我认为此时您对您的应用程序想得太多了。过早的优化是邪恶的。您还没有编写任何代码,并且正在考虑性能。这只是在浪费你的时间。

先编写代码,然后对其进行压力测试,看看您的应用程序可以处理多少负载。然后进行分析以查看速度变慢的地方。 然后您优化代码,或更改您的设置并可能升级您的硬件。

但是只考虑性能而不编写和压力测试代码只是浪费时间。

关于mysql - 带 mysql 和 redis 的 Tornado websockets,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50038530/

相关文章:

mysql - sql查询返回不正确的结果

multithreading - 如果没有数据等待,从 mpsc::channel 获取数据而不锁定

javascript - 异步执行作为参数传递的函数

redis - 我们什么时候可以期待 Redis Cluster 的稳定版本

redis - 如何在毫秒级查询HBase表中的数据?

redis 集群 - 与集群交互是否需要代理或集群支持库?

php - Mysql删除查询只删除一行中的两列

mysql - 将整个表连接为单行

MySql - 子表中列的 SUM() (1 :n) in a Sub-query/Outer Join using LIMIT

ios - 如何不卡住 UI 并等待响应?