java - Why does a JsonParseException occur?

Tags: java spring spring-integration

The message is emitted when the queue channel tries to send to a specific IP.

The adapter is...

@Bean
public AbstractConnectionFactory plainClientFactory() {
    int port = Integer.parseInt(outboundPort);
    TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(outboundHost, port);
    factory.setTaskExecutor(taskSchedulerWithPlain());
    factory.setLookupHost(false);

    factory.setSerializer(echoSerializer);
    factory.setDeserializer(echoSerializer);

    // Nagle's algorithm disabled
    factory.setSoTcpNoDelay(true);

    return factory;
}

@Bean
public TcpSendingMessageHandler plainClientHandler() {
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(plainClientFactory());

    return handler;
}

@Bean
public Executor taskSchedulerWithPlain() {
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setPoolSize(PROCESS_SIZE);
    scheduler.setThreadFactory(plainAffinityThreadFactory());
    scheduler.setWaitForTasksToCompleteOnShutdown(true);

    return scheduler;
}

The configuration is...

@Resource(name = "sslClientHandler")
TcpSendingMessageHandler sslClientHandler;

@Bean
public IntegrationFlow flowForConvertingSslJsonToBytesAndSendClient() {
    return IntegrationFlows.from(outputWithSslJsonBytesToClient())
            .transform(new ObjectToJsonTransformer())
            .transform(new PayloadSerializingTransformer())
            .handle(INBOUND_SERVICE, ATTACH_HEADER).handle(sslClientHandler).get();
}

@Bean
public MessageChannel outputWithSslJsonBytesToClient() {
    return MessageChannels.queue(POOL_SIZE).get();
}

InboundServiceImpl is...

@Override
public Object extractPayloadAsJson(byte[] message) throws Exception {
    log.debug("receive bytes... {} bytes", message.length);

    StringBuffer sb = new StringBuffer();
    for (byte b : message) {
        sb.append(b);
    }

    log.debug("extractPayloadAsJson message to string : {}", sb.toString());

    int payloadSize = message.length - EXPECTED_HEADER_SIZE;

    byte[] payload = new byte[payloadSize];
    byte[] header = new byte[EXPECTED_HEADER_SIZE];

    System.arraycopy(message, EXPECTED_HEADER_SIZE, payload, 0, payloadSize);
    log.debug("extract json... {} bytes", payload.length);

    System.arraycopy(message, 0, header, 0, EXPECTED_HEADER_SIZE);

    ObjectMapper mapper = new ObjectMapper();

    HashMap<String, Object> payloadAsMap = mapper.readValue(payload, HashMap.class);
    log.debug("convert map... {}", payloadAsMap.entrySet());

    return payloadAsMap;
}


@Override
public byte[] attachHeader(byte[] payload) throws Exception {
    byte[] jsonFlag = ByteBuffer.allocate(HEADER_SIZE_JSON).putShort((short) 0).array();
    byte[] crcWithoutJsonFlag = new byte[HEADER_SIZE_CRC_WITHOUT_JSON_FLAG];
    byte[] rcmd = new byte[HEADER_SIZE_RCMD];
    byte[] packetSize = ByteBuffer.allocate(HEADER_PACKET_SIZE).putInt(payload.length).array();

    byte[] concat = Bytes.concat(jsonFlag, crcWithoutJsonFlag, rcmd, packetSize, payload);

    log.debug("concat {} bytes", concat.length);

    StringBuffer sb = new StringBuffer();
    for (byte b : concat) {
        sb.append(b);
    }

    log.debug("concat to string : {}", sb.toString());

    return concat;
}

However, an error occurs when I try to transmit data.
Why does this error occur?

ERROR 5432 --- [task-scheduler-5] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('?' (code 172)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: [B@5dc435b4; line: 1, column: 2]
DEBUG 5432 --- [task-scheduler-5] o.s.integration.channel.QueueChannel     : postReceive on channel 'inputWithPlainJson', message: GenericMessage [payload=byte[201], headers={ip_address=192.168.5.97, id=eb9a314a-b767-6ad6-137f-3c4a2bb90f15, ip_hostname=192.168.5.97, ip_tcp_remotePort=49811, ip_connectionId=192.168.5.97:49811:5001:676813de-72f7-46ea-92fc-a902dc7bcb39, timestamp=1466471813736}]
DEBUG 5432 --- [task-scheduler-5] o.s.i.endpoint.PollingConsumer           : Poll resulted in Message: GenericMessage [payload=byte[201], headers={ip_address=192.168.5.97, id=eb9a314a-b767-6ad6-137f-3c4a2bb90f15, ip_hostname=192.168.5.97, ip_tcp_remotePort=49811, ip_connectionId=192.168.5.97:49811:5001:676813de-72f7-46ea-92fc-a902dc7bcb39, timestamp=1466471813736}]
DEBUG 5432 --- [task-scheduler-5] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@1da1de6] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: GenericMessage [payload=byte[201], headers={ip_address=192.168.5.97, id=eb9a314a-b767-6ad6-137f-3c4a2bb90f15, ip_hostname=192.168.5.97, ip_tcp_remotePort=49811, ip_connectionId=192.168.5.97:49811:5001:676813de-72f7-46ea-92fc-a902dc7bcb39, timestamp=1466471813736}]
DEBUG 5432 --- [task-scheduler-5] c.m.j.e.s.service.InboundServiceImpl     : receive bytes... 201 bytes
DEBUG 5432 --- [task-scheduler-5] c.m.j.e.s.service.InboundServiceImpl     : extractPayloadAsJson message to string : SUCCESS
DEBUG 5432 --- [task-scheduler-5] c.m.j.e.s.service.InboundServiceImpl     : extract json... 189 bytes
DEBUG 5432 --- [task-scheduler-5] o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('?' (code 172)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: [B@5dc435b4; line: 1, column: 2], headers={id=872c4b3f-f2b4-3213-41c8-71e65f983390, timestamp=1466471815642}]
DEBUG 5432 --- [task-scheduler-5] o.s.integration.handler.LoggingHandler   : (inner bean)#41bec956 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('?' (code 172)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: [B@5dc435b4; line: 1, column: 2], headers={id=872c4b3f-f2b4-3213-41c8-71e65f983390, timestamp=1466471815642}]
ERROR 5432 --- [task-scheduler-5] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('?' (code 172)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: [B@5dc435b4; line: 1, column: 2]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78)
at org.springframework.integration.dsl.support.BeanNameMessageProcessor.processMessage(BeanNameMessageProcessor.java:57)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

The JSON data is...

{"result":"success","reason":0,"pushTargetList":["dhjung","hiryu","hjoh","hslee","hslee1","jhbae","jslee1","khhwang","permedia","test_uid","wjchoi1","ysahn"],"response":"pushTarget"}

The headers are...

{json__ContentTypeId__=class java.lang.String, json__TypeId__=class java.util.HashMap, ip_tcp_remotePort=61036, ip_connectionId=192.168.3.96:61036:5001:e43b15c1-14b8-4694-b7ff-3bd8d5bb9379, ip_address=192.168.3.96, id=ad405ee4-f858-d7f0-ba7d-8a7ae25cc8a5, json__KeyTypeId__=class java.lang.String, contentType=application/json, ip_hostname=192.168.3.96, timestamp=1466476432549}

Best answer

Why are you applying Java serialization to the payload after converting it to JSON? ...

.transform(new PayloadSerializingTransformer())

...you are not doing the reverse when decoding.
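To illustrate what "doing the reverse" would mean, here is a minimal sketch that uses only the JDK. It assumes the serializing step behaves like a plain ObjectOutputStream; the class and method names (ReverseSerializationSketch, javaSerialize, javaDeserialize) are hypothetical and not part of the question's code. In a Spring Integration flow the counterpart would presumably be PayloadDeserializingTransformer on the receiving side, although simply dropping the serializing step is likely the simpler fix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

class ReverseSerializationSketch {

    // Hypothetical helper: Java-serializes a value with a plain ObjectOutputStream.
    static byte[] javaSerialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    // Hypothetical helper: the reverse step that must run before any JSON parsing.
    static Object javaDeserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"result\":\"success\",\"reason\":0}";

        // What the sending side puts on the wire when it serializes the JSON string.
        byte[] wire = javaSerialize(json);

        // Only after deserializing do we get text that a JSON parser can read.
        String recovered = (String) javaDeserialize(wire);
        System.out.println(recovered.equals(json)); // true
    }
}
```

Passing the wire bytes straight to a JSON parser, without the deserializing step, is what fails on the first byte.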

172 is 0xAC - the first "magic" header byte of a Java-serialized object (the stream starts with the magic number 0xACED).
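The magic byte can be checked with the JDK alone. The following is a small sketch, not the code from the question: SerializationMagicDemo and javaSerialize are hypothetical names, and the sample JSON is shortened from the one shown in the question.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class SerializationMagicDemo {

    // Hypothetical helper: Java-serializes a value with a plain ObjectOutputStream.
    static byte[] javaSerialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        String json = "{\"result\":\"success\"}";

        byte[] serialized = javaSerialize(json);
        byte[] plain = json.getBytes(StandardCharsets.UTF_8);

        // Java serialization: the stream header starts with 0xAC 0xED.
        System.out.println(serialized[0] & 0xFF); // 172
        System.out.println(serialized[1] & 0xFF); // 237

        // Plain UTF-8 JSON: the first byte is '{', which a JSON parser accepts.
        System.out.println(plain[0] & 0xFF); // 123
    }
}
```

This matches the error message: the parser hit character code 172 at the very start of the payload, where it expected the beginning of a JSON value.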

Why do all the header manipulation? If you convert the payload to a list containing the headers, JSON (de)serialization will take care of the headers too.

Regarding "java - Why does a JsonParseException occur?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/37934251/
