python - 如何在 Tornado 中使用 Kafka?

标签 python tornado apache-kafka kafka-python

我正在尝试基于 this(一个 Tornado 示例)制作一个简单的聊天应用程序。

但我还想用 Kafka 来存储消息。该怎么做?

目前我参考 this 实现了一个消费者,它在某种程度上能工作,但只会把消息打印到控制台。我需要像原来的 Tornado 应用那样在网页上显示消息,只不过消息要保存在 Kafka 中。

这是我目前的 app.py 代码

#!/usr/bin/env python
#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import tornado.escape
import tornado.ioloop
import tornado.web
import os.path
import uuid

from tornado.concurrent import Future
from tornado import gen
from tornado.options import define, options, parse_command_line
from pykafka import KafkaClient


# Command-line options; they are parsed by parse_command_line() in main().
define("port", type=int, default=8888, help="run on the given port")
define("debug", help="run in debug mode", default=False)


class MessageBuffer(object):
    """In-memory chat history plus the long-poll requests waiting on it.

    ``cache`` holds the most recent ``cache_size`` messages (dicts with at
    least an ``"id"`` key); ``waiters`` holds the still-unresolved futures
    handed out by :meth:`wait_for_messages`.
    """

    def __init__(self, cache_size=200):
        self.waiters = set()
        self.cache = []
        self.cache_size = cache_size

    def wait_for_messages(self, cursor=None):
        """Return a Future that resolves to the messages newer than *cursor*.

        If *cursor* is given and newer messages are already cached, the Future
        is resolved immediately (a cursor not found in a non-empty cache
        yields the whole cache). Otherwise the Future is registered as a
        waiter and resolved by the next call to :meth:`new_messages`.
        """
        # Returning a Future lets callers ``yield`` this from a coroutine even
        # though this method is not a coroutine itself.
        result_future = Future()
        if cursor:
            new_count = 0
            for msg in reversed(self.cache):
                if msg["id"] == cursor:
                    break
                new_count += 1
            if new_count:
                result_future.set_result(self.cache[-new_count:])
                return result_future
        self.waiters.add(result_future)
        return result_future

    def cancel_wait(self, future):
        """Stop waiting on *future* and resolve it with an empty list.

        Safe to call for a future that is no longer pending, i.e. one that
        :meth:`new_messages` already resolved, or one that
        :meth:`wait_for_messages` resolved immediately and never registered.
        """
        # discard, not remove: the future may already have left ``waiters``.
        self.waiters.discard(future)
        # Set an empty result to unblock any coroutine still waiting on it;
        # a second set_result on a resolved future would raise.
        if not future.done():
            future.set_result([])

    def new_messages(self, messages):
        """Resolve every pending waiter with *messages*, then cache them."""
        logging.info("Sending new message to %r listeners", len(self.waiters))
        for future in self.waiters:
            if not future.done():
                future.set_result(messages)
        self.waiters = set()
        self.cache.extend(messages)
        if len(self.cache) > self.cache_size:
            # Explicit start index: ``cache[-0:]`` would keep everything when
            # cache_size is 0.
            self.cache = self.cache[len(self.cache) - self.cache_size:]

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']
consumer = topic.get_simple_consumer()


def _print_kafka_messages(kafka_consumer):
    """Print the value of every message received from *kafka_consumer*.

    Iterating the consumer waits for new messages (with pykafka's defaults it
    does not stop on its own), so this must not run at import time on the
    main thread: a module-level loop here would never finish, and ``main()``
    -- and with it the Tornado server -- would never start.
    """
    for message in kafka_consumer:
        if message is not None:
            print(message.value)


def _start_kafka_printer(kafka_consumer):
    """Run :func:`_print_kafka_messages` on a daemon thread and return it."""
    import threading  # only this helper needs it

    thread = threading.Thread(target=_print_kafka_messages,
                              args=(kafka_consumer,))
    # Daemon, so this thread does not keep the process alive after the
    # IOLoop stops.
    thread.daemon = True
    thread.start()
    return thread


kafka_printer_thread = _start_kafka_printer(consumer)
# Making this a non-singleton is left as an exercise for the reader.
global_message_buffer = MessageBuffer()


class MainHandler(tornado.web.RequestHandler):
    """Serve the chat page pre-populated with the cached messages."""

    def get(self):
        cached_messages = global_message_buffer.cache
        self.render("index.html", messages=cached_messages)


class MessageNewHandler(tornado.web.RequestHandler):
    """Accept a posted chat message, answer the poster, then broadcast it."""

    def post(self):
        message_id = str(uuid.uuid4())
        body = self.get_argument("body")
        message = {"id": message_id, "body": body}
        rendered = self.render_string("message.html", message=message)
        # Python 3's json encoder rejects byte strings, so convert the
        # rendered fragment to a native string before it is serialized.
        message["html"] = tornado.escape.to_basestring(rendered)
        next_url = self.get_argument("next", None)
        if not next_url:
            self.write(message)
        else:
            self.redirect(next_url)
        global_message_buffer.new_messages([message])


class MessageUpdatesHandler(tornado.web.RequestHandler):
    """Long-poll endpoint: responds once messages newer than ``cursor`` exist."""

    @gen.coroutine
    def post(self):
        cursor = self.get_argument("cursor", None)
        # Keep the future so on_connection_close can cancel the wait if the
        # client disconnects before any message arrives.
        self.future = global_message_buffer.wait_for_messages(cursor=cursor)
        messages = yield self.future
        if self.request.connection.stream.closed():
            return
        self.write(dict(messages=messages))

    def on_connection_close(self):
        # The connection can close before post() has created a future, or
        # after that future was already resolved (messages delivered or an
        # immediate cursor hit). Only a still-pending wait needs cancelling;
        # cancelling a resolved one would fail inside the buffer.
        future = getattr(self, "future", None)
        if future is not None and not future.done():
            global_message_buffer.cancel_wait(future)


def main():
    """Parse command-line options and serve the chat app until interrupted."""
    parse_command_line()
    base_dir = os.path.dirname(__file__)
    routes = [
        (r"/", MainHandler),
        (r"/a/message/new", MessageNewHandler),
        (r"/a/message/updates", MessageUpdatesHandler),
    ]
    settings = dict(
        cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
        template_path=os.path.join(base_dir, "templates"),
        static_path=os.path.join(base_dir, "static"),
        xsrf_cookies=True,
        debug=options.debug,
    )
    application = tornado.web.Application(routes, **settings)
    application.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()

最佳答案

我知道这是一个老问题,但如果对其他人有用的话:tornado 和 kafka-python 模块是可以一起使用的(不过 @sixstone 建议使用 kiel 也是个不错的选择)。

由于 kafka-python 是阻塞的,而我们还需要运行 Tornado 主循环,因此需要使用单独的线程。在下面(较长的)示例中,我为 kafka-python 的调用创建了一个线程,并将 Tornado IOLoop 保留在主线程中。

该示例比较冗长,因为它还使用 WebSocket 在收到消息后立即将其推送到网页。希望对于想通过 WebSocket 把 Tornado 和 Kafka 的实时通知结合起来的人来说,这些额外的复杂性是值得的。

from __future__ import absolute_import, print_function

import collections
import threading

import jinja2
from kafka import KafkaConsumer
import tornado.ioloop
import tornado.web
import tornado.websocket


# Most recent Kafka messages, rendered when the page is first loaded; once
# 100 are stored the oldest is dropped automatically.
message_history = collections.deque(maxlen=100)


class KafkaWebSocket(tornado.websocket.WebSocketHandler):
    """WebSocket endpoint that pushes every Kafka message to open browsers.

    ``open_sockets`` is shared by all instances so :meth:`write_to_all` can
    reach every connected client. Tornado handlers are not thread-safe, so
    the set and the sockets must only be used from the IOLoop thread.
    """
    # Keep track of open sockets globally, so that we can
    # communicate with them conveniently.
    open_sockets = set()

    @classmethod
    def write_to_all(cls, message):
        """Send *message* to every open socket, dropping dead ones.

        Call this on the IOLoop thread (from another thread, schedule it with
        ``IOLoop.add_callback``).
        """
        removable = set()
        # Iterate over a snapshot so the set cannot change size mid-loop.
        for ws in list(cls.open_sockets):
            if not ws.ws_connection or not ws.ws_connection.stream.socket:
                removable.add(ws)
                continue
            try:
                ws.write_message(message)
            except tornado.websocket.WebSocketClosedError:
                # Closed between the check above and the write.
                removable.add(ws)
        cls.open_sockets.difference_update(removable)

    def open(self):
        # We don't want these sockets to be buffered.
        self.set_nodelay(True)
        type(self).open_sockets.add(self)

    def on_close(self):
        # Forget the socket as soon as it closes instead of waiting for the
        # next broadcast to notice.
        type(self).open_sockets.discard(self)


class MainHandler(tornado.web.RequestHandler):
    """Render the cached Kafka messages plus a page that listens on ``ws``.

    Message text comes straight from Kafka and is untrusted, so it is
    HTML-escaped both server-side (jinja2 autoescape) and client-side
    (inserted with jQuery ``.text`` rather than as markup).
    """
    template = """
<html>
<head>
<link href="//netdna.bootstrapcdn.com/twitter-bootstrap/2.3.1/css/bootstrap-combined.no-icons.min.css" rel="stylesheet">
<script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/1.8.2/jquery.min.js"></script>

</head>
<body>
<div class="container">
{{ messages|length }} messages in cache:
<br><br>
<div id="messages">
{% for message in messages %}
 <div>{{ message }}</div>
{% endfor %}
</div>
</div>
    <footer class="footer">
      <div class="container">
    Web socket status: <div id="message">Not connected</div>
      </div>
    </footer>

<script>

var loc = window.location, new_uri;
if (loc.protocol === "https:") {
    new_uri = "wss:";
} else {
    new_uri = "ws:";
}
new_uri += "//" + loc.host;
new_uri += loc.pathname + "ws";

var ws = new WebSocket(new_uri);
var $message = $('#message');
ws.onopen = function(){
  $message.attr("class", 'label label-success');
  $message.text('open');
};
ws.onmessage = function(ev){
  $message.attr("class", 'label label-info');
  $message.hide();
  $message.fadeIn("slow");
  $message.text('received message ' + new Date().toLocaleString());
  $('#messages').append($("<div>").text(ev.data));
};
ws.onclose = function(ev){
  $message.attr("class", 'label label-important');
  $message.text('closed');
};
ws.onerror = function(ev){
  $message.attr("class", 'label label-warning');
  $message.text('error occurred');
};
</script>

</body>
</html>
"""
    # Shared across requests; autoescape is off by default in jinja2, which
    # would let a Kafka payload inject markup/script into the page.
    _jinja_env = jinja2.Environment(autoescape=True)

    def get(self):
        template = self._jinja_env.from_string(self.template)
        self.write(template.render(messages=message_history))


class Consumer(threading.Thread):
    """Daemon thread that drains a blocking Kafka consumer.

    Iterating a kafka-python ``KafkaConsumer`` blocks, so it cannot run on
    the IOLoop thread. Tornado objects are not thread-safe, however, so each
    received message is handed back to the IOLoop with ``add_callback`` (the
    IOLoop method documented as safe to call from other threads) and is
    processed there.
    """

    def __init__(self, kafka_consumer, io_loop=None):
        """Create the thread (it is not started).

        :param kafka_consumer: iterable of Kafka records, e.g.
            ``kafka.KafkaConsumer``.
        :param io_loop: IOLoop that should process the messages. Defaults to
            ``IOLoop.current()`` of the thread constructing this object, so
            construct the Consumer on the thread that will run the IOLoop.
        """
        self._consumer = kafka_consumer
        # Captured here: IOLoop.current() inside run() would refer to the
        # worker thread, not the thread serving HTTP/websocket traffic.
        if io_loop is None:
            io_loop = tornado.ioloop.IOLoop.current()
        self._io_loop = io_loop
        super(Consumer, self).__init__()
        # Don't keep the process alive just for this thread.
        self.daemon = True

    @staticmethod
    def _publish(message):
        """Store *message* in the history and push it to every open socket.

        Scheduled onto the IOLoop by :meth:`run`, so ``message_history``
        (also iterated by MainHandler) and the websocket handlers are only
        touched from the IOLoop thread.
        """
        message_history.append(message)
        KafkaWebSocket.write_to_all(message)

    def run(self):
        for record in self._consumer:
            self._io_loop.add_callback(self._publish, str(record))


def make_app(*args, **kwargs):
    """Build the Tornado application serving the page and its websocket.

    Extra positional/keyword arguments are forwarded to
    ``tornado.web.Application`` after the route table.
    """
    routes = [
        (r"/?", MainHandler),
        (r"/ws/?", KafkaWebSocket),
    ]
    return tornado.web.Application(routes, *args, **kwargs)


if __name__ == "__main__":
    # kafka-python iteration blocks, so consume on a background thread and
    # leave the main thread to the Tornado IOLoop.
    consumer_thread = Consumer(KafkaConsumer('mytopic'))
    consumer_thread.start()

    application = make_app()
    application.listen(8889)

    tornado.ioloop.IOLoop.current().start()

关于python - 如何在 Tornado 中使用 Kafka?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35357354/

相关文章:

python - 编程式 Bokeh 服务器

python - 在 Tornado 中保留 websocket 连接列表

java - kafka启用默认的循环分区器

python - 使用 NumPy 查找数组中最大值的索引

python - BeautifulSoup 网页抓取: How do I scrape this particular html structure

python - 广播 numpy 点积

java - Kafka消费者输出的数据不一致

python - 将二维数组合并到现有的三维数组

cors - Tornado 服务器: enable CORS requests

scala - Spark Streaming Kafka CreateDirectStream 无法解析