app.js
var express = require('express')
, neo4j_match = require('neo4j')
, session = require('express-session')
, bodyParser = require('body-parser')
, moment = require('moment')
, request = require('request')
, app = express();
var every = require('every-moment');
var neo4j = require('neo4j-driver').v1;
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({extended: true}));
var db = new neo4j_match.GraphDatabase('http://Novasys:1234567@localhost:7474');
var driver = neo4j.driver("bolt://localhost:7687", neo4j.auth.basic("Novasys", "1234567"));
var session = driver.session();
every(40, 'seconds', function() {
session
.run('match (c:channel) where c.from="yt" return c')
.then(function(result){
return result.records.map(function(record){
return {
title: record._fields[0].properties.channelid
};
})
session.close();
})
.then(function(cron_channel){
for (var k in cron_channel){
(function(k){
console.log(cron_channel[k].title);
request('https://www.googleapis.com/youtube/v3/search?part=snippet&maxResults=50&channelId='+cron_channel[k].title+'&order=date&key=AIzaSyBj7mztzfuZq_LA_5TYHLAw-Qaa2Fof3JU', function (error, response, body) {
if (!error && response.statusCode == 200) {
body=JSON.parse(body);
for(var i in body.items){
(function(i){
//selecting video id and getting their meta data
request('https://www.googleapis.com/youtube/v3/videos?part=statistics,snippet,contentDetails&id='+body.items[i].id.videoId+'&key=AIzaSyBj7mztzfuZq_LA_5TYHLAw-Qaa2Fof3JU',function(err, res, vid){
if (!error && res.statusCode == 200){
vid=JSON.parse(vid);
for(var j in vid.items){
(function(j){
//node creation
if(vid.items[j].statistics.likeCount == null){
db.cypher({
query: 'match (a:channel) where a.channelid={channelParam} merge (n:video {id:{idParam}, categoryid:{categoryidParam}, title:{titleParam}, description:{descriptionParam}, created_time:{created_timeParam},category:a.category, source:{channelParam}, length:{lengthParam}, video_thumbnail:{video_thumbnailParam}, from:"yt"}) ON CREATE SET n.language=a.language, n.views={viewCountParam}, n.likes={likeCountParam}, n.android_views=0, n.android_likes=0, n.created_at=timestamp() ON MATCH SET n.views={viewCountParam}, n.likes={likeCountParam} RETURN n',
params: {
idParam:vid.items[j].id,
titleParam:vid.items[j].snippet.title,
categoryidParam:vid.items[j].snippet.categoryId,
descriptionParam:vid.items[j].snippet.description,
created_timeParam:vid.items[j].snippet.publishedAt,
channelParam:cron_channel[k].title,
lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
viewCountParam:vid.items[j].statistics.viewCount,
likeCountParam:0,
video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
if (err) throw err;
var result = results[0];
if (!result) {
console.log('No user found.');
} else {
console.log("success");
}
});
}
else if(vid.items[j].statistics.viewCount == null){
db.cypher({
query: 'match (c:channel), (v:video), (u:user) where (v.source=c.channelid OR v.source=c.title) AND c.channelid={channel_id} AND u.title={userid} optional match (u:user)-[r:like]-(v) optional match (u:user)-[re:repick]-(v) RETURN v.title,v.id, v.created_time,v.likes,v.views,v.android_likes, v.android_views,v.from,c.title, c.channelid,c.channelthumbnail,c.followers, c.video_count,r.title ,re.title ORDER BY v.created_time LIMIT 200',
params: {
idParam:vid.items[j].id,
titleParam:vid.items[j].snippet.title,
descriptionParam:vid.items[j].snippet.description,
categoryidParam:vid.items[j].snippet.categoryId,
created_timeParam:vid.items[j].snippet.publishedAt,
channelParam:cron_channel[k].title,
lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
viewCountParam:0,
likeCountParam:vid.items[j].statistics.likeCount,
video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
if (err) throw err;
var result = results[0];
if (!result) {
console.log('No user found.');
} else {
console.log("success");
}
});
}
else if(vid.items[j].statistics.viewCount == null && vid.items[j].statistics.likeCount == null){
db.cypher({
query: 'match (a:channel) where a.channelid={channelParam} merge (n:video {id:{idParam}, categoryid:{categoryidParam}, title:{titleParam}, description:{descriptionParam}, created_time:{created_timeParam},category:a.category, source:{channelParam}, length:{lengthParam}, video_thumbnail:{video_thumbnailParam}, from:"yt"}) ON CREATE SET n.language=a.language, n.views={viewCountParam}, n.likes={likeCountParam}, n.android_views=0, n.android_likes=0, n.created_at=timestamp() ON MATCH SET n.views={viewCountParam}, n.likes={likeCountParam} RETURN n',
params: {
idParam:vid.items[j].id,
titleParam:vid.items[j].snippet.title,
descriptionParam:vid.items[j].snippet.description,
categoryidParam:vid.items[j].snippet.categoryId,
created_timeParam:vid.items[j].snippet.publishedAt,
channelParam:cron_channel[k].title,
lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
viewCountParam:0,
likeCountParam:0,
video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
if (err) throw err;
var result = results[0];
if (!result) {
console.log('No user found.');
} else {
console.log("success");
}
});
}
else{
db.cypher({
query: 'match (a:channel) where a.channelid={channelParam} merge (n:video {id:{idParam}, title:{titleParam}, categoryid:{categoryidParam}, description:{descriptionParam}, created_time:{created_timeParam},category:a.category, source:{channelParam}, length:{lengthParam}, video_thumbnail:{video_thumbnailParam}, from:"yt"}) ON CREATE SET n.language=a.language, n.views={viewCountParam}, n.likes={likeCountParam}, n.android_views=0, n.android_likes=0, n.created_at=timestamp() ON MATCH SET n.views={viewCountParam}, n.likes={likeCountParam} RETURN n',
params: {
idParam:vid.items[j].id,
titleParam:vid.items[j].snippet.title,
descriptionParam:vid.items[j].snippet.description,
categoryidParam:vid.items[j].snippet.categoryId,
created_timeParam:vid.items[j].snippet.publishedAt,
channelParam:cron_channel[k].title,
lengthParam:moment.duration(vid.items[j].contentDetails.duration).asMilliseconds(),
viewCountParam:vid.items[j].statistics.viewCount,
likeCountParam:vid.items[j].statistics.likeCount,
video_thumbnailParam:vid.items[j].snippet.thumbnails.high.url
},
},
function (err, results) {
if (err) throw err;
var result = results[0];
if (!result) {
console.log('No user found.');
} else {
console.log("success");
}
});
}
session
.run('match (a:video), (b:language) where a.language=b.title merge (b)-[r:video_lang{title:"video_lang"}]-(a) return a,b,r ')
.then(function(result){
console.log("success1");
session.close();
})
.catch(function(err){
console.log(err);
});
//relation creation
session
.run('match (a:channel), (b:video) where a.channelid={channelParam} AND b.source={channelParam} merge (a)-[r:rel_video{title:"yt"}]->(b) return a,b,r',{channelParam:cron_channel[k].title})
.then(function(result){
console.log("success2");
session.close();
})
.catch(function(err){
console.log(err);
});
})(j);
}
}
})
})(i);
}//first for
}
})
})(k);
}
});
});
服务器端错误
{ Error: Connection was closed by server
at Neo4jError.Error (native)
at new Neo4jError (C:\Users\Prateek\Desktop\Youtube Cron\node_modules\neo4j-driver\lib\v1\error.js:76:132)
at newError (C:\Users\Prateek\Desktop\Youtube Cron\node_modules\neo4j-driver\lib\v1\error.js:66:10)
at NodeChannel._handleConnectionTerminated (C:\Users\Prateek\Desktop\Youtube Cron\node_modules\neo4j-driver\lib\v1\internal\ch-node.js:336:41)
at emitNone (events.js:91:20)
at TLSSocket.emit (events.js:185:7)
at endReadableNT (_stream_readable.js:974:12)
at _combinedTickCallback (internal/process/next_tick.js:74:11)
at process._tickCallback (internal/process/next_tick.js:98:9) code: 'ServiceUnavailable' }
数据库日志 调试.log
2017-05-18 16:09:16.849+0000 ERROR [o.n.b.t.SocketTransportHandler] Fatal error occurred when handling a client connection: failed to allocate 16777216 byte(s) of direct memory (used: 922746887, max: 934281216) failed to allocate 16777216 byte(s) of direct memory (used: 922746887, max: 934281216)
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 922746887, max: 934281216)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:624)
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:578)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:709)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:698)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:221)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:262)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:170)
at io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:107)
at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1472)
at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1482)
at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:519)
at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:490)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:787)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:779)
at io.netty.channel.AbstractChannelHandlerContext.access$1500(AbstractChannelHandlerContext.java:39)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1148)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1089)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:454)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at java.lang.Thread.run(Unknown Source)
2017-05-18 16:09:16.937+0000 WARN [io.netty.channel.AbstractChannelHandlerContext] Failed to mark a promise as failure because it has succeeded already: DefaultChannelPromise@3c0b7552(success)
最佳答案
不是内存的问题,实际上是session没有正确关闭而产生的错误。
执行以下操作来解决该问题。 1.第一个 session 没有正确关闭,session.close()调用在返回后继续。代码可以更改为:
session
.run('match (c:channel) where c.from="yt" return c')
.then(function (result) {
session.close(); // <<-- session is closed here
return result.records.map(function (record) {
return {
title: record._fields[0].properties.channelid
};
})
})
...
session 关闭后不应重复使用。请在最后两次 session.run(...) 调用之前添加 var session = driver.session() 。即之前:
session.run('match (a:video), (b:language) where a.language=b.title merge (b)-[r:video_lang{title:"video_lang"}]-(a) return a,b,r')
和之前:
session.run('match (a:channel), (b:video) where a.channelid={channelParam} AND b.source={channelParam} merge (a)-[r:rel_video{title:"yt"}]->(b) return a,b,r',{channelParam:cron_channel[k].title})
关于node.js - 我想在 Node js 中运行 cron 以在 Neo4j DB 中添加 youtube 视频数据,但在 cron 运行时出现错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44062286/