我正在构建一个简单的 Node.js Rest API,它将从 Apache Kafka 流生成数据。
API 应该生成 JSON,但我在实现时遇到问题。
如何获取每条消息的 JSON 数组(如 [{},{},{}]
)有帮助吗? (或者任何更好的方法如何公开 Kafka 流并能够使用 Kafka 流,例如通过 Power BI?)
我当前的输出如下:
{"topic":"twitterFeeds","value":"RT @flyingtsunami: @SethAbramson **READ THIS UNTIL IT SINKS IN** #POTUS #Comey's firing (he was); T…","offset":0,"partition":0,"highWaterOffset":1906,"key":null}
{"topic":"twitterFeeds","value":"RT @RCorbettMEP: Why does the @BBC news only …","offset":1,"partition":0,"highWaterOffset":1906,"key":null}
预期输出如下:
[
{"topic":"twitterFeeds","value":"RT @flyingtsunami: @SethAbramson **READ THIS UNTIL IT SINKS IN** #POTUS #Comey's firing (he was); T…","offset":0,"partition":0,"highWaterOffset":1906,"key":null}
,
{"topic":"twitterFeeds","value":"RT @RCorbettMEP: Why does the @BBC news only …","offset":1,"partition":0,"highWaterOffset":1906,"key":null}
]
我的代码:
const express = require('express');
const router = express.Router();
var output1 = '';
var http = require('http');
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(
client,
[
{ topic: 'twitterFeeds', partition: 0 }
],
{
autoCommit: false
}
);
var output2 = consumer.on('message', function (message) {
obj = JSON.stringify(message);
output1 = output1 + obj;
});
router.get('/',(req,res)=>{
res.send(output1);
});
module.exports = router;
最佳答案
我目前正在开发一个类似的项目(如果您有兴趣,我们将不胜感激)- scramjet-kafka
上面的问题我已经在scramjet DataStream.toJSONArray解决了它本身 - 它只是将您的数据作为 JSON 数组进行流式传输。在你的情况下,这将非常简单:
new ConsumerStream(
client,
[ { topic: 'twitterFeeds', partition: 0 } ],
{ autoCommit: false }
)
.pipe(new scramjet.DataStream)
// you may want to do some transforms here
.toJSONArray()
.pipe(process.stdout)
// or pipe it to any other stream, accumulate and so on.
如果您喜欢这个项目并且可以提供帮助 - 请通过 Github 联系。
关于node.js - Rest API Kafka 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49385617/