java - Netty 代理复制流量

标签 java proxy netty nio

我正在探索一种将流量从网络代理复制到两台服务器的方法。 即代替一般实现:

Server1 -> Proxy -> Server 2

我想执行以下操作:

Server 1 -> Proxy -> Server 2 and Server 3

Server3->Proxy is dropped

因此,每条消息都会发送到服务器 2 和服务器 3。

我只有一个限制,即代理和服务器 2 之间的通信不应因为服务器 3 而被阻止(如果服务器 3 速度较慢等)。

我从以下代码开始:https://github.com/dawnbreaks/TcpProxy

不幸的是,我对 netty 不太熟悉,但其实现对于我的目的来说似乎是非常理想的。我想了解一下:

  1. 如何为服务器 3 创建新 channel
  2. 要重写哪个 API 才能与服务器 3 进行通信
  3. 如何从服务器 3 读取和删除消息

最佳答案

在 IRC#netty 中看到了您的聊天。

这里有几件事。您的代理需要有一个与服务器 1 连接的服务器端。然后服务器 2 和服务器 3 需要排除来自代理的连接,或者您可以使用 UDP(视情况而定)从代理接收数据。

Netty 有一个代理服务器的示例。这适用于您的情况,并且对于第三部分来说非常容易。简而言之,您将使用现有示例并打开一个到服务器 3 的新连接。现在您可以做的是从代理获取两个 channel (到服务器 2 和 3 的客户端连接)将它们放入一个 channel 组中并写入一个是时候使用两台服务器了!我编辑的示例代码将...允许通过代理从服务器 1 到服务器 2 相互通信,并允许相互通话,而服务器 3 只能接收数据,但如果服务器 3 回复代理,则代理不会执行任何操作。您可能想要添加一个处理程序来释放缓冲区或处理不应该从服务器 3 写回的数据。另外,从这里开始,这应该可以帮助您入门,但请查看 netty 文档、api、示例和 ppts,它们非常有帮助!

我将附上一些修改后的代码向您展示,这里是示例的链接。

Netty Proxy Server Examples

因此,对于示例,您将编辑 HexDumpProxyFrontendHandler.class,并为服务器 3 的新客户端添加第二个 Bootstrap。

当前代码

41      @Override
42      public void channelActive(ChannelHandlerContext ctx) {
43          final Channel inboundChannel = ctx.channel();
44  
45          // Start the connection attempt.
46          Bootstrap b = new Bootstrap();
47          b.group(inboundChannel.eventLoop())
48           .channel(ctx.channel().getClass())
49           .handler(new HexDumpProxyBackendHandler(inboundChannel))
50           .option(ChannelOption.AUTO_READ, false);
51          ChannelFuture f = b.connect(remoteHost, remotePort);
52          outboundChannel = f.channel();
53          f.addListener(new ChannelFutureListener() {
54              @Override
55              public void operationComplete(ChannelFuture future) {
56                  if (future.isSuccess()) {
57                      // connection complete start to read first data
58                      inboundChannel.read();
59                  } else {
60                      // Close the connection if the connection attempt has failed.
61                      inboundChannel.close();
62                  }
63              }
64          });
65      }

编辑代码

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.example.proxy;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {

    private final String remoteHost;
    private final int remotePort;

    // As we use inboundChannel.eventLoop() when buildling the Bootstrap this does not need to be volatile as
    // the server2OutboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
    private Channel server2OutboundChannel;
    private Channel server3OutboundChannel;

    // TODO You should change this to your own executor
    private ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel inboundChannel = ctx.channel();

        // Start the connection attempt to SERVER 3
        Bootstrap server3Bootstrap = new Bootstrap();
        server3Bootstrap.group(inboundChannel.eventLoop())
                .channel(ctx.channel().getClass())
                // You are only writing traffic to server 3 so you do not need to have a handler for the inbound traffic
                .handler(new DiscardServerHandler()) // EDIT
                .option(ChannelOption.AUTO_READ, false);
        ChannelFuture server3Future = server3Bootstrap.connect(remoteHost, remotePort);
        server3OutboundChannel = server3Future.channel();


        // Start the connection attempt to SERVER 2
        Bootstrap server2Bootstrap = new Bootstrap();
        server2Bootstrap.group(inboundChannel.eventLoop())
                .channel(ctx.channel().getClass())
                .handler(new HexDumpProxyBackendHandler(inboundChannel))
                .option(ChannelOption.AUTO_READ, false);
        ChannelFuture server2Future = server2Bootstrap.connect(remoteHost, remotePort);
        server2OutboundChannel = server2Future.channel();
        server2Future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });

        // Here we are going to add channels to channel group to save bytebuf work
        channels.add(server2OutboundChannel);
        channels.add(server3OutboundChannel);
    }

    // You can keep this the same below or use the commented out section
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        // You need to reference count the message +1
        msg.retain();
        if (server2OutboundChannel.isActive()) {
            server2OutboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }
        if (server3OutboundChannel.isActive()) {
            server3OutboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }


        // Optional to the above code instead channel writing automatically cares for reference counting for you
//        channels.writeAndFlush(msg).addListeners(new ChannelFutureListener() {
//
//            @Override
//            public void operationComplete(ChannelFuture future) throws Exception {
//                if (future.isSuccess()) {
//                    // was able to flush out data, start to read the next chunk
//                    ctx.channel().read();
//                } else {
//                    future.channel().close();
//                }
//            }
//        });
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (server2OutboundChannel != null) {
            closeOnFlush(server2OutboundChannel);
        }
        if (server3OutboundChannel != null) {
            closeOnFlush(server3OutboundChannel);
        }


        // Optionally can do this
//        channels.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

丢弃处理程序

这可以作为处理程序添加到服务器 3 中,以丢弃服务器 3 写入代理的任何内容。默认情况下,SimpleInboundHandlers 将在通过递减引用计数处理消息后丢弃消息。

Discard Handler Code

关于java - Netty 代理复制流量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42660282/

相关文章:

java - java中如何使用unsigned int/char值

c++ - 如何将 C++ COM 连接到 WCF 服务

python - 有没有一种简单的方法可以区分运行 Fedora 12 的两个系统之间的网络设置差异?

java - 有没有办法从 ratpack ctx.next() 获得 Promise

java - "lexically-ordered"base64 的定义是什么?为什么 RFCC-1940 显然是规范引用?

java - IntelliJ Android - 调试前删除应用程序

java - 如何通过代码关闭java Applet

java - Netty 的整数编码器解码器

java - 推断类型不符合不相关变量的等式约束错误

python-3.x - 如何为Anaconda创建.condarc文件?