我正在尝试在 flink 中执行 pagerank Basic 示例,并稍作修改(仅在读取输入文件时,其他一切都相同)我收到错误,因为 Task not serializable 和以下是输出错误的一部分
atorg.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179) at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171)
下面是我的代码
object hpdb {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val maxIterations = 10000
val DAMPENING_FACTOR: Double = 0.85
val EPSILON: Double = 0.0001
val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv"
val links = env.readCsvFile[Tuple2[Long,Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
fieldDelimiter = "\t", includedFields = Array(1,4)).as('sourceId,'targetId).toDataSet[Link]//source and target
val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
fieldDelimiter = "\t", includedFields = Array(1)).as('pageId).toDataSet[Id]//Pageid
val noOfPages = pages.count()
val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / noOfPages))
val adjacencyLists = links
// initialize lists ._1 is the source id and ._2 is the traget id
.map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
// concatenate lists
.groupBy("sourceId").reduce {
(l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
}
// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
// **//the output shows error here**
currentRanks =>
val newRanks = currentRanks
// distribute ranks to target pages
.join(adjacencyLists).where("pageId").equalTo("sourceId") {
(page, adjacent, out: Collector[Page]) =>
for (targetId <- adjacent.targetIds) {
out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
}
}
// collect ranks and sum them up
.groupBy("pageId").aggregate(SUM, "rank")
// apply dampening factor
//**//the output shows error here**
.map { p =>
Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / pages.count()))
}
// terminate if no rank update was significant
val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
(current, next, out: Collector[Int]) =>
// check for significant update
if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
}
(newRanks, termination)
}
val result = finalRanks
// emit result
result.writeAsCsv(outpath, "\n", " ")
env.execute()
}
}
任何在正确方向上的帮助都受到高度赞赏?谢谢你。
最佳答案
问题是您从 DataSet
中引用了 pages
MapFunction
。这是不可能的,因为 DataSet
只是数据流的逻辑表示,不能在运行时访问。
要解决此问题,您需要做的是将 val pagesCount = pages.count
值分配给变量 pagesCount
并在您的 MapFunction
中引用该变量。pages.count
实际做的,就是触发数据流图的执行,这样可以统计pages
中的元素个数。然后将结果返回到您的程序。
关于scala - 任务不可序列化 Flink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31315478/