java - 我可以使 FileInputStream.read 阻塞,直到同一文件上的 FileOutputStream 关​​闭吗?

标签 java concurrency eof fileinputstream fileoutputstream

在我的应用程序中,我正在接收要存储在文件中的数据,并用它进行一些计算。接收和计算都可能持续很长时间,所以我想异步进行。 下面的 list 显示了我的基本设置:thread1 生成一些数据并将它们存储在文件中。 thread2 读取文件并处理数据。

    Thread thread1 = new Thread( () -> {
      try {
        BufferedOutputStream out = new BufferedOutputStream( new FileOutputStream( "test" ) );
        for( int i = 0; i < 10; i++ ) {
          //producing data...
          out.write( ( "hello " + i + "\n" ).getBytes() );
          out.flush();
          //Thread.sleep( 10 );
        }
        out.close();
      } catch( Exception e ) {
        e.printStackTrace();
      }
    } );
    thread1.start();

    Thread thread2 = new Thread( () -> {
      try {
        BufferedInputStream in = new BufferedInputStream( new FileInputStream( "test" ) );
        int b = in.read();
        while( b != -1 ) {
          //do some calculation with data
          System.out.print( (char)b );
          b = in.read();
        }
        in.close();
      } catch( Exception e ) {
        e.printStackTrace();
      }
    } );
    thread2.start();

根据这个问题,我猜想在同一个文件上同时读写是可以的:FileInputStream and FileOutputStream to the same file: Is a read() guaranteed to see all write()s that "happened before"?或者我在这里遗漏了一些东西?

执行上面的列表会产生预期的输出:

hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9

但是,如果读取器线程由于某种原因比写入器更快(可以通过取消注释 thread1 中的 Thread.sleep 行来模拟),读取器会读取 EOF (-1) 并在文件已完全写入。只输出一行:

hello 0

但是,编写器仍然在“测试”文件中生成整个输出。

现在我想让 in.read() 阻塞,直到 thread1 中的 FileOutputStream 关​​闭。 我认为这可以通过避免将 EOF 放在文件末尾直到 out 关闭来完成。这是真的吗?如果是,我该怎么做?或者有更好的方法吗?

最佳答案

即使接口(interface)是一个文件,读取器(消费者)也可以等待写入器(生产者)。但一般来说,使用队列并遵循生产者/消费者模式会更好。

无论如何,在这种情况下,一个粗略的“等待更多输入”过程只涉及两个原子值:

  • 用于跟踪写入的字节数 (AtomicInteger)
  • 一个表示没有更多可用字节 (AtomicBoolean)

原子变量可以在线程之间共享:两个线程将始终看到原子值的最新值。 然后,编写器可以通过 AtomicInteger 更新写入的字节数,然后读取器可以决定等待更多输入。 写入器还可以通过 AtomicBoolean 指示是否不再写入任何字节,并且读取器可以使用该信息读取到文件末尾。

要记住的另一件事是启动线程不受您的控制:您的操作系统将确定线程何时实际开始运行。 为了给线程一个合理的机会同时运行,请使用“startLatch”,如下面的代码所示。

下面的演示代码是可运行的,并且应该很好地说明如何使读取器线程等待来自写入器线程的更多输入。


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ReadWhileWrite {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CountDownLatch startLatch = new CountDownLatch(2);
            Path testFile = Paths.get("test-read-while-write.txt");
            testFile.toFile().delete();
            int fakeSlowWriteMs = 100; // waiting time in milliseconds between writes. 

            CountDownLatch testFileExists = new CountDownLatch(1);
            AtomicInteger bytesWritten = new AtomicInteger();
            AtomicBoolean writeFinished = new AtomicBoolean();

            // Writer
            executor.execute(() -> {
                try {
                    // Make sure reader and writer start at the same time
                    startLatch.countDown();
                    if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
                        throw new RuntimeException("Bogus reader start.");
                    }
                    try (OutputStream out = Files.newOutputStream(testFile)) {
                        testFileExists.countDown();
                        int maxLoops = 10;
                        IntStream.range(0, maxLoops).forEach(i -> {
                            byte[] msg = ("hello " + i + "\n").getBytes(StandardCharsets.UTF_8);
                            try {
                                out.write(msg);
                                out.flush();
                                bytesWritten.addAndGet(msg.length);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            if (fakeSlowWriteMs > 0 && i < maxLoops - 1) {
                                try {
                                    Thread.sleep(fakeSlowWriteMs);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writeFinished.set(true);
            });
            // Reader
            CountDownLatch doneLatch = new CountDownLatch(1);
            executor.execute(() -> {
                try {
                    // Make sure reader and writer start at the same time
                    startLatch.countDown();
                    if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
                        throw new RuntimeException("Bogus writer start.");
                    }
                    int bytesRead = 0;
                    int bytesRequired = 1; // Number of bytes read from file in one go.
                    int maxWaitTimeMs = 1000;
                    if (!testFileExists.await(500L, TimeUnit.MILLISECONDS)) {
                        throw new RuntimeException("Writer did not open file for reading within 500 ms.");
                    }
                    try (InputStream in = Files.newInputStream(testFile)) {
                        boolean eof = false;
                        while (!eof) {
                            if (!writeFinished.get()) {
                                if (bytesWritten.get() - bytesRead < bytesRequired) {
                                    int sleepTimeTotal = 0;
                                    while (!writeFinished.get()) {
                                        Thread.sleep(1);
                                        if (bytesWritten.get() - bytesRead >= bytesRequired) {
                                            break; // break the waiting loop, read the available bytes.
                                        }
                                        sleepTimeTotal += 1;
                                        if (sleepTimeTotal >= maxWaitTimeMs) {
                                            throw new RuntimeException("No bytes available to read within waiting time.");
                                        }
                                    }
                                }
                            }
                            int b = in.read();
                            bytesRead += 1;
                            if (b < 0) {
                                eof = true;
                            } else {
                                System.out.print( (char) b);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                doneLatch.countDown();
            });
            if (!doneLatch.await(3000L, TimeUnit.MILLISECONDS)) {
                throw new RuntimeException("Reader and writer did not finish within 3 seconds.");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        executor.shutdownNow();
        System.out.println("\nFinished.");
    }
}

关于java - 我可以使 FileInputStream.read 阻塞,直到同一文件上的 FileOutputStream 关​​闭吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59679306/

相关文章:

c++ - 在 C++ 中检查空文件

java - 如何从 JButton 检索数据?

java - Spring测试时如何在上下文初始化之前初始化测试类?

java - Java UDP多线程通信的一个问题

java - 如何检查线程是否在同步块(synchronized block)或方法内?

java - 多 :Threading - Is this the right approach?

java - 通过 JNA 调用 native 库时的并发问题

python - 什么是检测下一次读取将在 Python 3(和 Python 2)中产生 EOF 的 Pythonic 方法

c++ - 使用 EOF 函数作为条件

javafx - : es2, sw M1 MacOS 的图形设备初始化失败