node.js - 如何在 NodeJS 中使用 Apache Kafka 将记录添加到 MongoDb 中?

标签 node.js mongodb apache-kafka

我正在尝试使用 Apache Kafka、NodeJS 和 MongoDB。下面我有一个简单的程序,只需将我的名字添加到 mongodb 中即可进行练习。我正在尝试将它与卡夫卡融合,但它也没有按照我想要的方式工作。请参阅下面的代码。

index.html

<!DOCTYPE html>
<html>
<head>
</head>

 <body>
 <form method="post" action="/addname">
 <label>Enter Your Name</label><br>
 <input type="text" name="firstName" placeholder="Enter first name..." 
required>
 <input type="text" name="lastName" placeholder="Enter last name..." 
   required>
 <input type="submit" value="Add Name">
 </form>
 </body>
</html>

index.js

const express = require("express");
const mongoose = require("mongoose");
const bodyParser = require('body-parser');
const kafkaSend = require('./kafka/kafkaScripts/producer.js');
const kafkaRead = require('./kafka/kafkaScripts/consumer.js');
const app = express();
const port = 3000;

app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

mongoose.Promise = 
global.Promise;mongoose.connect("mongodb://localhost:27017/q-research");

var nameSchema = new mongoose.Schema({
   firstName: String,
   lastName: String
});

var User = mongoose.model("User", nameSchema);



app.post("/addname", (req, res) => {
   var myData = new User(req.body);
   myData.save()
   .then(item => {
       let testObj ={
           type: item,
           userId:'userID',
           sessionId: 'whateverSessionId',
           data: item

       };

       kafkaSend.sendRecord(testObj,function(err,data){
          if(err){
             console.log('error: ', err);
          }
          else{
             res.send("item saved to database");
          }

       });

   })
   .catch(err => {
      console.log('err: ', err);
      res.status(400).send("unable to save to database");
   });

});

/* app start info */
app.use("/", (req, res) => {
   res.sendFile(__dirname + "/index.html");
});

app.get("/", (req, res) => {
   res.send("Hello World");
});

app.listen(port, () => {
   console.log("Server listening on port " + port);
});

生产者.js

const kafka = require('kafka-node'),
uuid = require('uuid');

const client = new kafka.Client("localhost:2181", 8, {
   sessionTimeout: 300,
   spinDelay: 100,
   retries: 2
});

const producer = new kafka.HighLevelProducer(client);
producer.on("ready", function() {
  console.log("Kafka Producer is connected and ready.");
});

// For this demo we just log producer errors to the console.
producer.on("error", function(error) {
   console.error(error);
});

const KafkaService = {
   sendRecord: ({ type, userId, sessionId, data }, callback = () => {}) => {
      if (!userId) {
          return callback(new Error(`A userId must be provided.`));
      }

      const event = {
          id: uuid.v4(),
          timestamp: Date.now(),
          userId: userId,
          sessionId: sessionId,
          type: type,
          data: data
      };

       const buffer = new Buffer.from(JSON.stringify(event));

       // Create a new payload
       const record = [
           {
               topic: "webevents.dev",
               messages: buffer,
               attributes: 1 /* Use GZip compression for the payload */
           }
       ];

       //Send record to Kafka and log result/error
       producer.send(record, callback);
   }
};
module.exports = KafkaService;

我收到以下错误。所以这就是我的问题所在。我相信我需要更改测试对象中的类型然后它可能会起作用。我不确定。它也可以有更多的东西。 error

最佳答案

我所要做的就是指向我的虚拟盒子而不是本地主机,因为我的 kafka 位于 Ubuntu 盒子中。

const client = new kafka.Client("10.10.205.17:2181, 8, {
   sessionTimeout: 300,
   spinDelay: 100,
   retries: 2
});

关于node.js - 如何在 NodeJS 中使用 Apache Kafka 将记录添加到 MongoDb 中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52610342/

相关文章:

java - 批量发布Kafka对象

javascript - 如何修复 Node js 中的设置 header ?

node.js - 事后如何编写 HTTP 500 错误 header

node.js - 密码仅保护 Node 应用程序的一部分

node.js - Azure AD 和 Node.js SQL 授权

MongoDB find() 输出到文件

java - 从源代码构建 Kafka 时出错

mongodb - Node.js - 在 Heroku 上使用 MongoHQ 连接到 MongoDB

javascript - 如何在 Meteor 中进行简单的数据库查询

java - 如何使用 Spring Cloud Stream 将 Spring Boot 应用程序集成到 Bluemix Cloud 上的 IBM 事件流