JavaFX 版本的 ExecutorService

标签 java multithreading javafx

我制作了一个 Java 实用程序,可以在 x 个精美的 zip 文件 (twx) 中搜索 XML。

最初这是一个命令行实用程序,没有线程。

当我将它移动到使用 JavaFX 时,我遇到了卡住问题,然后将所有搜索移动到任务对象中进行修复。

我需要一些方法来跟踪进度,因此我实现了 Progress 属性和 ProgressBar 来进行跟踪。

效果很好,但既然我已经是多线程的,为什么不为每个 Zip Search 创建一个线程。不幸的是,这并没有起到很好的作用。

为了保持跟踪,我创建了一个任务数组,然后创建了一个处理它们的主任务。我使用 progress 和 total 属性来处理所有更新。

这是代码

public class TextSearch {
    final private SimpleDoubleProperty progress = new SimpleDoubleProperty();
    final private SimpleDoubleProperty total = new SimpleDoubleProperty();

/**
 * 
 * Kicks off a search. Creates a Task/Thread for each twx search.
 * @return A task object that maintains all of the twx searches.
 * 
 * @throws ZipException If the zip file is unreadable.
 * @throws IOException If the file is unreadable.
 * @throws JDOMException If the xml in the files are unreadable.
 * @throws InvalidTWXFile If the twx is corrupt.
 */
public Task<?> executeSearch() throws ZipException, IOException, JDOMException, InvalidTWXFile {
    //Loop through all registered twx files.
    Iterator<TWExport> rit = registered.iterator();
    Integer t = 0;
    //Create a task for each search
    final ArrayList<Task<?>> tasks = new ArrayList<Task<?>>();
    while(rit.hasNext())
    {
        final TWExport twx = rit.next();
        //Only run search if user selects to search it.
        if(twx.getSearchEnabled())
        {
            Task<Void> task = new Task<Void>() {
                @Override public Void call() {
                    informUser("Searching " + twx);
                    SearchResults proj = new SearchResults(twx);
                    searchResult.add(proj);
                    searchTwx(proj,twx);
                    twx.setResultCount(proj.getTotalCount());
                    informUser("Finished Searching " + twx);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        if (isCancelled()) {
                            updateMessage("Cancelled");
                        }
                    }
                    return null;
                }
            };
            tasks.add(task);
            t += twx.getObjects().size();
        } else
        {
            informUser("Skipping " + twx);
        }
    }
    total.setValue(t);


    //Create the main thread that will hold all individual searches.
    Task<Void> main = new Task<Void>() {
        @Override
        protected Void call() throws Exception {
            startTime = new Date();
            Iterator<Task<?>> it = tasks.iterator();
            while(it.hasNext())
            {
                Thread t = new Thread(it.next());
                t.start();
            }
            //Sometimes I get a hung thread in this loop
            while(!progress.getValue().equals(total.getValue()))
            {
                updateProgress(progress.getValue(), total.getValue());
                Thread.sleep(5000);
            }
            setEndTime(new Date());
            return null;
        }
    };

    new Thread(main).start();
    return main;
}

/**
 * Search through a twx file and add results to the project search results
 * @param proj  The parent SearchResults
 * @param twx   The TWExport to search
 */
private void searchTwx(SearchResults proj, TWExport twx) {
    Iterator<TWObject> it = twx.getObjects().iterator();
    //Iterate through the files and get the result
    while(it.hasNext())
    {
        TWObject object = it.next();
        progress.setValue(progress.getValue() + 1);
        if(searchArtifacts.matcher(object.getName()).find())
        {
            SearchResults result = object.searchContents(searchStr);
            if(result != null)
            {
                proj.add(result);
            }
        }
    }
}

使用主线程似乎非常笨拙,有时它会在一次搜索 10 多个 zip 文件时卡在那个循环中。

对于未知数量的任务,是否有更好的方法来做到这一点?是否有类似 JavaFX ExecutorService 的东西,我可以在其中添加一堆任务、启动它并监视 progressProperty?

最佳答案

不需要特定于 JavaFX 的执行程序服务:常规 java.util.concurrent.ExecutorService 工作得很好,因为 Task 的子类 future 任务

获得任务列表后,您可以根据每个任务的进度计算总体进度。例如,它可能只是所有进度的总和除以任务数。如果每个任务都有不同数量的项目要处理,您可以做一些更复杂的事情。

这是一个简单的 SSCCE:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import javafx.application.Application;
import javafx.beans.binding.DoubleBinding;
import javafx.beans.property.DoubleProperty;
import javafx.beans.property.IntegerProperty;
import javafx.beans.property.SimpleDoubleProperty;
import javafx.beans.property.SimpleIntegerProperty;
import javafx.concurrent.Task;
import javafx.concurrent.Worker;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.Label;
import javafx.scene.control.ProgressBar;
import javafx.scene.control.TextArea;
import javafx.scene.layout.BorderPane;
import javafx.scene.layout.Priority;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;

public class MultipleTaskTest extends Application {

    private final ExecutorService exec = Executors.newFixedThreadPool(5, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t ;
    });

    private final Random rng = new Random();


    @Override
    public void start(Stage primaryStage) {

        Label pendingTasksLabel = new Label();
        Button button = new Button("Launch tasks");
        TextArea log = new TextArea();

        DoubleProperty progress = new SimpleDoubleProperty(1);

        ProgressBar progressBar = new ProgressBar();
        progressBar.progressProperty().bind(progress);

        IntegerProperty pendingTasks = new SimpleIntegerProperty(0);
        pendingTasksLabel.textProperty().bind(pendingTasks.asString("Pending Tasks: %d"));

        button.disableProperty().bind(pendingTasks.greaterThan(0));

        button.setOnAction(e -> {
            int numTasks = rng.nextInt(5) + 4 ;

            List<Task<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < numTasks; i++) {
                tasks.add(createRandomTask());
            }

            // rebind progress:
            progress.unbind();
            progress.bind( new DoubleBinding() {
                {
                    for (Task<Void> task : tasks) {
                        bind(task.progressProperty());
                    }
                }

                @Override
                public double computeValue() {
                    return tasks.stream().collect(Collectors.summingDouble(
                        task -> Math.max(task.getProgress(), 0)    
                    )) / numTasks;
                }
            });

            log.appendText("Submitting "+numTasks+" tasks\n");

            pendingTasks.set(numTasks);

            // log state of each task:
            tasks.forEach(task -> 
                task.stateProperty().addListener((obs, oldState, newState) -> {
                    log.appendText("\tTask "+newState+"\n");

                    // update pendingTasks if task moves out of running state:                  
                    if (oldState == Worker.State.RUNNING) {
                        pendingTasks.set(pendingTasks.get() - 1);
                    }
                }));

            tasks.forEach(exec::execute);
        });

        VBox root = new VBox(10, pendingTasksLabel, progressBar, log, button);
        root.setAlignment(Pos.CENTER);
        root.setPadding(new Insets(10));
        VBox.setVgrow(log, Priority.ALWAYS);

        primaryStage.setScene(new Scene(root, 400, 400));
        primaryStage.show();
    }

    @Override
    public void stop() {
        exec.shutdownNow() ;
    }

    private Task<Void> createRandomTask() {
        int numSteps = 100 + rng.nextInt(100);
        return new Task<Void>() {
            @Override
            public Void call() throws Exception {
                for (int i = 1; i <= numSteps; i++) {
                    Thread.sleep(50);
                    updateProgress(i, numSteps);
                }
                return null ;
            }
        };
    }

    public static void main(String[] args) {
        launch(args);
    }
}

关于JavaFX 版本的 ExecutorService,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33394428/

相关文章:

Java 扫描器不接受负整数值,仅适用于正值

java - 使用 JAXB 从 XML 文件中具有多个组的 XML 字符串创建对象

java - 一次仅由一个线程使用一个对象

java - Canvas 画不出流畅的线条

combobox - JavaFX 可编辑组合框 : Showing toString on item selection

java - 尝试使用 Java SAX 解析 excel xml 文件时出错

java - 通过单击复选框禁用 jtable 中的组合框

multithreading - 什么是跳线,什么时候需要?

C++ 稍后从线程函数获取返回值

JavaFX addListener 不工作