java - 数据库的多线程问题

标签 java multithreading hibernate concurrency

我想解析一个文件并将内容传输到数据库中。为了加快一切速度,应该并行解析文件。
我有一个主线程,它正在逐行读取文件并创建 Runnable,并将其提供给 ThreadPoolExecutor。每个Runnable都有自己的Session

每一行都包含一个客户端的唯一标识符,因此可以重复。系统尝试通过标识符在数据库中查找客户端。
如果找不到需要同一客户端的线程之一,则需要创建客户端。我这里有一个“连接”点,其他线程必须等待允许创建客户端的线程。

c = (Client) s.get("Client", identfier);
if (c == null) {
    CountDownLatch lock = isClientResolutionActive(identfier);

    if (lock != null) {
        lock.await();
        LOGGER.info("Lock was released ... " + identfier);
        c = (Client) s.get("Client", identfier);
    }
}

if (c == null) {
    c = createClient(...);

    s.save(c);
    s.flush();
    removeClientResolutionActive(identfier);
}

为了同步它们,我在调用者类中创建了两个方法,一个方法专门用于检查是否已经有人在进行客户端创建并返回共享对象,另一个方法从列表中删除条目并通知所有等待线程.

我在互联网上进行了大量搜索,并试图找到我的问题或类似的问题,但没有成功。
此外,我不确定应该使用哪个并发对象。经过研究,我决定使用CountDownLatch。它用1初始化。应该只有一个线程创建它。 (也许使用 CountDownLatch 之外的其他东西会更好,但我不知道是什么)

上述方法在 map 上包含一个同步 block ,其中包含客户端的标识符和CountDownLatch实例。

private CountDownLatch isClientResolutionActive(String identfier) {
    synchronized (activeSearches) {
        if (activeSearches.containsKey(identfier)) {
            // Only create the CountDownLatch if there are multiple threads for
            // that identfier
            if (activeSearches.get(identfier) == null) {
                activeSearches.put(identfier, new CountDownLatch(1));
            }
            return activeSearches.get(identfier);
        } else {
            LOGGER.info("Locked " + identfier);
            activeSearches.put(identfier, null);
            return null;
        }
    }
}

private void removeClientResolutionActive(String identfier) {
    synchronized (activeSearches) {
        CountDownLatch cl = activeSearches.get(identfier);
        activeSearches.remove(identfier);
        if (cl != null) {
            LOGGER.info("Unlock " + identfier);
            cl.countDown();
        }
    }
}
一般来说,它工作正常,但有时我会遇到问题,当闩锁被释放(并删除)并且访问同步变量队列包含另一个线程来搜索已删除的条目(以检查是否有任何线程被已经这样做了),它尝试再次创建一个新客户端。

18:02:55,611 [pool-1-thread-2] INFO LogImporter Unlock b42fcae346fbb2b1e3c544fb816de2c5
18:02:55,611 [pool-1-thread-3] INFO LogImporter Locked b42fcae346fbb2b1e3c544fb816de2c5
18:02:55,611 [pool-1-thread-4] INFO LogImporter Lock was released ... b42fcae346fbb2b1e3c544fb816de2c5

我认为我必须改进同步,但我不知道如何改进。

一个想法是将客户端搜索移动到同步 block 中,或者在再次锁定数据库之前进行检查。
也许创建一个缓存或映射,它保存数据库中的所有已知客户端。
或者在应用程序的整个生命周期中仅使用一个 session

预先感谢您的任何建议和提示。

最佳答案

在并行线程中解析同一文件不会提高速度,只会消耗额外的资源

问题更少且更高效的 text2db 优化包括:

  • 批量读取文件(而不是一次逐行读取 1 MB,处理它,读取下一个 MB)
  • 批量插入数据库 - mysql,如下所示:

    insert into urtable 
    values
    ('val1','val2'),
    ('val1','val2'); 
    

(示例取自 http://bytes.com/topic/sql-server/answers/585793-insert-into-using-select-values-inserting-multiple-rows - 抱歉我懒得自己编一个)

  • 尝试阻止 sql 来回(意味着:如果需要从数据库中选择输出来丰富数据集,请预先读取它,而不是在浏览文件时不断地读取)

更新----

从评论中我认为在解析文件时可能需要从数据库获取数据。好吧,如果你必须做,你就必须做。但是:尽量不要这样做。

首先:读取具体数据可以看出缓存与否。从狭义的理解来看,缓存只是通过任何启发式方法将磁盘数据移动到内存(不知道发生了什么)。我个人会尽量避免这种情况,因为启发法可能会对你不利。在更广泛的理解中,缓存就是我之前描述的,加上将数据从磁盘放入内存,您可以精确定位(例如通过 ID 或任何过滤条件)。因此,我仍然不喜欢其中狭隘的理解部分,而是预先选择明确定义的数据的行为。

其次:我的个人经历是这样的:如果您正在文件解析中处理完全规范化的数据模型数据库读取操作,那么通常会简化为“给我之前转储到的内容的主键”数据库。当您一次写入多行时,这似乎变得很棘手。然而,特别是在 MySQL 中,您绝对可以依赖“每个插入语句(甚至多行插入)都是原子的”,您可以从 last_insert_id() 获取 ID,因此您可以跟踪到所有以前写入的记录。我很确定其他数据库系统也有类似的“问题”。

第三:解析大型文件是我尝试将其作为一项工作进行操作,只有一名技术用户触发该任务,并确保这些进程中不超过 1 个并行运行。否则,您需要解决所有类型的问题,从文件锁定进入 session 权限读/写管理。因此,将其作为作业运行(至少在我的个人策略中)以分配大量 RAM - 取决于成本和速度的重要性。这意味着我什至不会费心将 100 K 行关键字到 id 表预先加载到内存中。

关于java - 数据库的多线程问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19705115/

相关文章:

multithreading - 意外的异步行为 : Springs's @Async vs RxJava

java - 如何在 hibernate 中将枚举序号映射到它的值?

mysql - Spring 3 - Hibernate 3 - 查询多个 MySQL 数据库

java - Android Studio 在调试器中将 byte[] 视为字符串

java - Java中,连接mysql,Class.forName是什么意思?

java - 比较两个通用对象的问题

java - 我想通过 Hibernate 使用 BLOB 列类型将文件插入到 Oracle?

java - 更新 DenseVector 类中的元素,Spark

java - 清理线程池的问题

java - 线程转储 : The hexa-decimal (lock, 条件等) - 它们是什么,以及如何理解它们是什么?