node.js - 我想在 Node js 中运行 cron 以在 Neo4j DB 中添加 youtube 视频数据,但在 cron 运行时出现错误

标签 node.js neo4j cron

app.js

var express           =     require('express')
  , neo4j_match               =     require('neo4j')
   , session           =     require('express-session')
  , bodyParser        =     require('body-parser')
  , moment            =     require('moment')
  , request           =     require('request')
  , app               =     express();
  var every = require('every-moment');
  var neo4j = require('neo4j-driver').v1;
  app.use(bodyParser.json());
  app.use(bodyParser.urlencoded({extended: true}));
  var db = new neo4j_match.GraphDatabase('http://Novasys:1234567@localhost:7474');
var driver = neo4j.driver("bolt://localhost:7687", neo4j.auth.basic("Novasys", "1234567"));
var session = driver.session();
  every(40, 'seconds', function() {

session
  .run('match (c:channel) where c.from="yt" return c')
  .then(function(result){
    return result.records.map(function(record){
      return {
        title: record._fields[0].properties.channelid
      };
    })
    session.close();
  })
  .then(function(cron_channel){
    for (var k in cron_channel){
       (function(k){
          console.log(cron_channel[k].title);
          request('https://www.googleapis.com/youtube/v3/search?part=snippet&maxResults=50&channelId='+cron_channel[k].title+'&order=date&key=AIzaSyBj7mztzfuZq_LA_5TYHLAw-Qaa2Fof3JU', function (error, response, body) {
  if (!error && response.statusCode == 200) {
  body=JSON.parse(body);

  for(var i in body.items){
    (function(i){
    //selecting video id and getting their meta data
    request('https://www.googleapis.com/youtube/v3/videos?part=statistics,snippet,contentDetails&id='+body.items[i].id.videoId+'&key=AIzaSyBj7mztzfuZq_LA_5TYHLAw-Qaa2Fof3JU',function(err, res, vid){
    if (!error && res.statusCode == 200){
    vid=JSON.parse(vid);
    for(var j in vid.items){
        (function(j){
      //node creation 
      if(vid.items[j].statistics.likeCount == null){
        db.cypher({
    query: 'match (a:channel) where a.channelid={channelParam} merge (n:video {id:{idParam}, categoryid:{categoryidParam}, title:{titleParam}, description:{descriptionParam}, created_time:{created_timeParam},category:a.category, source:{channelParam}, length:{lengthParam}, video_thumbnail:{video_thumbnailParam}, from:"yt"}) ON CREATE SET n.language=a.language, n.views={viewCountParam}, n.likes={likeCountParam}, n.android_views=0, n.android_likes=0, n.created_at=timestamp() ON MATCH SET n.views={viewCountParam}, n.likes={likeCountParam} RETURN n',
params: {
        idParam:vid.items[j].id,
        titleParam:vid.items[j].snippet.title, 
        categoryidParam:vid.items[j].snippet.categoryId,
        descriptionParam:vid.items[j].snippet.description,
        created_timeParam:vid.items[j].snippet.publishedAt, 
        channelParam:cron_channel[k].title, 
        lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
        viewCountParam:vid.items[j].statistics.viewCount,
        likeCountParam:0, 
        video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
    if (err) throw err;
    var result = results[0];
    if (!result) {
        console.log('No user found.');
    } else {
        console.log("success");
    }

});
      }
      else if(vid.items[j].statistics.viewCount == null){
           db.cypher({
    query: 'match (c:channel), (v:video), (u:user) where (v.source=c.channelid OR v.source=c.title) AND c.channelid={channel_id} AND u.title={userid} optional match (u:user)-[r:like]-(v) optional match (u:user)-[re:repick]-(v) RETURN v.title,v.id, v.created_time,v.likes,v.views,v.android_likes, v.android_views,v.from,c.title, c.channelid,c.channelthumbnail,c.followers, c.video_count,r.title ,re.title ORDER BY v.created_time LIMIT 200',
params: {
        idParam:vid.items[j].id,
        titleParam:vid.items[j].snippet.title,
        descriptionParam:vid.items[j].snippet.description, 
        categoryidParam:vid.items[j].snippet.categoryId,
        created_timeParam:vid.items[j].snippet.publishedAt,
        channelParam:cron_channel[k].title, 
        lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
        viewCountParam:0,
        likeCountParam:vid.items[j].statistics.likeCount, 
        video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
    if (err) throw err;
    var result = results[0];
    if (!result) {
        console.log('No user found.');
    } else {
        console.log("success");
    }
});
      }
      else if(vid.items[j].statistics.viewCount == null && vid.items[j].statistics.likeCount == null){

db.cypher({
    query: 'match (a:channel) where a.channelid={channelParam} merge (n:video {id:{idParam}, categoryid:{categoryidParam}, title:{titleParam}, description:{descriptionParam}, created_time:{created_timeParam},category:a.category, source:{channelParam}, length:{lengthParam}, video_thumbnail:{video_thumbnailParam}, from:"yt"}) ON CREATE SET n.language=a.language, n.views={viewCountParam}, n.likes={likeCountParam}, n.android_views=0, n.android_likes=0, n.created_at=timestamp() ON MATCH SET n.views={viewCountParam}, n.likes={likeCountParam} RETURN n',
params: {
        idParam:vid.items[j].id,
        titleParam:vid.items[j].snippet.title,
        descriptionParam:vid.items[j].snippet.description, 
        categoryidParam:vid.items[j].snippet.categoryId,
        created_timeParam:vid.items[j].snippet.publishedAt, 
        channelParam:cron_channel[k].title, 
        lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
        viewCountParam:0, 
        likeCountParam:0, 
        video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
    if (err) throw err;
    var result = results[0];
    if (!result) {
        console.log('No user found.');
    } else {
         console.log("success");
    }
});

      }
      else{
      db.cypher({
    query: 'match (a:channel) where a.channelid={channelParam} merge (n:video {id:{idParam}, title:{titleParam}, categoryid:{categoryidParam}, description:{descriptionParam}, created_time:{created_timeParam},category:a.category, source:{channelParam}, length:{lengthParam}, video_thumbnail:{video_thumbnailParam}, from:"yt"}) ON CREATE SET n.language=a.language, n.views={viewCountParam}, n.likes={likeCountParam}, n.android_views=0, n.android_likes=0, n.created_at=timestamp() ON MATCH SET n.views={viewCountParam}, n.likes={likeCountParam} RETURN n',
params: {
        idParam:vid.items[j].id,
        titleParam:vid.items[j].snippet.title,
        descriptionParam:vid.items[j].snippet.description, 
        categoryidParam:vid.items[j].snippet.categoryId,
        created_timeParam:vid.items[j].snippet.publishedAt,
        channelParam:cron_channel[k].title, 
        lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
        viewCountParam:vid.items[j].statistics.viewCount, 
        likeCountParam:vid.items[j].statistics.likeCount, 
        video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
    if (err) throw err;
    var result = results[0];
    if (!result) {
        console.log('No user found.');
    } else {
        console.log("success");
    }
});
}
  session
  .run('match (a:video), (b:language) where a.language=b.title merge (b)-[r:video_lang{title:"video_lang"}]-(a) return a,b,r ')
  .then(function(result){
    console.log("success1");
    session.close();
  })
  .catch(function(err){
    console.log(err);
  });

  //relation creation
  session
  .run('match (a:channel), (b:video) where a.channelid={channelParam} AND b.source={channelParam} merge (a)-[r:rel_video{title:"yt"}]->(b) return a,b,r',{channelParam:cron_channel[k].title})
  .then(function(result){
    console.log("success2");
    session.close();
  })
  .catch(function(err){
    console.log(err);
  });
        })(j);
        }
                }
    })  
})(i);
}//first for
  }
})

       })(k);


}



  }); 
});

服务器端错误

{ Error: Connection was closed by server
    at Neo4jError.Error (native)
    at new Neo4jError (C:\Users\Prateek\Desktop\Youtube Cron\node_modules\neo4j-driver\lib\v1\error.js:76:132)
    at newError (C:\Users\Prateek\Desktop\Youtube Cron\node_modules\neo4j-driver\lib\v1\error.js:66:10)
    at NodeChannel._handleConnectionTerminated (C:\Users\Prateek\Desktop\Youtube Cron\node_modules\neo4j-driver\lib\v1\internal\ch-node.js:336:41)
    at emitNone (events.js:91:20)
    at TLSSocket.emit (events.js:185:7)
    at endReadableNT (_stream_readable.js:974:12)
    at _combinedTickCallback (internal/process/next_tick.js:74:11)
    at process._tickCallback (internal/process/next_tick.js:98:9) code: 'ServiceUnavailable' }

数据库日志 调试.log

2017-05-18 16:09:16.849+0000 ERROR [o.n.b.t.SocketTransportHandler] Fatal error occurred when handling a client connection: failed to allocate 16777216 byte(s) of direct memory (used: 922746887, max: 934281216) failed to allocate 16777216 byte(s) of direct memory (used: 922746887, max: 934281216)
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 922746887, max: 934281216)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:624)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:578)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:709)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:698)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:221)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:262)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:170)
    at io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:107)
    at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1472)
    at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1482)
    at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:519)
    at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:490)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:787)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:779)
    at io.netty.channel.AbstractChannelHandlerContext.access$1500(AbstractChannelHandlerContext.java:39)
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1148)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1089)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:454)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
    at java.lang.Thread.run(Unknown Source)
2017-05-18 16:09:16.937+0000 WARN  [io.netty.channel.AbstractChannelHandlerContext] Failed to mark a promise as failure because it has succeeded already: DefaultChannelPromise@3c0b7552(success)

最佳答案

不是内存的问题,实际上是session没有正确关闭而产生的错误。

执行以下操作来解决该问题。 1.第一个 session 没有正确关闭,session.close()调用在返回后继续。代码可以更改为:

session
  .run('match (c:channel) where c.from="yt" return c')
  .then(function (result) {
    session.close(); // <<-- session is closed here
    return result.records.map(function (record) {
      return {
        title: record._fields[0].properties.channelid
      };
    })
  })
...
  • session 关闭后不应重复使用。请在最后两次 session.run(...) 调用之前添加 var session = driver.session() 。即之前:

    session.run('match (a:video), (b:language) where a.language=b.title merge (b)-[r:video_lang{title:"video_lang"}]-(a) return a,b,r')

  • 和之前:

    session.run('match (a:channel), (b:video) where a.channelid={channelParam} AND b.source={channelParam} merge (a)-[r:rel_video{title:"yt"}]->(b) return a,b,r',{channelParam:cron_channel[k].title})
    

    关于node.js - 我想在 Node js 中运行 cron 以在 Neo4j DB 中添加 youtube 视频数据,但在 cron 运行时出现错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44062286/

    相关文章:

    node.js - 在 Node.js 上的 Express POST 路由中定义/使用 Promise

    javascript - Node 中的 Highcharts

    node.js - 如何使用node js映射mongodb中的两个集合

    linux - 通过电子邮件正文中的日志获取 cronjob 脚本执行的日志

    node.js - 停止测试运​​行 mocha

    neo4j - AND 运算符不适用于 Neo4j 中的 type(r)

    python - 将数据从 CSV 加载到 Neo4j

    Neo4j 浏览器 – 刷新时查询丢失

    php - 构建 cron 作业调度程序

    linux - cronjob 等待问题