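scala - Cannot instantiate user function in Scala with Flink

Tags: scala docker apache-flink

I am trying to run a large open-source project written in Scala. I run it using three Docker images.

The docker-compose file is as follows: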

version: '2'
services:
  streammachine:
    environment:
      - "EXECUTION_TYPE=flink-cluster"
      - "FLINK_JOBMGR_HOST=jobmanager"
      - "FLINK_JOBMGR_PORT=8081"
      - "FLINK_MONITORING_HOST=jobmanager"
      - "FLINK_MONITORING_PORT=8081"
      - "JOB_MANAGER_RPC_ADDRESS=jobmanager"
      - "TSP_JAVA_OPTS=-Xms2G -Xmx4G" 
    restart: on-failure
    image: clovergrp/tsp:latest
    ports:
      - "8080:8080"
  jobmanager:
    image: flink:1.7.2-scala_2.12-alpine
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink:1.7.2-scala_2.12-alpine
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

The project itself uses Scala 2.12.7, as specified in its build.sbt.

However, when I use the project's functionality, I get a "Cannot instantiate user function" error:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:369)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException: scala.Symbol; local class incompatible: stream classdesc serialVersionUID = 6865603221856321286, local class serialVersionUID = 2966401305346518859
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
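
The `InvalidClassException` in the trace says that the `scala.Symbol` class used when the job was serialized has a different `serialVersionUID` from the one loaded on the TaskManager, which usually points to two different scala-library builds. A minimal sketch for checking this, assuming a small program can be run both in the `streammachine` container and in one of the Flink containers; the object name `ScalaRuntimeCheck` is hypothetical and not part of the project:

```scala
import java.io.ObjectStreamClass

object ScalaRuntimeCheck {
  // Version of the scala-library jar that is actually on this JVM's classpath.
  def scalaLibraryVersion: String = scala.util.Properties.versionNumberString

  // serialVersionUID that Java serialization uses for the given class in this JVM.
  // Only meaningful for Serializable classes (lookup returns null otherwise).
  def serialVersionUidOf(cls: Class[_]): Long =
    ObjectStreamClass.lookup(cls).getSerialVersionUID

  def main(args: Array[String]): Unit = {
    println(s"scala-library version: $scalaLibraryVersion")
    println(s"scala.Symbol serialVersionUID: ${serialVersionUidOf(classOf[scala.Symbol])}")
  }
}
```

If the two environments print different values for `scala.Symbol`, they should correspond to the "stream classdesc" and "local class" numbers in the exception message.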

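Best answer

Change the Flink version in the docker-compose file and the Scala version in build.sbt so that they match. The `InvalidClassException` on `scala.Symbol` indicates that the job was serialized with a different Scala library than the one running on the Flink cluster.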
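
As a rough sketch of the build.sbt side, assuming the project depends on the standard Flink Scala streaming artifact. The version values are simply the ones mentioned in the question, used as placeholders; they are not a verified working combination, and the `flinkVersion` name is hypothetical:

```scala
// build.sbt (fragment); all values are illustrative assumptions

// Should match the tag of the flink image used in docker-compose (1.7.2 in the question).
val flinkVersion = "1.7.2"

// Should match the Scala library shipped inside the Flink image.
// The "scala_2.12" part of the image tag only fixes the binary version, not the patch release.
scalaVersion := "2.12.7"

libraryDependencies ++= Seq(
  // %% appends the Scala binary version (here _2.12) to the artifact name.
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)
```

Whichever side is changed, the goal is that the JVM that serializes the job and the JVM that deserializes it load the same scala-library version.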

For this question (scala - Cannot instantiate user function in Scala with Flink), a similar question was found on Stack Overflow: https://stackoverflow.com/questions/57653933/
