我有三个进程,名为 LWA1、LWA2 和 LWA3。每个都有一台服务器,LWA1 的端口为 55555,LWA2 的端口为 55556,LWA3 的端口为 55557。 此外,每个进程都有一个客户端,以便连接到其他进程。
每个进程都应该能够写入和读取其他进程。所以:
- LWA1 应该向 LWA2 和 LWA3 写入和读取
- LWA2 应向 LWA1 和 LWA3 写入数据或从 LWA1 和 LWA3 读取数据
- LWA3 应该向 LWA1 和 LWA2 写入数据或从 LWA1 和 LWA2 读取数据
目前,每个进程执行两次写入,但只接收一条消息。每个进程的输出如下(选项卡式打印属于客户端,未选项卡式打印属于服务器)。
LWA1:
Setting up server with port: 55555
Server configured.
Opening sockets to port 55556 and port 55557
Sending lamport request: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA3', id=3}
Key accepted
LWA2:
Setting up server with port: 55556
Server configured.
Opening sockets to port 55557 and port 55555
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Sending lamport request: LamportRequest{clock=0, process='LWA2', id=2}
LWA3:
Setting up server with port: 55557
Server configured.
Opening sockets to port 55555 and port 55556
Key accepted
Key accepted
Sending lamport request: LamportRequest{clock=0, process='LWA3', id=3}
Reading data from server
I read: LamportRequest{clock=0, process='LWA2', id=2}
正如您所看到的,每个客户端都向另外两个客户端写入一个 LamportRequest,但其他两个客户端只收到一条消息。为什么其他消息没有被接收?
我怀疑这可能与服务器中的 key 有关,但不知道可能是什么。另外,我也不完全理解他们。如果我错了请纠正我:
与选择器的每个连接都用不同的 (SelectableChannel) 键表示,因此服务器 LWA1 中的迭代器(例如)应该只有(因此,仅监听事件)两个键,一个用于 LWA2,另一个用于 LWA2 LWA3,对吗?我尝试在 keyAccept 方法中将整数附加到每个键来区分它们,效果很好,但是在 keyRead 方法中打印附加的整数时,它显示为 null。该方法的关键是新的吗?第三把 key 突然出现了?
额外问题:我应该在单个线程中实现这个结构。目前我使用两种,一种用于服务器,一种用于客户端。一旦它开始工作,有什么关于如何统一它们的技巧吗?
-----------------代码-----------------
服务器(为了阅读目的而简化)如下:
public TalkToBrotherSocket(int clock, int port) {
this.port = port;
this.clock = clock;
try {
setServer();
System.out.println("Server configured.\n");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
// Wait for an event one of the registered channels
selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check if they key is ready to accept a new socket connection
if (key.isAcceptable()) {
keyAccept(key);
System.out.println("Key accepted");
} else if (key.isReadable()){
System.out.println("Reading data from server");
keyRead(key);
} else if (key.isWritable()){
System.out.println("Writting data from server");
keyWrite(key); //unused at the moment
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void keyRead(SelectionKey key) throws IOException {
// Create a SocketChannel to read the request
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
buffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(buffer);
} catch (IOException e) {
System.out.println("Closing socket");
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
System.out.println("Shutting down socket");
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
return;
}
System.out.println("I read: " + new String(buffer.array()).trim());
}
private void keyAccept(SelectionKey key) throws IOException {
// For an accept to be pending the channel must be a server socket channel.
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// Accept the connection and make it non-blocking
SocketChannel socketChannel = serverSocketChannel.accept();
//Socket socket = socketChannel.socket();
socketChannel.configureBlocking(false);
// Register the new SocketChannel with our Selector, indicating
// we'd like to be notified when there's data waiting to be read
socketChannel.register(selector, SelectionKey.OP_READ);
}
private void setServer() throws IOException {
// Create a new selector
selector = Selector.open();
// Create a new non-blocking server socket channel
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port
serverChannel.bind(new InetSocketAddress("localhost", port));
// Register the server socket channel, indicating an interest in
// accepting new connections
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
客户端(为了阅读目的而简化)如下:
public NIOClient(int clock, int firstPort, int secondPort, int id, String process) {
this.process = process;
this.clock = clock;
this.id = id;
try {
System.out.println("\tOpening sockets to port " + firstPort + " and port " + secondPort);
firstClient = SocketChannel.open(new InetSocketAddress("localhost", firstPort));
secondClient = SocketChannel.open(new InetSocketAddress("localhost", secondPort));
firstBuffer = ByteBuffer.allocate(1024);
secondBuffer = ByteBuffer.allocate(1024);
sendRequests();
} catch (IOException e) {
e.printStackTrace();
}
}
private void sendRequests() {
LamportRequest lamportRequest = new LamportRequest(clock, process, id);
firstBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
secondBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
String converted = new String(firstBuffer.array(), StandardCharsets.UTF_8);
System.out.println("\tSending lamport request: " + converted);
try {
firstClient.write(firstBuffer);
secondClient.write(secondBuffer);
firstBuffer.clear();
}
初始化如下:
System.out.println("Setting up server with port: " + myPort);
TalkToBrotherSocket talkToBrotherSocket = new TalkToBrotherSocket(clock, myPort);
talkToBrotherSocket.start();
new NIOClient(clock, firstPort, secondPort, id, process);
最佳答案
根据@user207421的评论,我添加了一个新的第二个缓冲区。对 sendRequests 方法的更改已编辑到原始帖子中。
关于java - NIO 套接字 - 分布式系统,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61370342/