node.js - Rest API Kafka 流

标签 node.js rest api apache-kafka

我正在构建一个简单的 Node.js Rest API,它将从 Apache Kafka 流生成数据。 API 应该生成 JSON,但我在实现时遇到问题。 如何获取每条消息的 JSON 数组(如 [{},{},{}])有帮助吗? (或者任何更好的方法如何公开 Kafka 流并能够使用 Kafka 流,例如通过 Power BI?)

我当前的输出如下:

{"topic":"twitterFeeds","value":"RT @flyingtsunami: @SethAbramson **READ THIS UNTIL IT SINKS IN** #POTUS  #Comey's firing (he was); T…","offset":0,"partition":0,"highWaterOffset":1906,"key":null}
{"topic":"twitterFeeds","value":"RT @RCorbettMEP: Why does the @BBC news only …","offset":1,"partition":0,"highWaterOffset":1906,"key":null}

预期输出如下:

[
{"topic":"twitterFeeds","value":"RT @flyingtsunami: @SethAbramson **READ THIS UNTIL IT SINKS IN** #POTUS  #Comey's firing (he was); T…","offset":0,"partition":0,"highWaterOffset":1906,"key":null}
,
{"topic":"twitterFeeds","value":"RT @RCorbettMEP: Why does the @BBC news only …","offset":1,"partition":0,"highWaterOffset":1906,"key":null}
]

我的代码:

const express = require('express');
const router = express.Router();
    var output1 = '';
var http = require('http');  
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client(),
    consumer = new Consumer(
        client,
        [
        { topic: 'twitterFeeds', partition: 0 }
        ],
        {
            autoCommit: false
        }
    );
    var output2 = consumer.on('message', function (message) {
     obj = JSON.stringify(message);
     output1 = output1 + obj;
    });

router.get('/',(req,res)=>{
        res.send(output1);
    });
module.exports = router;

最佳答案

我目前正在开发一个类似的项目(如果您有兴趣,我们将不胜感激)- scramjet-kafka

上面的问题我已经在scramjet DataStream.toJSONArray解决了它本身 - 它只是将您的数据作为 JSON 数组进行流式传输。在你的情况下,这将非常简单:

new ConsumerStream(
    client,
    [ { topic: 'twitterFeeds', partition: 0 } ],
    { autoCommit: false }
)
    .pipe(new scramjet.DataStream)
    // you may want to do some transforms here
    .toJSONArray()
    .pipe(process.stdout)
    // or pipe it to any other stream, accumulate and so on.

如果您喜欢这个项目并且可以提供帮助 - 请通过 Github 联系。

关于node.js - Rest API Kafka 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49385617/

相关文章:

Node/Express 中的 JSON 模板

javascript - 将 jquery 变量发送到express.js 路由

c# - Xamarin Android 应用和 Azure SQL 数据库

reactjs - 类型错误 : "NetworkError when attempting to fetch resource."

javascript - Electron 架构 API 调用

php - 无法通过 zoho api 上传文档

node.js - 为什么 css-nano(几乎)不起作用?

javascript - Ben Awad 视频教程问题与 npx mikro-orm 迁移 :create

java - 如何为 ResponseEntity<> 返回 'Integer' 类型并在 api 页面上获取结果?

android - 移动服务器应用聊天 REST API