java - 无法停止Hadoop IPC服务

标签 java hadoop mapreduce

我正在使用 Hadoop IPC 创建一个序列号生成服务,但在程序退出时无法停止服务器。有人能帮帮我吗?

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

import dk.aau.cs.cloudetl.common.CEConstants;
import dk.aau.cs.cloudetl.hadoop.fs.FSUtil;

/**
 * Sequence-number service exposed through a Hadoop IPC server.
 *
 * <p>The instance is both the RPC protocol implementation (it is handed to
 * {@code RPC.getServer} as the target object) and the thread that starts the
 * RPC server and then idles until {@link #stopServer()} is called.
 *
 * <p>Thread-safety: {@code seqMap}, the start-up sequence in {@link #run()}
 * and {@link #stopServer()} are all serialized on this object's monitor, so a
 * stop request can never interleave with a start that is in progress.
 */
public class SequenceServer extends Thread implements  ClientProtocol {

    /** Next value to hand out per sequence name; guarded by {@code this}. */
    Map<String, Integer> seqMap = new HashMap<String, Integer>();
    Configuration conf;
    /** RPC server; remains {@code null} when the constructor failed to create it. */
    Server server;
    Path seqFile;
    /** Cleared by {@link #stopServer()}; polled by the idle loop in {@link #run()}. */
    volatile private boolean running = true;
    /**
     * True once {@link #run()} has attempted to load the persisted sequences.
     * Guarded by {@code this}. Prevents a stop that arrives before start-up
     * from overwriting the sequence file with an empty map.
     */
    private boolean seqsLoaded = false;

    /**
     * Creates the RPC server bound to the local host name and
     * {@code CEConstants.SEQ_SERVER_PORT}. The server is not started here.
     *
     * <p>An {@link IOException} is printed, not rethrown; in that case
     * {@link #server} stays {@code null} and {@link #run()} refuses to start.
     *
     * @param conf Hadoop configuration used for the RPC server and file access
     */
    public SequenceServer(Configuration conf) {
        try {
            this.conf = conf;
            this.seqFile = new Path(CEConstants.META_DIR + Path.SEPARATOR
                    + "cloudETL.seq");

            InetAddress addr = InetAddress.getLocalHost();
            server = RPC.getServer(this, addr.getHostName(),CEConstants.SEQ_SERVER_PORT, 5, true, conf);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Loads the persisted sequences, starts the RPC server and then idles
     * until {@link #stopServer()} clears the running flag.
     *
     * <p>If a stop was already requested, or the RPC server could not be
     * created, the method returns without starting anything.
     */
    @Override
    public void run() {
        try {
            // Start-up is atomic with respect to stopServer(): either the stop
            // is seen here and nothing is started, or the server is fully
            // started before stopServer() can run and stop it.
            synchronized (this) {
                if (!running) {
                    return;
                }
                if (server == null) {
                    System.err.println("SequenceServer: RPC server was not created; not starting");
                    return;
                }
                readSeqsFromHDFS();
                seqsLoaded = true;
                server.start();
            }

            System.out.println("=============Start==============");
            while (running) {
                try {
                    sleep(5000);
                } catch (InterruptedException e) {
                    // stopServer() clears the flag before interrupting, so a
                    // cleared flag means this is the shutdown wake-up. Any
                    // other interrupt is reported by the outer handler.
                    if (running) {
                        throw e;
                    }
                }
            }
            System.out.println("=============END==============");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads all (name, value) pairs from {@link #seqFile} into
     * {@link #seqMap}, if the file exists. I/O errors are printed and
     * otherwise ignored.
     *
     * <p>NOTE(review): despite the method name the file system comes from
     * {@code FileSystem.getLocal(conf)} - confirm that local storage is intended.
     */
    private void readSeqsFromHDFS() {
        try {
            FileSystem fs = FileSystem.getLocal(conf);
            if (fs.exists(seqFile)) {
                SequenceFile.Reader reader = new SequenceFile.Reader(fs,
                        seqFile, conf);
                try {
                    Text key = new Text();
                    IntWritable value = new IntWritable();
                    while (reader.next(key, value)) {
                        seqMap.put(key.toString(), value.get());
                    }
                } finally {
                    // Release the underlying stream even when reading fails.
                    reader.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes {@link #seqMap} to a temporary file next to {@link #seqFile} and
     * then replaces {@link #seqFile} with it through {@code FSUtil.replaceFile}.
     * If writing fails the existing file is not replaced. I/O errors are
     * printed and otherwise ignored.
     */
    private void writeSeqsToHDFS() {
        try {
            FileSystem fs = FileSystem.getLocal(conf);
            Path tmp = new Path(seqFile.getParent() + Path.SEPARATOR
                    + "tmp.seq");
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                    tmp, Text.class, IntWritable.class);
            try {
                for (Entry<String, Integer> entry : seqMap.entrySet()) {
                    writer.append(new Text(entry.getKey()),
                            new IntWritable(entry.getValue()));
                }
            } finally {
                // Close on every path so a failed append does not leak the writer.
                writer.close();
            }

            FSUtil.replaceFile(new File(tmp.toString()),
                    new File(seqFile.toString()));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    /**
     * Stops the RPC server, persists the sequences (only if they were loaded
     * by {@link #run()}), and tells the idle loop in {@link #run()} to exit.
     *
     * <p>The running flag is always cleared and the thread always woken, even
     * if stopping or persisting throws, so the worker thread cannot be left
     * looping forever.
     */
    synchronized public void stopServer() {
        try {
            if (server != null) {
                System.out.println(server.getNumOpenConnections() );
                server.stop();
            }

            if (seqsLoaded) {
                writeSeqsToHDFS();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            running = false;
            // Wake the worker out of its sleep so it does not linger for up
            // to five seconds after the stop request.
            interrupt();
        }
    }

    /**
     * Returns the protocol version constant {@code versionID}.
     *
     * @param protocol      protocol name supplied by the caller (unused)
     * @param clientVersion version supplied by the caller (unused)
     * @return {@code versionID}
     */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return versionID;
    }

    /**
     * Returns the current value of the named sequence and advances it by
     * {@code CEConstants.SEQ_INCR_DELTA}. An unknown name starts at 0.
     *
     * @param name sequence name
     * @return the value before the increment
     */
    @Override
    synchronized public IntWritable nextSeq(Text name) {
        String seqName = name.toString();
        Integer current = seqMap.get(seqName);
        int ret = (current == null) ? 0 : current.intValue();
        seqMap.put(seqName, ret + CEConstants.SEQ_INCR_DELTA);
        return new IntWritable(ret);
    }

    /**
     * Starts a server thread with a default configuration and immediately
     * requests it to stop.
     */
    public static void main(String[] args) {
        SequenceServer server = new SequenceServer(new Configuration());
        server.start();

        server.stopServer();
    }
}

我还有另一个客户端程序来获取唯一编号。我不会在这里发布。

感谢您的回答。我明白您所说的问题。但是,我目前的问题是无法停止 RPC 服务器。由于 RPC 服务器以守护进程(daemon)模式运行,即使我调用了 stop(),程序仍然无法退出。您可以试试下面的例子:
import java.io.IOException;
import java.net.InetAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

/**
 * Minimal reproduction: creates a Hadoop IPC server on port 16000, starts it
 * and stops it again straight away.
 */
public class Test {
    Server server;

    /**
     * Builds the RPC server bound to the local host name. An
     * {@link IOException} is printed and leaves {@link #server} unset.
     */
    public Test() {
        try {
            final String hostName = InetAddress.getLocalHost().getHostName();
            final Configuration configuration = new Configuration();
            server = RPC.getServer(this, hostName, 16000, 5, true, configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts the RPC server. */
    public void start(){
        server.start();
    }

    /** Stops the RPC server. */
    public void stop(){
        server.stop();
    }

    /** Creates the server, starts it and stops it immediately. */
    public static void main(String[] args) {
        final Test repro = new Test();
        repro.start();
        repro.stop();
    }
}

谢谢!但这仍然行不通。您能试试我的例子吗?只需将其复制并另存为 Test.java,然后运行即可。您会看到它无法退出主线程。
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

/**
 * RPC protocol used by the reproduction: a single call that returns the next
 * sequence value for a name.
 */
interface MyProtocal extends org.apache.hadoop.ipc.VersionedProtocol {
    /** Protocol version shared by client and server (implicitly public static final). */
    long versionID = 1L;

    /**
     * Returns the next sequence value for the given name.
     *
     * @param name sequence name
     * @return the next value
     */
    IntWritable nextSeq(Text name);
}

/**
 * Self-contained reproduction: an RPC server implementing {@link MyProtocal}
 * plus a nested client ({@link SEQ}) that calls it once over port 16000.
 */
public class Test implements MyProtocal {
    Server server;

    /**
     * Builds the RPC server bound to the local host name. An
     * {@link IOException} is printed and leaves {@link #server} unset.
     */
    public Test() {
        try {
            final String hostName = InetAddress.getLocalHost().getHostName();
            final Configuration configuration = new Configuration();
            server = RPC.getServer(this, hostName, 16000, 5, true,
                    configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts the RPC server. */
    public void start() {
        server.start();
    }

    /** Stops the RPC server. */
    public void stop() {
        server.stop();
    }

    /** Returns the protocol version constant {@code versionID}. */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return versionID;
    }

    /** Always answers with the fixed value 999, whatever the name. */
    @Override
    public IntWritable nextSeq(Text name) {
        final int fixedValue = 999;
        return new IntWritable(fixedValue);
    }

    /** Client side of the reproduction: holds a proxy to the server above. */
    static class SEQ {
        MyProtocal client;

        /**
         * Obtains a proxy for {@link MyProtocal} at the local host name,
         * port 16000. Any exception is printed and leaves {@link #client} unset.
         */
        public SEQ() {
            try {
                final String hostName = InetAddress.getLocalHost().getHostName();
                final InetSocketAddress endpoint =
                        new InetSocketAddress(hostName, 16000);
                client = (MyProtocal) RPC.waitForProxy(MyProtocal.class,
                        MyProtocal.versionID, endpoint, new Configuration());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /** Requests the next value of sequence "aa" and prints it. */
        public void print() {
            final IntWritable next = client.nextSeq(new Text("aa"));
            System.out.println(next.get());
        }

        /** Releases the client proxy. */
        public void stop() {
            RPC.stopProxy(client);
        }
    }

    /**
     * Starts the server, performs one client call, then stops the client and
     * the server in that order.
     */
    public static void main(String[] args) {
        final Test rpcServer = new Test();
        rpcServer.start();

        final SEQ sequenceClient = new SEQ();
        sequenceClient.print();
        sequenceClient.stop();
        rpcServer.stop();
    }
}

最佳答案

您的设计本身有问题。
为什么需要在单独的线程中执行程序?

它已经在主线程中运行,并且RPC服务器也在单独的线程中运行。

我的建议是去掉您自己的线程,去掉 while() 循环后直接调用 run 方法,然后停止服务器。

通用建议:应实现 Runnable,而不是继承 Thread。

如果您一定要保留这个无用的线程,请在 main 方法中调用 server.join(),并在 run 方法的末尾调用 stopServer()。

关于java - 无法停止Hadoop IPC服务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8022103/

相关文章:

arrays - 计算 Hive 数组中连续日期之间的差异

hadoop - 用于获取正在运行的应用程序使用的容器和 vcore 数量的 YARN shell 命令

Swift 4.2 改进算法中的 "O"成本

java - 尝试更改空字符串的反序列化方式时发生 ConvertException : Element annotation required for field

java - 在配置单元中创建表异常?

java - jarsigner 在哪里?

hadoop - hadoop mapreduce中的单独输出文件

java - 如何使用 Map Reduce 按最新日期记录?

java - 当字符串以 &*( 开头并以 )(* 结尾时,如何从字符串中删除一部分文本

java - 在我的 Java 应用程序中添加 Web 浏览器