java - java.lang.NullPointerException when code that works fine runs on another machine

Tags: java apache-flink

I wrote some code that consumes a stream from Kafka and sinks it into MySQL.

The code runs fine in my IDE and inserts data into the target table as expected.

However, when I submit the jar to Flink (deployed on 192.168.95.2) through the web UI, it throws a NullPointerException on the statement preparedStatement = connection.prepareStatement(sql);. The other code around that part is:

private static PreparedStatement preparedStatement;
sql = "insert into kafka_ccu values(?,?)";

I'm confused... if there were any problem with the connection or access rights, why does the code run fine in my IDE? Any help is appreciated.

The full exception is:

java.lang.NullPointerException
    at DemoKafka2Mysql$2.invoke(DemoKafka2Mysql.java:92)
    at DemoKafka2Mysql$2.invoke(DemoKafka2Mysql.java:72)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)

My code (the stream source here is Kinesis rather than Kafka, but the same problem occurs...):

public class DemoKinesis2Mysql {

private static  Connection connection;
private static PreparedStatement preparedStatement;
private static String sql = "";
private static String username;
private static String password;
private static String drivername;
private static String dburl;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    see.setParallelism(1);
    getConn(); 

    Properties consumerConfig = new Properties();
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "aaa");
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "ddd");
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

    DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
            "target_stream",
            new SimpleStringSchema(),
            consumerConfig));

    kinesis.print();

    DataStream<JSONObject> mapStream = kinesis.map(new MapFunction<String, JSONObject>() {
        public JSONObject map(String s) throws Exception {
            JSONObject jsonObject = JSON.parseObject(s);
            return jsonObject;
        }
    }).filter(new FilterFunction<JSONObject>() {
        public boolean filter(JSONObject jsonObject) throws Exception {
            if (jsonObject.containsKey("Body")) {
                return true;
            }
            return false;
        }
    });


    mapStream.addSink(new RichSinkFunction<JSONObject>() {
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        @Override
        public void close() throws Exception {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
        }


        public void invoke(JSONObject value, Context context) throws Exception {
            JSONArray arr = JSON.parseObject(value.getString("Body")).getJSONArray("metrics");
            log.info("json array:" + arr);
            for (int i = 0; i < arr.size(); i++) {
                JSONObject o = arr.getJSONObject(i);
                preparedStatement = connection.prepareStatement(sql);
                String k = "";
                if (o.containsKey("key")) {
                    k = o.getString("key");
                }
                String d = o.getJSONObject("properties").getString("datetime");
                if (k.equals("ccu")) {
                    String t = o.getJSONObject("properties").getString("ccu");
                    preparedStatement.setObject(1, t);
                    preparedStatement.setObject(2, d);
                    preparedStatement.executeUpdate();
                }
            }
        }

    });
    see.execute();
}

private static void getConn() throws SQLException {
    username = "user";
    password = "123456";
    drivername = "com.mysql.jdbc.Driver";
    dburl = "jdbc:mysql://192.168.95.2:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
    sql = "insert into tmp_stream_kinesis_ccu (ccu,data_time) values(?,?)";
    try {
        Class.forName(drivername);
    }catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
    connection = DriverManager.getConnection(dburl, username, password);
}

}

Best Answer

Try this:

if (connection == null || connection.isClosed()) return;
PreparedStatement statement = connection.prepareStatement(sql);
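
That null check avoids the crash, but the root cause is worth spelling out: connection and preparedStatement are static fields that getConn() initializes inside main(), and main() runs only in the JVM that submits the job. The sink function is serialized and executed in a different JVM on the task manager, where getConn() never ran, so both statics are still null there. In the IDE everything runs in a single JVM, which is why it works locally. A common fix (a sketch based on the question's own code, with the SQL and credentials copied from it, not a tested drop-in) is to make the JDBC objects instance fields of the RichSinkFunction and create them in open(), which Flink calls on the task manager before any invoke():

    mapStream.addSink(new RichSinkFunction<JSONObject>() {
        // instance (not static) state, created on the task manager in open()
        private transient Connection connection;
        private transient PreparedStatement preparedStatement;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Class.forName("com.mysql.jdbc.Driver");
            connection = DriverManager.getConnection(
                    "jdbc:mysql://192.168.95.2:3306/test?useSSL=false",
                    "user", "123456");
            // prepare once here and reuse across invoke() calls
            preparedStatement = connection.prepareStatement(
                    "insert into tmp_stream_kinesis_ccu (ccu,data_time) values(?,?)");
        }

        @Override
        public void invoke(JSONObject value, Context context) throws Exception {
            JSONArray arr = JSON.parseObject(value.getString("Body")).getJSONArray("metrics");
            for (int i = 0; i < arr.size(); i++) {
                JSONObject o = arr.getJSONObject(i);
                if ("ccu".equals(o.getString("key"))) {
                    preparedStatement.setObject(1, o.getJSONObject("properties").getString("ccu"));
                    preparedStatement.setObject(2, o.getJSONObject("properties").getString("datetime"));
                    preparedStatement.executeUpdate();
                }
            }
        }

        @Override
        public void close() throws Exception {
            if (preparedStatement != null) preparedStatement.close();
            if (connection != null) connection.close();
        }
    });

Marking the fields transient keeps Flink from trying to serialize live JDBC objects when shipping the function to the cluster. For a production job, Flink's built-in JDBC sink connector would usually be preferable to a hand-rolled RichSinkFunction.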

Regarding java - java.lang.NullPointerException when code that works fine runs on another machine, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60518944/
