java - 具有持久工作实例的线程池

标签 java multithreading ftp threadpool threadpoolexecutor

我正在尝试在线程池中排队任务,以便在工作人员空闲时立即执行,我已经找到了各种这样的示例,但在所有情况下,这些示例都已设置为每个工作人员使用一个新的工作人员实例工作,我需要坚持不懈的员工。

我正在尝试制作一个 ftp 备份工具,我可以使用它,但由于单个连接的限制,它很慢。我理想情况下想要做的是有一个连接来扫描目录并建立文件列表,然后四个工作人员来下载所述文件。

这是我的 FTP 工作器的示例:

public class Worker implements Runnable {
  protected FTPClient _ftp;

  // Connection details
  protected String _host = "";
  protected String _user = "";
  protected String _pass = "";

  // worker status
  protected boolean _working = false;

  public Worker(String host, String user, String pass) {
    this._host = host;
    this._user = user;
    this._pass = pass;
  }

   // Check if the worker is in use
  public boolean inUse() {
    return this._working;
  }

  @Override
  public void run() {
    this._ftp = new FTPClient();
    this._connect();
  }

  // Download a file from the ftp server
  public boolean download(String base, String path, String file) {
    this._working   = true;
    boolean outcome = true;

    //create directory if not exists
    File pathDir = new File(base + path);
    if (!pathDir.exists()) {
      pathDir.mkdirs();
    }

    //download file
    try {
      OutputStream output = new FileOutputStream(base + path + file);
      this._ftp.retrieveFile(file, output);
      output.close();
    } catch (Exception e) {
      outcome = false;
    } finally {
      this._working = false;
      return outcome;
    }
  }

  // Connect to the server
  protected boolean _connect() {
    try {
      this._ftp.connect(this._host);
      this._ftp.login(this._user, this._pass);
    } catch (Exception e) {
      return false;
    }
    return this._ftp.isConnected();
  }

  // Disconnect from the server
  protected void _disconnect() {
    try {
      this._ftp.disconnect();
    } catch (Exception e) { /* do nothing */ }
  }
}

我希望能够在工作线程可用时为队列中的每个任务调用 Worker.download(...) ,而不必为每次下载创建与 ftp 服务器的新连接.

任何帮助将不胜感激,因为我以前从未使用过线程,现在我正在兜圈子。

最佳答案

the examples have been setup to use a new Worker instance for each job, i want persistent workers.

这是一个常见问题,有多种不同的解决方案。您想要的是每个线程一些上下文,而不是每个提交给ExecutorServiceRunnableCallable .

一种选择是使用 ThreadLocal 来创建您的 ftp 实例。这不是最佳选择,因为当线程终止时没有简单的方法来关闭 ftp 连接。然后,您可以通过限制线程池中运行的线程数来限制连接数。

我认为更好的解决方案是仅使用 ExecutorService 来 fork 工作线程。对于每个工作人员,向他们注入(inject)一个 BlockingQueue,他们都用它来出队并执行他们需要执行的任务。这与 ExecutorService 内部使用的队列是分开的。然后,您可以将任务添加到您的队列中,而不是添加到ExecutorService本身。

private static final BlockingQueue<FtpTask> taskQueue
        = new ArrayBlockingQueue<FtpTask>();

因此您的任务对象将具有以下内容:

public static class FtpTask {
     String base;
     String path;
     String file;
}

然后,Worker 类中的 run() 方法将执行以下操作:

public void run() {
    // make our permanent ftp instance
    this._ftp = new FTPClient();
    // connect it for the life of this thread
    this._connect();
    try {
        // loop getting tasks until we are interrupted
        // could also use volatile boolean !shutdown
        while (!Thread.currentThread().isInterrupted()) {
            FtpTask task = taskQueue.take();
            // if you are using a poison pill
            if (task == SHUTDOWN_TASK) {
                break;
            }
            // do the download here
            download(task.base, task.path, task.file);
        }
    } finally {
        this._disconnect();
    }
}

同样,您可以通过限制线程池中运行的线程数来限制连接数。

What i ideally want to do is have a single connection for scanning directories and building up a file list then four workers to download said files.

我会有一个 Executors.newFixedThreadPool(5); 并添加一个执行扫描/构建的线程和 4 个执行下载的工作线程。扫描线程将放入 BlockingQueue 中,而工作线程则从同一队列中获取。

关于java - 具有持久工作实例的线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19430834/

相关文章:

python - 检查 python 中正在运行的子进程的标准输出

C++ Ubuntu select() 如果串行接口(interface)有数据异步读取

iphone - 如果服务器上更新,则通过 FTP 下载到 iphone

java - 有条件地从 build/apk 中排除 java 文件

java - getContentLength 在 Android 上不适用于 FTP url

java - 使用系统管理员定义的属性配置基于 Spring 的 servlet

python - 处理另一个线程中的异常

c# - 使用 WinSCP .NET 程序集通过 FTPS(安全)发送文件

linux - 每天定时将文件从服务器A移动到服务器B的Shell脚本

Java : Quartz scheduler - is there a way I can get next five runs of a scheduled job