java - Spring Boot ThreadPoolTask​​Executor内存泄漏

标签 java spring multithreading threadpool

我有在 Wildfly 18.0.1 上运行的 Spring Boot 应用程序。该应用程序的主要目的是:每 5 分钟运行一些作业。所以我做:

TaskScheduler:初始化调度程序

@Autowired
ThreadPoolTaskScheduler taskScheduler;
taskScheduler.scheduleWithFixedDelay(new ScheduledVehicleDataUpdate(), 300000);

ScheduledVehicleDataUpdate:运行更新程序的调度程序

public class ScheduledVehicleDataUpdate implements Runnable {
    @Autowired
    TaskExecutor taskExecutor;

    @Override
    public void run() {
        try {
            CountDownLatch countDownLatch;
            List<VehicleEntity> vehicleList = VehicleService.getInstance().getList();
            if (vehicleList.size() > 0) {
                countDownLatch = new CountDownLatch(vehiclesList.size());
                vehicleList.forEach(vehicle -> taskExecutor.execute(new VehicleDataUpdater(vehicle, countDownLatch)));
                countDownLatch.await();
            }
        }
        catch (InterruptedException | RuntimeException e) {
            System.out.println(e.getMessage())
        }
    }
}

任务执行器:

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(23);
    executor.setMaxPoolSize(23);
    executor.setQueueCapacity(5000);
    executor.setThreadNamePrefix("VehicleService_updater_thread");
    executor.initialize();
    return executor;
}

VehicleDataUpdater:主更新程序类

public class VehicleDataUpdater implements Runnable {
    private final VehicleEntity vehicle;
    private final CountDownLatch countDownLatch;

    public VehicleDataUpdater(VehicleEntity vehicle, CountDownLatch countDownLatch) {
        this.vehicle = vehicle;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {    
        try {
            this.updateVehicleData();
        }
        catch (Exception e) {
            System.out.println(e.getMessage());
        }
        finally {
            countDownLatch.countDown();
        }
    }

    public void updateVehicleData() {
        // DO UPDATE ACTIONS;
    }
}

问题是完成ScheduledVehicleDataUpdate后内存不会被清除。它看起来像这样: enter image description here

每走一步,内存都在增长、增长、增长,在不可预知的时刻,所有的内存都会被释放。以及第一次迭代中的对象和最后一次迭代中的对象。在最糟糕的情况下,它会占用所有可用内存 (120Gb),并且 Wildfly 会崩溃。

我有大约 3200 条 VehicleEntity 记录(假设恰好有 3200 条)。所以我寻找了 VehicleDataUpdater - 内存中有多少对象。第一次迭代后(当我只启动应用程序时),它小于 3200 但不为零 - 可能约为 3000-3100。每一步都会增长,但不完全是在 3200 条记录上。这意味着某些对象会从内存中清除,但大多数对象仍保留在那里。

下一步:迭代的正常持续时间约为 30 秒 - 1 分钟。当内存没有清理干净并继续增长时,每次迭代的时间就会越来越长:我看到的最长的是 30 分钟。池中的线程大多处于“监视”状态,即有一些锁等待释放。可能是之前迭代中未释放的锁 - 再次提问 - 为什么在上一步中没有释放所有内存?

如果我在一个线程中执行更新(没有taskExecutor,只需vehicleList.foreach(vehicle -> VehicleDataUpdater(vehicle));),那么我没有看到任何内存增长。更新后,每辆车的内存都会被清除。

我没有发现 ThreadPoolTask​​Executor 或 ThreadPoolTask​​Scheduler 存在任何内存泄漏问题,因此我不知道如何修复它。

完成调度程序任务后不清除内存的可能方法是什么?完成后如何查看谁锁定了对象?我正在使用 VisualVM 2.0.1,但没有发现这种可能性。

编辑1:

车辆服务:

public class VehicleService {
    private static VehicleService instance = null;
    private VehicleDao dao;

    public static VehicleService getInstance(){
        if (instance == null) {
            instance = new VehicleService();
        }
        return instance;
    }

    private VehicleService(){}

    public void setDao(VehicleDao vehicleDao) { this.dao = vehicleDao; }

    public List<VehicleEntity> list() {
        return new ArrayList<>(this.dao.list(LocalDateTime.now()));
    }
}

VehicleDao:

@Repository
public class VehicleDao {
    @PersistenceContext(unitName = "entityManager")
    private EntityManager entityManager;

    @Transactional("transactionManager")
    public List<VehicleRegisterEntity> list(LocalDateTime dtPeriod) {
        return this.entityManager.createQuery("SOME_QUERY", VehicleEntity.class).getResultList();
    }
}

初始化服务:

@Service
public class InitHibernateService {
    private final VehicleDao vehicleDao;

    @Autowired
    public InitHibernateService(VehicleDao vehicleDao){
        this.vehicleDao = vehicleDao;
    }

    @PostConstruct
    private void setDao() {
        VehicleService.getInstance().setDao(this.vehicleDao);
    }
}

实体管理器:

@Bean(name = "entityManager")
@DependsOn("dataSource")
public LocalContainerEntityManagerFactoryBean entityManagerFactory() throws NamingException {
    LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
    em.setPersistenceProviderClass(HibernatePersistenceProvider.class);
    em.setDataSource(dataSource());
    em.setPackagesToScan("MY_PACKAGE");
    em.setJpaVendorAdapter(vendorAdapter());
    em.setJpaProperties(hibernateProperties());
    em.setPersistenceUnitName("customEntityManager");
    em.setJpaDialect(new CustomHibernateJpaDialect());
    return em;
}

最佳答案

看看您想要实现的目标基本上是使用 JPA 时的最佳批处理。但是,您尝试使用规范(多线程)而不是解决实际问题。为了获得更好的概述,我强烈建议阅读[这篇博文][1]。

  1. 使用 block 处理并在 x 条记录后刷新实体管理器,然后清除。这可以防止您在一级缓存中进行大量脏检查
  2. 在 hibernate 上启用 Batch 语句以及排序插入和更新

首先从属性开始,确保您的 hibernateProperties 包含以下内容

hibernate.jdbc.batch_size=25
hibernate.order_inserts=true
hibernate.order_updates=true

然后重写您的 ScheduledVehicleDataUpdate 以利用这一点并定期刷新/清除实体管理器。

@Component
public class ScheduledVehicleDataUpdate {
    @PersistenceContext
    private EntityManager em;

    @Scheduled(fixedDelayString="${your-delay-property-here}")
    @Transactional
    public void run() {
        try {
            List<VehicleEntity> vehicleList = getList();
            for (int i = 0 ; i < vehicleList.size() ; i++) {
              updateVehicle(vehicleList.get(i));
              if ( (i % 25) == 0) {
                em.flush();
                em.clear();
              }
            }
        }
    }

    private void updateVehicle(Vehicle vehicle) {
       // Your updates here
    }

    private List<VehicleEntity> getList() {
        return this.entityManager.createQuery("SOME_QUERY", VehicleEntity.class).getResultList();
    }
}

现在,您还可以通过使其变得更加懒惰(即仅在需要时检索数据)来减少 getList 的内存消耗。您可以通过进入 hibernate 并使用 stream 方法(从 Hibernate 5.2 开始)来完成此操作,或者在使用旧版本时做更多工作并使用 ScrollableResult (请参阅 Is there are way to scroll results with JPA/hibernate? )。如果您已经使用 JPA 2.2(即 Hibernate 5.3),则可以直接使用 getResultStream

private Stream<VehicleEntity> getList() {
  Query q = this.entityManager.createQuery("SOME_QUERY", VehicleEntity.class);
  org.hibernate.query.Query hq = q.unwrap(org.hibernate.query.Query.class);
  return hq.stream();
}

或使用 JPA 2.2

private Stream<VehicleEntity> getList() {
  Query q = this.entityManager.createQuery("SOME_QUERY", VehicleEntity.class);
  return q.getResultStream();
}

在您的代码中,您需要更改 for 循环以使用流,并自己保留一个计数器并仍然定期刷新。使用流不太可能提高性能(甚至可能会降低性能),但会比一次检索所有元素时使用更少的内存。因为内存中的对象数量与您用于批量大小的对象数量相同!

@Scheduled(fixedDelayString="${your-delay-property-here}")
    @Transactional
    public void run() {
        try {
            Stream<VehicleEntity> vehicles = getList();
            LongAdder counter = new LongAdder();
            vehicles.forEach(it -> {
              counter.increment();
              updateVehicle(it);
              if ( (counter.longValue() % 25) == 0) {
                em.flush();
                em.clear();
              }
            });
            }
        }
    }

像这样的事情应该可以解决问题。

注意:我一边写一边输入代码,由于缺少一些括号、导入等,这可能无法编译。

关于java - Spring Boot ThreadPoolTask​​Executor内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61376382/

相关文章:

java - 从递归删除Java中获取对象

java - 通过操作事件更改 JPanel 组件的属性

java - Android 切换按钮只能工作一次。我怎样才能让它每次都能工作?

spring - <上下文:property-placeholder> properties not accessible to the child (web) context

java - spring-mvc中抽象类的数据绑定(bind)

java - 用于 Elasticsearch 的RestClient

java - 我可以相信操作系统调度线程 "optimal"(并行化)

java - 在 updateProgress 期间将文本附加到 JavaFX TextArea

c++ - 我可以将shared_ptr作为临时变量传递给线程吗?

java - 不活动 transaciotn : javax. persistence.TransactionRequiredException:执行更新/删除查询