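Java: Writing to and reading from a file at the same time

Tags: java

This is really a design question. I'm not sure that writing to and reading from a file is the ideal solution here. Still, here is an outline of what I'm trying to do: I have the following static method which, once the `reqStreamingData` method is called, starts retrieving data from the client server continuously, once every 150 milliseconds.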

    public static void streamingDataOperations(ClientSocket cs) throws InterruptedException, IOException {
        // This call retrieves streaming data continuously from the client server
        // and writes one line to the CSV file every 150 milliseconds,
        // using a BufferedWriter and a PrintWriter (print method).
        // Note that the BufferedWriter's flush method is never called, so
        // I assume the data is sitting in the in-memory buffer rather than
        // in the actual file.
        cs.reqStreamingData(output_file); // <- this method comes from the client's API.

        // I would like to add another thread (the data processing thread) that runs every 15 minutes.
        // I know I can do that by creating a class that extends TimerTask and scheduling it.
        // When this thread runs, I want it to:
        // 1. flush the last 15 minutes of data to output_file (note that no synchronized method or statement is used here, so no object is locked)
        // 2. process the data in R
        // 3. wait for the output from R to come back
        // 4. clear the file contents, so that the file only ever holds data from the last 15 minutes
    }

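Now, I'm not very familiar with multithreading. My concerns are:

  1. The data request thread and the data processing thread write to and read from the file at the same time, but at different rates. I'm not sure whether the data processing thread would delay the data request thread by a significant amount, since data processing involves much heavier computation than requesting data. Given that they are two separate threads, would any error or exception occur here?
  2. I'm not keen on the idea of writing to and reading from the same file at the same time, but because I have to use R to process the data in real time and store it in an R data frame, I can't think of another way to approach this. Is there a better alternative?
  3. Is there a better design for this problem?

I know this is a long question. Please let me know if you need more information.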

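Best answer

The lines (CSV or any other text) can be written to a temporary file. When processing is ready to start, the only synchronization needed is at the moment the temporary file is replaced with a new one. This guarantees that the producer never writes to the file that the consumer is processing.

Once the swap is done, the producer continues adding lines to the new file. The consumer flushes and closes the old file and then moves it to the file that the R application expects.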
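
The move is the step where the finished batch becomes visible to R, so it is worth looking at on its own. The following is only a minimal sketch: the class name `SafeMove` and the helper `moveReplacing` are hypothetical, and the fallback is an assumption about what is acceptable for your setup. `Files.move` with `ATOMIC_MOVE` throws `AtomicMoveNotSupportedException` when the move cannot be performed atomically, for example when the source and target are on different file stores. The fallback below is not atomic, so a reader could in principle observe a partially written target.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SafeMove {
    /**
     * Moves source to target, replacing target if it exists.
     * Tries an atomic move first and falls back to a plain replacing move
     * when the file system cannot perform the move atomically.
     */
    static Path moveReplacing(Path source, Path target) throws IOException {
        try {
            return Files.move(source, target,
                StandardCopyOption.ATOMIC_MOVE,
                StandardCopyOption.REPLACE_EXISTING);
        } catch (AtomicMoveNotSupportedException e) {
            // Not atomic: acceptable only if the reader tolerates a short window
            // in which the target is incomplete.
            return Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        // Demonstration with throwaway files in a fresh temporary directory.
        Path dir = Files.createTempDirectory("batch-demo");
        Path source = Files.write(dir.resolve("csv.tmp"),
            "a,b\n".getBytes(StandardCharsets.UTF_8));
        Path target = Files.write(dir.resolve("r.out"),
            "old\n".getBytes(StandardCharsets.UTF_8));

        moveReplacing(source, target);

        System.out.println("target content: "
            + new String(Files.readAllBytes(target), StandardCharsets.UTF_8).trim());
        System.out.println("source still exists: " + Files.exists(source));
    }
}
```

Keeping the working directory and the output directory on the same file system is the simplest way to stay on the atomic path.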

To clarify the approach further, here is a sample implementation:

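import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// The method and the static nested classes below are meant to be placed inside one enclosing class.
public static void main(String[] args) throws IOException {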
    // in this sample these dirs are supposed to exist
    final String workingDirectory = "./data/tmp";
    final String outputDirectory = "./data/csv";

    final String outputFilename = "r.out";
    final int addIntervalSeconds = 1;
    final int drainIntervalSeconds = 5;

    final FileBasedTextBatch batch = new FileBasedTextBatch(Paths.get(workingDirectory));
    // two threads, so that the producer and the consumer can actually run concurrently
    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

    final ScheduledFuture<?> producer = executor.scheduleAtFixedRate(
        () -> batch.add(
            // adding formatted date/time to imitate another CSV line
            LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)
        ),
        0, addIntervalSeconds, TimeUnit.SECONDS);

    final ScheduledFuture<?> consumer = executor.scheduleAtFixedRate(
        () -> batch.drainTo(Paths.get(outputDirectory, outputFilename)),
        0, drainIntervalSeconds, TimeUnit.SECONDS);

    try {
        // awaiting some limited time for demonstration 
        producer.get(30, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    catch (ExecutionException e) {
        System.err.println("Producer failed: " + e);
    }
    catch (TimeoutException e) {
        System.out.println("Finishing producer/consumer...");
        producer.cancel(true);
        consumer.cancel(true);
    }
    executor.shutdown();
}

static class FileBasedTextBatch {
    private final Object lock = new Object();
    private final Path workingDir;
    private Output output;

    public FileBasedTextBatch(Path workingDir) throws IOException {
        this.workingDir = workingDir;
        output = new Output(this.workingDir);
    }

    /**
     * Adds another line of text to the batch.
     */
    public void add(String textLine) {
        synchronized (lock) {
            output.writer.println(textLine);
        }
    }

    /**
     * Moves currently collected batch to the file at the specified path.
     * The file will be overwritten if exists.
     */
    public void drainTo(Path targetPath) {
        try {
            final long startNanos = System.nanoTime();
            final Output output = getAndSwapOutput();
            final long elapsedMillis =
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            System.out.printf("Replaced the output in %d millis%n", elapsedMillis);
            output.close();
            Files.move(
                output.file,
                targetPath,
                StandardCopyOption.ATOMIC_MOVE,
                StandardCopyOption.REPLACE_EXISTING
            );
        }
        catch (IOException e) {
            System.err.println("Failed to drain: " + e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Replaces the current output with the new one, returning the old one.
     * The method is supposed to execute very quickly to avoid delaying the producer thread.
     */
    private Output getAndSwapOutput() throws IOException {
        synchronized (lock) {
            final Output prev = this.output;
            this.output = new Output(this.workingDir);
            return prev;
        }
    }
}

static class Output {
    final Path file;
    final PrintWriter writer;

    Output(Path workingDir) throws IOException {
        // performs very well on local filesystems when the working directory is empty;
        // if too slow, it may be replaced with UUID-based name generation
        this.file = Files.createTempFile(workingDir, "csv", ".tmp");
        this.writer = new PrintWriter(Files.newBufferedWriter(this.file));
    }

    void close() {
        // the writer is final and never null, so no null check is needed;
        // close() flushes any buffered lines before releasing the file
        this.writer.close();
    }
}
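
The sample stops once the batch file is in place; steps 2 and 3 from the question (process the data in R and wait for the result) are not covered by it. One possible way to do that from the consumer task is to launch R as an external process and block until it exits. This is only a sketch under assumptions: the class name `RHandoff` and the helper `runAndWait` are hypothetical, `Rscript` is assumed to be on the `PATH`, and `process.R` is a made-up script name standing in for whatever script reads the CSV.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class RHandoff {
    /**
     * Runs an external command, waits up to the given timeout and returns its exit code.
     * The child's output is forwarded to this process's stdout and stderr.
     */
    static int runAndWait(List<String> command, long timeout, TimeUnit unit)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
            .inheritIO() // avoids blocking on a full, unread output pipe
            .start();
        if (!process.waitFor(timeout, unit)) {
            // The command did not finish in time: kill it and report the failure.
            process.destroyForcibly();
            throw new IOException("Command timed out: " + command);
        }
        return process.exitValue();
    }

    public static void main(String[] args) throws Exception {
        // ASSUMPTION: Rscript is on the PATH, and process.R is a hypothetical script
        // that reads the CSV path passed as its first argument.
        List<String> command = Arrays.asList("Rscript", "process.R", "./data/csv/r.out");
        int exitCode = runAndWait(command, 10, TimeUnit.MINUTES);
        System.out.println("R finished with exit code " + exitCode);
    }
}
```

Because the wait happens in the consumer task and not under the lock, the producer keeps appending to the new temporary file while R is running.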

Source: a similar question on Stack Overflow about writing to and reading from a file at the same time in Java: https://stackoverflow.com/questions/54064129/
