java - Netty WebSocket 客户端 channel 在 Linux 服务器上总是处于非 Activity 状态

标签 java linux websocket netty

我基于netty websocket client example构建了一个websocket客户端集成3rd party feed,代码与官方示例几乎相同,仅更改了URITextWebSocketFrame处理。

此客户端是 Spring Boot webapp 的一部分,它在我的 MacBook 或 Windows 7 PC 上运行良好,但是,一旦我将 war 部署到 Linux 服务器( Linux version 2.6.32-504.3.3.el6.x86_64 Red Hat 4.4.7-11), 应用启动时 channel 会直接去inactive。

查了下网络,JDK版本(1.8.x),Tomcat版本(8.5.16),Netty版本(4.1.13.Final),好像没问题,真糊涂。

WebSocketClientHandler.java:

public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    private final Logger logger = LoggerFactory.getLogger("feedlogger");

    private final WebSocketClientHandshaker handshaker;
    private ChannelPromise handshakeFuture;

    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        handshakeFuture = ctx.newPromise();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        handshaker.handshake(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // always reaches here immediately on Linux
        logger.error("webSocket Client disconnected!");
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (!handshaker.isHandshakeComplete()) {
            handshaker.finishHandshake(ch, (FullHttpResponse) msg);
            logger.info("finishHandshake and webSocket Client Connected!");
            handshakeFuture.setSuccess();
            return;
        }

        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            logger.error("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
                + response.content().toString(CharsetUtil.UTF_8) + ')');
            return;
        }

        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            logger.info(textFrame.text());
            List<String> messages = FeedJsonUtils.deserializeAsyncMessage(textFrame.text());
            if (!CollectionUtils.isEmpty(messages)) {
                messages.stream().forEach(message -> {
                        FeedReceiver.onMessage(message);
                });
            }
        } else if (frame instanceof PongWebSocketFrame) {
            logger.error("WebSocket Client received pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            logger.error("WebSocket Client received closing");
            ch.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
    {
        logger.error("exception caught: ", cause);
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }
}

WebSocketClientImpl.java:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;


@Component
public class WebsocketClientImpl {
    private final Logger logger = LoggerFactory.getLogger("feedlogger");

    private static final String WSS_URL = "wss://api.xxx.com/ws/?token=";

    private Channel ch = null;

    @Async
    public void connectWebSocket(String token) throws Exception {

        URI uri = new URI(WSS_URL + token);
        String scheme = uri.getScheme() == null ? "wss" : uri.getScheme();
        final String host = uri.getHost() == null ? "api.mollybet.com" : uri.getHost();

        final int port;
        if (uri.getPort() == -1) {
            if ("ws".equalsIgnoreCase(scheme)) {
                port = 80;
            } else if ("wss".equalsIgnoreCase(scheme)) {
                port = 443;
            } else {
                port = -1;
            }
        } else {
            port = uri.getPort();
        }

        if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
            logger.error("Only WS(S) is supported.");
            return;
        }

        // SSL handling
        final boolean ssl = "wss".equalsIgnoreCase(scheme);
        final SslContext sslCtx;
        if (ssl) {
            sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup group = new NioEventLoopGroup();
        try {
        // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08
        // or V00.
        // If you change it to V00, ping is not supported and remember to
        // change
        // HttpResponseDecoder to WebSocketHttpResponseDecoder in the
        // pipeline.
        final WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
                .newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));

        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ChannelPipeline p = ch.pipeline();
                if (sslCtx != null) {
                    p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                }
                p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192),
                        WebSocketClientCompressionHandler.INSTANCE, handler);
                }
            });

            ch = b.connect(uri.getHost(), port).sync().channel();
            handler.handshakeFuture().sync();

            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String msg = console.readLine();
                if (msg == null) {
                    break;
                } else if ("bye".equals(msg.toLowerCase())) {
                    ch.writeAndFlush(new CloseWebSocketFrame());
                    ch.closeFuture().sync();
                    break;
                } else if ("ping".equals(msg.toLowerCase())) {
                    WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 }));
                ch.writeAndFlush(frame);
                } else {
                    WebSocketFrame frame = new TextWebSocketFrame(msg);
                    ch.writeAndFlush(frame);
                }
            }
        } finally {
            group.shutdownGracefully();
        }
    }

    public void send(String message) {
        if (ch == null) {
            logger.error("channel unavailable");
            return;
        }
        WebSocketFrame frame = new TextWebSocketFrame(message);
        ch.writeAndFlush(frame);
    }


}

FeedServiceImpl.java:(应用启动时自动连接websocket)

@Service
@PropertySource("classpath:api.properties")
public class FeedServiceImpl implements FeedService {
    private final static Logger logger = LoggerFactory.getLogger("feedlogger");

    @Autowired
    private Environment env;

    @Autowired
    private WebsocketClientImpl websocketClientImpl;

    @Override
    @PostConstruct
    public void connectWs() {
        String token = env.getProperty("ws.token", String.class);
        websocketClientImpl.connectWebSocket(token);
    }

}

最佳答案

最后我解决了这个问题,我的WebSocketClient类和netty websocket client example中的WebSocketClient几乎一样。 ,替换以下代码后:

BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
while (true) {
    String msg = console.readLine();
    if (msg == null) {
        break;
    } else if ("bye".equals(msg.toLowerCase())) {
        ch.writeAndFlush(new CloseWebSocketFrame());
        ch.closeFuture().sync();
        break;
    } else if ("ping".equals(msg.toLowerCase())) {
        WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 }));
        ch.writeAndFlush(frame);
    } else {
        WebSocketFrame frame = new TextWebSocketFrame(msg);
        ch.writeAndFlush(frame);
    }
}

与:

Thread.sleep(Integer.MAX_VALUE);

现在 websocket 连接恢复正常了。

关于java - Netty WebSocket 客户端 channel 在 Linux 服务器上总是处于非 Activity 状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45139443/

相关文章:

java - 如何计算集合/索引中的所有 token 计数

java - Android Studio 无法启动(JDK 而不是 JRE)

linux - 如何在c++中不按Enter键读取字符

ssl - https 的 WSS 连接失败

javascript - 是否可以限制 socket.io 每个事件只有一个监听器?

ruby - Faye ruby​​ 客户端只发布一次

java - 在 Eclipse 中找不到 "Package Explorer" View

java - Selenium WebDriver - getCssValue() 方法

c++ - 如何使用 WINE 在 Linux 上使用 Windows .lib

c - Linux何时/如何将共享库加载到地址空间中?