kotlin - 使用 Kotlin 的 Beam DoFn 中的泛型和方差

标签 kotlin apache-beam

我正在使用 Apache Beam 和 Kotlin 构建一个简单的 ETL 管道,我正在尝试创建一种 Either类型:

@DefaultCoder(SerializableCoder::class)
sealed class Result<out T> : Serializable {

    class Valid<out T>(val value: T) : Result<T>()

    class Invalid(val reason: String): Result<Nothing>()
}

例如,将像这样使用它:
abstract class MapFunction<T, U> : DoFn<Result<T>, Result<U>>() {

    abstract fun map(item: T) : Result<U>

    @ProcessElement
    fun processElement(context: ProcessContext) {
        val input = context.element()

        val output = when (input) {
            is Result.Valid -> map(input.value)
            is Result.Invalid -> input
        }

        context.output(output)
    }
}

通过这种方式,我可以跟踪所有无效结果并在管道结束时输出它们。问题是,当我构建一个基本的管道时,我遇到了以下神秘错误(为便于阅读而格式化):
Exception in thread "main" java.lang.IllegalArgumentException: 
nl.sanderp.beam.functions.LyricsEnricher, 
@ProcessElement processElement(ProcessContext), 
@ProcessElement processElement(ProcessContext), 
parameter of type DoFn<Result<Song>, Result<Song>>.ProcessContext at index 0: 
ProcessContext argument must have type DoFn<Result<? extends Song>, Result<? extends Song>>.ProcessContext

我尝试使用 @JvmWildcard提到的注释in the Kotlin docs ,但这只会导致更多错误:
Exception in thread "main" java.lang.ClassCastException: org.apache.beam.sdk.repackaged.com.google.common.reflect.Types$WildcardTypeImpl cannot be cast to java.lang.reflect.TypeVariable

如果可能的话,我似乎无法弄清楚如何让这个片段工作。

作为引用,以下是上述代码的调用方式:
class LyricsEnricher(private val resourceFactory: ResourceFactory) : MapFunction<Song, Song>() {

    private lateinit var dao: LyricsDao

    @Setup
    fun setup() {
        dao = resourceFactory.get(ResourceFactory.Key.LYRICS_DAO)
    }

    override fun map(item: Song): Result<Song> {
        dao.findLyrics(title = item.title, artist = item.artist)?.also {
            return Result.Valid(item.copy(lyrics = it))
        }

        return Result.Invalid("No lyrics found")
    }
}

最佳答案

我认为 Invalid 类的类型在与 out 一起使用时不正确像这样。

您的光束图函数的类型为 MapFunction<T, Result<U>>但您没有指定 U任何地方,除非在 Sealed 类内部,它变成了其中之一(双关语)TNothing .

如果您更改 Invalid<Nothing> 的类型至Invalid<T>从而使您的 map 类型为MapFunction<T, Result<T>>梁似乎可以接受。

关于kotlin - 使用 Kotlin 的 Beam DoFn 中的泛型和方差,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49119164/

相关文章:

kotlin - Room 为我所有挂起的 Dao 函数返回 java.lang.Object

unit-testing - 100% 代码覆盖率;数据类模型的单元测试

kotlin - 无法更改 mutableListOf( ) 值

java - Apache Beam java 测试 ExpectedLogs Maven

java - 输出空值的 TableRow 时出现 NullPointerException

google-cloud-storage - 使用 Google 数据流转换的 Google 云存储中的 Avro 与 Parquet

android - AutoCompleteTextView onSubmit 事件

android - 如何在Kotlin中使用find扩展功能并将其索引存储在lambda中

python - 由于需求文件而无法部署数据流模板

google-cloud-dataflow - Google 数据流 GroupByKey 可以处理热键吗?