scala - Flink 获取 RuntimeContext

标签 scala apache-flink illegalstateexception

在我的 Flink 代码中,我使用自定义输入格式,这会引发异常。看来我需要一个 RuntimeContext 实例,但如何才能获得一个实例?

我的格式类如下所示:

MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T]{
@transient var lineCounter: IntCounter = _
override def open(split: FileInputSplit): Unit = {
    super.open(split)
    lineCounter = new IntCounter()
    getRuntimeContext.addAccumulator("rowsInFile", lineCounter) // this line throws IllegalStateException

我的主程序如下所示:

val env = ExecutionEnvironment.getExecutionEnvironment
val format = new MyInputFormat
env.readFile(format, inputFile.getAbsolutePath) // throws exception

抛出的异常:

java.lang.IllegalStateException: The runtime context has not been initialized yet. Try accessing it in one of the other life cycle methods.
    at org.apache.flink.api.common.io.RichInputFormat.getRuntimeContext(RichInputFormat.java:51)

我的类需要一个RuntimeContext,因为它扩展了DelimitedInputFormat,而DelimitedInputFormat扩展了... RichInputFormat

public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT>
public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputSplit>
public abstract class RichInputFormat<OT, T extends InputSplit> implements InputFormat<OT, T>
    private transient RuntimeContext runtimeContext;
    public void setRuntimeContext(RuntimeContext t)
    public RuntimeContext getRuntimeContext()

因此,RichInputFormat 的任何实例都希望我们在创建后setRuntimeContext(RuntimeContext t)

我希望我应该执行以下操作:

val env = ExecutionEnvironment.getExecutionEnvironment
val runtimeContext: RuntimeContext = ??? // How do I get this?
val format = new MyInputFormat
format.setRuntimeContext(runtimeContext)
env.readFile(format, inputFile.getAbsolutePath) // no longer throws exception

但是如何获取 RuntimeContext 的实例呢? 抛出异常是因为我的自定义输入格式没有 RuntimeContext。我想设置一个,但我不知道从哪里得到它。

最佳答案

您应该在生命周期方法中初始化 RuntimeContext,例如 open

MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T] {

override def  openInputFormat() = {
    getRuntimeContext.addAccumulator("rowsInFile", lineCounter)
}

关于scala - Flink 获取 RuntimeContext,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49393596/

相关文章:

mysql - 使用Scala JDBC进行大数据上传

log4j - 如何更改Flink的日志目录

akka - 从代码中取消 Apache Flink 作业

apache-flink - 时态表函数和版本化表有什么区别

android - 引用已销毁的 Activity,即使使用了 WeakReference

scala - 如何使用 Scala 创建具有随机内容的大型 Spark 数据框?

linux - 从安装 Zeppelin 的本地计算机到 Docker Spark 集群的连接

java.lang.IllegalStateException : You need to use a Theme. AppCompat 主题(或后代)与此 Activity

performance - 为什么scalac在某些场景下不能优化尾递归?

android - IllegalStateException:没有 Activity FragmentManager.java