java - java实现管道,第二个程序没有及时输出(ping | grep)

标签 java pipe

我正在编写一个 IRC 机器人,它可以执行多个命令并将结果输出到 IRC channel 。这些命令使用在 java 中实现的管道连接。

在大多数情况下,它工作正常。但是当执行 ping 127.0.0.1 | grep -in ttl 时,grep 程序没有及时输出:大约每 38 秒才输出一次,见下图。

java PipingTest screenshot 1

看起来某处正在缓冲输出。如何让第二个程序及时输出?

示例代码

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Hand-rolled equivalent of a two-stage shell pipeline (<code>cmd1 | cmd2</code>).
 *
 * <p>Each command runs in its own {@link CommandRunner} task; a {@link Pipe} task copies
 * the stdout of the first process into the stdin of the second; a {@link WatchDog} task
 * destroys all processes after a fixed timeout. All tasks run on the shared
 * {@link #executor}.
 */
public class PipingTest
{
    /** Shared pool running the command runners, the pipe copier and the watchdog. */
    static ExecutorService executor = Executors.newFixedThreadPool (10);

    /**
     * Starts one external process and consumes its output.
     *
     * <p>A runner that pipes out hands its stdout to a {@link Pipe} feeding the next
     * command's stdin and only prints its own stderr. A runner that does not pipe out
     * prints both its stdout and its stderr to {@code System.out}.
     */
    static class CommandRunner implements Runnable
    {
        ProcessBuilder pb;
        String program;
        // Written by the runner thread and read by the WatchDog thread, hence volatile.
        volatile Process process;
        boolean isPipingOut = false;
        CyclicBarrier barrier;
        CommandRunner previousCommand;
        CommandRunner nextCommand;
        /** stdin of the started process (what a preceding Pipe writes into). */
        public OutputStream out = null;
        /** stdout of the started process. */
        public InputStream in = null;
        /** stderr of the started process. */
        public InputStream err = null;
        public OutputStream nextOut = null;

        /**
         * Creates a runner and links it with its neighbours in the pipeline.
         *
         * @param pb              builder for the process to start; its first argument is used as the program name in log output
         * @param isPipingOut     {@code true} if this command's stdout feeds the next command
         * @param barrier         rendezvous point shared with the neighbouring command
         * @param previousCommand command feeding this one, or {@code null}; its {@code nextCommand} is set to this runner
         * @param nextCommand     command fed by this one, or {@code null}; its {@code previousCommand} is set to this runner
         */
        public CommandRunner (ProcessBuilder pb, boolean isPipingOut, CyclicBarrier barrier, CommandRunner previousCommand, CommandRunner nextCommand)
        {
            this.pb = pb;
            program = pb.command().get(0);
            this.isPipingOut = isPipingOut;
            this.barrier = barrier;
            this.previousCommand = previousCommand;
            if (previousCommand!=null)
                previousCommand.nextCommand = this;
            this.nextCommand = nextCommand;
            if (nextCommand!=null)
                nextCommand.previousCommand = this;
        }

        /**
         * Starts the process, synchronizes with the neighbouring command, then drains
         * the process output and waits for the process to exit. Failures are reported
         * on stderr; nothing is rethrown.
         */
        @Override
        public void run ()
        {
            System.out.println ("Command [" + program + "] thread ID = " + Thread.currentThread().getId());
            try
            {
                // Reject an unusable configuration before a process is started, so no
                // orphan process is left behind.
                if (isPipingOut && nextCommand == null)
                    throw new IllegalStateException ("Command [" + program + "] is set to pipe its output but has no next command");

                process = pb.start ();
                out = process.getOutputStream ();
                in = process.getInputStream ();
                err = process.getErrorStream ();

                if (isPipingOut)
                {
                    // Wait until the next command has started, so that its stdin
                    // (nextCommand.out) has been assigned before the Pipe uses it.
                    System.out.println (program + ": Synchronizing with [" + nextCommand.program + "] ...");
                    barrier.await ();
                    System.out.println ("ok, synchronized, now go piping");
                    executor.execute (new Pipe(in, nextCommand.out));
                }
                else
                {
                    // A stand-alone command has no peer to wait for; only rendezvous
                    // when there really is a preceding command.
                    if (previousCommand != null)
                    {
                        System.out.println (program + ": Synchronizing with [" + previousCommand.program + "] ...");
                        barrier.await ();
                    }
                    System.out.println (program + ": Consuming the stdout...");
                    printLines (in);
                    System.out.println (program + ": stdout consumed");
                }

                System.out.println (program + ": Consuming the stderr...");
                printLines (err);
                System.out.println (program + ": stderr consumed");

                int rc = process.waitFor ();
                System.out.println (program + ": exited value " + rc);
            }
            catch (InterruptedException e)
            {
                // Restore the flag so the owning executor can observe the interruption.
                Thread.currentThread().interrupt ();
                e.printStackTrace();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }

        /** Prints every line of {@code stream} to {@code System.out}, then closes it. */
        private static void printLines (InputStream stream) throws IOException
        {
            BufferedReader reader = new BufferedReader (new InputStreamReader(stream));
            try
            {
                String line;
                while ((line = reader.readLine()) != null)
                {
                    System.out.println (line);
                }
            }
            finally
            {
                reader.close ();
            }
        }
    }

    /**
     * Copies everything from an input stream to an output stream, flushing after every
     * chunk, and closes both streams when done, whether the copy succeeded or failed.
     */
    static class Pipe implements Runnable
    {
        public OutputStream out = null;
        public InputStream in = null;

        /**
         * @param in  stream to read from (the upstream process's stdout)
         * @param out stream to write to (the downstream process's stdin)
         */
        public Pipe (InputStream in, OutputStream out)
        {
            this.in = in;
            this.out = out;
        }

        /**
         * Copies until end of input. An {@link IOException} during the copy is reported
         * on stderr and ends the copy; both streams are closed in every case.
         */
        @Override
        public void run ()
        {
            System.out.println ("Pipe thread ID = " + Thread.currentThread().getId());
            System.out.println ("Piping...");
            long nTotal = 0;
            int nRead = 0;
            try
            {
                byte[] small_buffer = new byte[32];
                while (-1 != (nRead = in.read(small_buffer)))
                {
                    out.write (small_buffer, 0, nRead);
                    nTotal += nRead;
                    out.flush ();   // hand the data to the next command as soon as possible
                    System.out.println (new java.sql.Time(System.currentTimeMillis()) + "    piped " + nRead + " bytes, total=" + nTotal);
                }
                System.out.println ("Total piped " + nTotal + " bytes");
                out.flush ();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            finally
            {
                // Always release both ends: closing 'out' is what signals end-of-input
                // to the downstream process, and 'in' must not leak when a write fails.
                closeQuietly (in);
                closeQuietly (out);
            }
        }

        /** Closes {@code stream} if it is non-null, reporting (not propagating) failures. */
        private static void closeQuietly (Closeable stream)
        {
            if (stream == null)
                return;
            try
            {
                stream.close ();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }

    /**
     * Sleeps for a fixed number of seconds, then destroys the processes of all given
     * commands and shuts the shared executor down.
     */
    static class WatchDog implements Runnable
    {
        int timeout = 0;
        List<CommandRunner> commands;

        /**
         * @param timeout  seconds to wait before killing the commands
         * @param commands commands whose processes are destroyed on timeout
         */
        public WatchDog (int timeout, List<CommandRunner> commands)
        {
            this.timeout = timeout;
            this.commands = commands;
        }

        @Override
        public void run ()
        {
            System.out.println ("WatchDog thread ID = " + Thread.currentThread().getId());
            try
            {
                TimeUnit.SECONDS.sleep (timeout);
                System.out.println ("WatchDog timeout, killing commands...");
                for (CommandRunner command : commands)
                {
                    System.out.println ("Killing command " + command.pb.command().get(0));
                    // The process is null when the command never started (e.g. the
                    // program was not found); skip it instead of aborting the loop.
                    Process started = command.process;
                    if (started != null)
                        started.destroy ();
                }
                executor.shutdown ();
            }
            catch (InterruptedException e)
            {
                // Restore the flag so the owning executor can observe the interruption.
                Thread.currentThread().interrupt ();
                e.printStackTrace();
            }
        }

    }

    /**
     * Runs {@code ping 127.0.0.1 | grep -in --color=always ttl} for at most 90 seconds.
     */
    public static void main (String[] args)
    {
        ProcessBuilder pbCmd1, pbCmd2;
        pbCmd1 = new ProcessBuilder ("ping", "127.0.0.1");
        //pbCmd1 = new ProcessBuilder ("yes", "ttl=123");
        pbCmd2 = new ProcessBuilder ("grep", "-in", "--color=always", "ttl");
        //pbCmd1 = new ProcessBuilder ("cat", "/etc/passwd");
        //pbCmd2 = new ProcessBuilder ("grep", "-in", "--color=always", "root");

        // Two parties: the piping command and the command it feeds.
        CyclicBarrier barrier = new CyclicBarrier(2);

        List<CommandRunner> commands = new ArrayList<CommandRunner> ();
        CommandRunner cmd1 = new CommandRunner (pbCmd1, true, barrier, null, null);
        // Passing cmd1 as the previous command also sets cmd1.nextCommand to cmd2.
        CommandRunner cmd2 = new CommandRunner (pbCmd2, false, barrier, cmd1, null);

        commands.add (cmd1);
        commands.add (cmd2);
        WatchDog watchdog = new WatchDog (90, commands);

        executor.execute (cmd1);
        executor.execute (cmd2);
        executor.execute (watchdog);
    }
}

最佳答案

grep 并不会在每输出一行后就刷新它的输出缓冲区。如果运行下面这样的命令,也会看到类似的现象:

ping 127.0.0.1 | grep -in ttl | cat

您可以通过使用 --line-buffered 选项运行 grep 来解决这个问题:

ping 127.0.0.1 | grep --line-buffered -in ttl

或使用 stdbuf 的更通用的解决方案:

ping 127.0.0.1 | stdbuf -o0 grep -in ttl

另请参阅:How to 'grep' a continuous stream?

关于java - java实现管道,第二个程序没有及时输出(ping | grep),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20097359/

相关文章:

c++ - 将多个文件 (gz) 通过管道传输到 C 程序中

java - Google Maps Android API v2 ClassNotFound 运行时错误

java - JTextField 在给出输入之前被读取

java - 接口(interface)如何返回其自身的严格实现

java - 使用 JFreeChart 在 Netbeans 中编译错误

c - UNIX 中的管道不应该是单向的吗?

linux - 将数据管道传输到需要 TTY(终端)的 Linux 程序

java - 如何避免重复的 if 语句

python - 为什么 shell 输出会转到 stderr?

c - 父子进程之间的管道问题