java - 如何修复 "incompatible types: org.apache.beam.sdk.options.ValueProvider<java.lang.String> cannot be converted to java.lang.String"

标签 java apache-kafka google-cloud-dataflow apache-beam

我关注了this link创建一个模板,该模板构建一个梁管道以从 KafkaIO 读取。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider无法转换为java.lang.String”。正是“.withBootstrapServers(options.getKafkaServer())”行导致了错误。 Beam 版本是 2.9.0,这是我的代码的一部分。

public interface Options extends PipelineOptions {
    @Description("Kafka server")
    @Required
    ValueProvider<String> getKafkaServer();

    void setKafkaServer(ValueProvider<String> value);

    @Description("Topic to read from")
    @Required
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @Description("Topic to write to")
    @Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);

    @Description("File path to write to")
    @Required
    ValueProvider<String> getOutput();

    void setOutput(ValueProvider<String> value);
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getKafkaServer())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata() 
    )

以下是我运行代码的方式:

mvn compile exec:java \
-Dexec.mainClass=${MyClass} \
-Pdataflow-runner -Dexec.args=" \
--project=${MyClass} \
--stagingLocation=gs://${MyBucket}/staging \
--tempLocation=gs://${MyBucket}/temp \
--templateLocation=gs://${MyBucket}/templates/${MyClass} \
--runner=DataflowRunner"

最佳答案

为了通过 ValueProvider 访问值,您需要使用get方法,然后您可以获取其具体类型的值。

例如: 当有选择时:

ValueProvider<String> getKafkaServer();

您可以通过以下方式访问它:

getKafkaServer().get()这将返回 String 对象。

似乎 KafkaIo Api 需要获取字符串参数而不是 ValueProvider,您必须从 ValueProvider 包装器中提取值。

关于java - 如何修复 "incompatible types: org.apache.beam.sdk.options.ValueProvider<java.lang.String> cannot be converted to java.lang.String",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54510181/

相关文章:

google-cloud-platform - 使用 Google DataFlow/Apache Beam 并行化图像处理或抓取任务是否有意义?

java - 无法定位元素

java - 回复kafka模板连接 header (CorrelationId)未发送到Google pub sub

scala - 如何在 Scala 中编写 Kafka Producer

java - 如何访问谷歌云数据流中压缩源的每个条目?并获取每个子文件的Byte[]

google-cloud-platform - 如何使环境变量作为python sdk中的环境变量到达Dataflow worker

java - 当客户端在 netty 中连接或断开连接时如何得到通知?

java - 在不重新启动应用程序服务器或在运行时重新初始化 spring bean

java - 有没有办法警告其他开发人员有关功能的信息?

apache-kafka - 将消息从一个 Kafka 集群传输到另一个集群