java - Java 中的动态计划并发任务执行

标签 java multithreading concurrency quartz-scheduler

我正在尝试实现一个根据某些用户输入对任务进行编程的应用程序。用户可以将多个IP与与它们关联的telnet命令(一对一关系)、执行频率和2个组(集群、对象类)。

用户应该能够在运行时添加/删除 IP、集群、命令等。他们还应该能够中断执行。

该应用程序应该能够向 IP 发送 telnet 命令,等待响应并根据频率将响应保存在数据库中。我遇到的问题是尝试使所有这些都是多线程的,因为至少有 60,000 个 IP 可以进行 telnet,并且在单个线程中执行此操作会花费太多时间。一个线程应该处理同一集群中具有相同 objectClass 的一组 IP。

我看过Quartz安排工作。使用 Quartz,我尝试创建一个动态作业,获取 IP 列表(带有命令),对其进行处理并将结果保存到数据库中。但后来我遇到了用户提供的不同计时器的问题。 Quartz 网页上的示例不完整,并且没有详细介绍。

然后我尝试用老式的方法,使用java线程,但我需要异常处理和参数传递,线程不这样做。然后我发现了 Callables 和 Executors,但我无法使用 Callables 安排任务。

现在我很困惑,我该怎么办?

最佳答案

好的,这里有一些想法。与必要的盐一起服用。

首先,创建一个您需要完成的所有工作的列表。我假设您在表中的某处有这个,您可以进行如下所示的连接:

cluster | objectClass | ip-address | command | frequency | last-run-time

这代表了您的系统需要完成的所有工作。为了便于解释,我会说频率可以采用“每天 1 次”、“每小时 1 次”、“每小时 4 次”、“每分钟”的形式。该表每个(集群、对象类、IP 地址、命令)一行。假设另一个表有运行历史记录,包含错误消息和其他内容。

现在您需要做的是阅读该表并安排工作。对于调度,请使用以下之一:

ScheduledExecutorService exec = Executors...

当您安排某些事情时,您需要告诉它运行的频率(使用我们给出的频率就足够简单了)和延迟。如果某件事每分钟运行一次,并且最后一次运行是在 4 分 30 秒前,则初始延迟为零。如果每小时运行一次,则初始延迟为(60 分钟 - 4.5 分钟 = 55.5 分钟)。

ScheduledFuture<?> handle = exec.scheduleAtFixedRate(...);

更复杂的调度类型是像 Quartz 这样的东西存在的原因,但基本上你只需要一种解决方法,给定(调度,上次运行)到下一次执行的耗时。如果您可以做到这一点,那么您可以使用schedule(...)而不是scheduleAtFixedRate(...),然后在该任务完成时安排该任务的下一次运行。

无论如何,当你安排某件事时,你会得到它的句柄

ScheduledFuture<?> handle = exec.scheduleAtFixedRate(...);

将此 handle 放在可触及的地方。为了便于讨论,我们假设它是 TaskKey 的映射。 TaskKey 是(cluster | objectClass | ip-address | command)一起作为一个对象。

Map<TaskKey,ScheduledFuture<?>> tasks = ...;

您可以使用该句柄来取消和安排新作业。

cancelForCustomer(CustomerId id) {
  List<TaskKey> keys = db.findAllTasksOwnedByCustomer(id);
  for(TaskKey key : keys) {
    ScheduledFuture<?> f = tasks.get(key);
    if(f!=null) f.cancel();
  }
}

对于参数传递,创建一个对象来代表您的工作。使用您需要的所有参数创建其中之一。

class HostCheck implements Runnable {
  private final Address host;
  private final String command;
  private final int something;
  public HostCheck(Address host, String command; int something) {
    this.host = host; this.command = command; this.something = something;
  }
  ....
}

对于异常处理,将其全部本地化到您的对象中

class HostCheck implements Runnable {
  ...
  public void run() {
    try {
      check();
      scheduleNextRun(); // optionally, if fixed-rate doesn't work
    } catch( Exception e ) {
      db.markFailure(task); // or however.
      // Point is tell somebody about the failure.
      // You can use this to decide to stop scheduling checks for the host
      // or whatever, but just record the info now and us it to influence
      // future behavior in, er, the future.
    }
  }
}

好的,到目前为止,我认为我们的状态非常好。有很多细节需要填写,但感觉是可以管理的。现在我们遇到了一些复杂性,这就是“cluster/objectClass”对的执行是串行的要求。

有几种方法可以处理这个问题。

如果唯一对的数量较少,您可以直接制作 Map<ClusterObjectClassPair,ScheduledExecutorService> ,确保创建单线程执行程序服务(例如 Executors.newSingleThreadScheduledExecutor() )。因此,您拥有的不是单个调度服务(exec,上面)。足够简单。

如果您需要控制同时尝试的工作量,那么您可以让每个 HealthCheck 在执行前获得许可。拥有一些全局许可对象

public static final Semaphore permits = java.util.concurrent.Semaphore(30);

然后

class HostCheck implements Runnable {
  ...
  public void run() {
    permits.acquire()
    try {
      check();
      scheduleNextRun();
    } catch( Exception e ) {
      // regular handling
    } finally {
      permits.release();
    }
  }
}

每个ClusterObjectClassPair只有一个线程,它序列化该工作,然后只允许限制数量 ClusterObjectClassPair您可以一次交谈。

我想这会是一个相当长的答案。祝你好运。

关于java - Java 中的动态计划并发任务执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23573312/

相关文章:

java - 如果未传递该值并使线程安全,如何在构建器模式中使用默认值?

java - 内部没有语句且以分号结尾的方法

c# - 如果我在整个运行过程中都需要一次性用品怎么办?

java - 在java中使用synchronized block 同时阻止对多个方法的访问

php - PHP/MYSQL 中的并发

data-structures - 带范围的 Golang 并发 map 访问

java - 查找什么类型的类

java - 无法在 Struts2 Liferay Portlet 中将窗口状态设置为独占

multithreading - 访问TObjectList线程的不同索引是否安全?

java - Quartz Misfire - 并发