在我的 Flink 代码中,我使用自定义输入格式,这会引发异常。看来我需要一个 RuntimeContext
实例,但如何才能获得一个实例?
我的格式类如下所示:
MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T]{
@transient var lineCounter: IntCounter = _
override def open(split: FileInputSplit): Unit = {
super.open(split)
lineCounter = new IntCounter()
getRuntimeContext.addAccumulator("rowsInFile", lineCounter) // this line throws IllegalStateException
我的主程序如下所示:
val env = ExecutionEnvironment.getExecutionEnvironment
val format = new MyInputFormat
env.readFile(format, inputFile.getAbsolutePath) // throws exception
抛出的异常:
java.lang.IllegalStateException: The runtime context has not been initialized yet. Try accessing it in one of the other life cycle methods.
at org.apache.flink.api.common.io.RichInputFormat.getRuntimeContext(RichInputFormat.java:51)
我的类需要一个RuntimeContext
,因为它扩展了DelimitedInputFormat
,而DelimitedInputFormat
扩展了... RichInputFormat
public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT>
public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputSplit>
public abstract class RichInputFormat<OT, T extends InputSplit> implements InputFormat<OT, T>
private transient RuntimeContext runtimeContext;
public void setRuntimeContext(RuntimeContext t)
public RuntimeContext getRuntimeContext()
因此,RichInputFormat
的任何实例都希望我们在创建后setRuntimeContext(RuntimeContext t)
。
我希望我应该执行以下操作:
val env = ExecutionEnvironment.getExecutionEnvironment
val runtimeContext: RuntimeContext = ??? // How do I get this?
val format = new MyInputFormat
format.setRuntimeContext(runtimeContext)
env.readFile(format, inputFile.getAbsolutePath) // no longer throws exception
但是如何获取 RuntimeContext 的实例呢?
抛出异常是因为我的自定义输入格式没有 RuntimeContext
。我想设置一个,但我不知道从哪里得到它。
最佳答案
您应该在生命周期方法中初始化 RuntimeContext,例如 open
MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T] {
override def openInputFormat() = {
getRuntimeContext.addAccumulator("rowsInFile", lineCounter)
}
关于scala - Flink 获取 RuntimeContext,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49393596/