我正在探索一种将流量从网络代理复制到两台服务器的方法。 即代替一般实现:
Server1 -> Proxy -> Server 2
我想执行以下操作:
Server 1 -> Proxy -> Server 2 and Server 3
Server3->Proxy is dropped
因此,每条消息都会发送到服务器 2 和服务器 3。
我只有一个限制,即代理和服务器 2 之间的通信不应因为服务器 3 而被阻止(如果服务器 3 速度较慢等)。
我从以下代码开始:https://github.com/dawnbreaks/TcpProxy
不幸的是,我对 netty 不太熟悉,但其实现对于我的目的来说似乎是非常理想的。我想了解一下:
- 如何为服务器 3 创建新 channel
- 要重写哪个 API 才能与服务器 3 进行通信
- 如何从服务器 3 读取和删除消息
最佳答案
在 IRC#netty 中看到了您的聊天。
这里有几件事。您的代理需要有一个与服务器 1 连接的服务器端。然后服务器 2 和服务器 3 需要排除来自代理的连接,或者您可以使用 UDP(视情况而定)从代理接收数据。
Netty 有一个代理服务器的示例。这适用于您的情况,并且对于第三部分来说非常容易。简而言之,您将使用现有示例并打开一个到服务器 3 的新连接。现在您可以做的是从代理获取两个 channel (到服务器 2 和 3 的客户端连接)将它们放入一个 channel 组中并写入一个是时候使用两台服务器了!我编辑的示例代码将...允许通过代理从服务器 1 到服务器 2 相互通信,并允许相互通话,而服务器 3 只能接收数据,但如果服务器 3 回复代理,则代理不会执行任何操作。您可能想要添加一个处理程序来释放缓冲区或处理不应该从服务器 3 写回的数据。另外,从这里开始,这应该可以帮助您入门,但请查看 netty 文档、api、示例和 ppts,它们非常有帮助!
我将附上一些修改后的代码向您展示,这里是示例的链接。
因此,对于示例,您将编辑 HexDumpProxyFrontendHandler.class,并为服务器 3 的新客户端添加第二个 Bootstrap。
当前代码
41 @Override
42 public void channelActive(ChannelHandlerContext ctx) {
43 final Channel inboundChannel = ctx.channel();
44
45 // Start the connection attempt.
46 Bootstrap b = new Bootstrap();
47 b.group(inboundChannel.eventLoop())
48 .channel(ctx.channel().getClass())
49 .handler(new HexDumpProxyBackendHandler(inboundChannel))
50 .option(ChannelOption.AUTO_READ, false);
51 ChannelFuture f = b.connect(remoteHost, remotePort);
52 outboundChannel = f.channel();
53 f.addListener(new ChannelFutureListener() {
54 @Override
55 public void operationComplete(ChannelFuture future) {
56 if (future.isSuccess()) {
57 // connection complete start to read first data
58 inboundChannel.read();
59 } else {
60 // Close the connection if the connection attempt has failed.
61 inboundChannel.close();
62 }
63 }
64 });
65 }
编辑代码
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
// As we use inboundChannel.eventLoop() when buildling the Bootstrap this does not need to be volatile as
// the server2OutboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
private Channel server2OutboundChannel;
private Channel server3OutboundChannel;
// TODO You should change this to your own executor
private ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
final Channel inboundChannel = ctx.channel();
// Start the connection attempt to SERVER 3
Bootstrap server3Bootstrap = new Bootstrap();
server3Bootstrap.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
// You are only writing traffic to server 3 so you do not need to have a handler for the inbound traffic
.handler(new DiscardServerHandler()) // EDIT
.option(ChannelOption.AUTO_READ, false);
ChannelFuture server3Future = server3Bootstrap.connect(remoteHost, remotePort);
server3OutboundChannel = server3Future.channel();
// Start the connection attempt to SERVER 2
Bootstrap server2Bootstrap = new Bootstrap();
server2Bootstrap.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new HexDumpProxyBackendHandler(inboundChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture server2Future = server2Bootstrap.connect(remoteHost, remotePort);
server2OutboundChannel = server2Future.channel();
server2Future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
// Here we are going to add channels to channel group to save bytebuf work
channels.add(server2OutboundChannel);
channels.add(server3OutboundChannel);
}
// You can keep this the same below or use the commented out section
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
// You need to reference count the message +1
msg.retain();
if (server2OutboundChannel.isActive()) {
server2OutboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
if (server3OutboundChannel.isActive()) {
server3OutboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
// Optional to the above code instead channel writing automatically cares for reference counting for you
// channels.writeAndFlush(msg).addListeners(new ChannelFutureListener() {
//
// @Override
// public void operationComplete(ChannelFuture future) throws Exception {
// if (future.isSuccess()) {
// // was able to flush out data, start to read the next chunk
// ctx.channel().read();
// } else {
// future.channel().close();
// }
// }
// });
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (server2OutboundChannel != null) {
closeOnFlush(server2OutboundChannel);
}
if (server3OutboundChannel != null) {
closeOnFlush(server3OutboundChannel);
}
// Optionally can do this
// channels.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
closeOnFlush(ctx.channel());
}
/**
* Closes the specified channel after all queued write requests are flushed.
*/
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
}
丢弃处理程序
这可以作为处理程序添加到服务器 3 中,以丢弃服务器 3 写入代理的任何内容。默认情况下,SimpleInboundHandlers 将在通过递减引用计数处理消息后丢弃消息。
关于java - Netty 代理复制流量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42660282/