jpa - Database lock for application synchronization

Tags: jpa locking high-availability java-ee-5

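We have a JSF application based on Java EE 5 that runs on 2 WebLogic application servers sharing one Oracle database.

For some use cases it is critical that only one node performs certain operations on the database; these are typically permanent background jobs. The idea is therefore that one node (the "master") acquires some kind of lock in the database, while the other node (the "slave") detects the lock and does nothing for these use cases as long as the master is available. Only when the first node becomes unavailable should the second node take over the work and hold the lock itself from then on.

My question now is how we would implement this behavior (keep in mind: JPA 1.0), and whether the lock in the database would be released automatically if a node fails. Or should the whole thing be done differently?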

Best answer

Here is a simple solution, similar to what ActiveMQ does: only one master performs certain operations, while the other running instances wait to become the master.

package com.despegar.bookedia.message.broker.lock;

import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.jdbc.support.DatabaseType;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.support.MetaDataAccessException;
import org.springframework.jdbc.support.rowset.SqlRowSet;
import org.springframework.transaction.annotation.Transactional;

import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Map;

/**
 * Represents an exclusive lock on a database to avoid multiple brokers running
 * against the same logical database.
 * <p>
 * The Lease Database Locker lets the master broker acquire a lock that's valid for a fixed (usually short) duration after which it expires.
 * To retain the lock the master broker must periodically extend the lock's lease before it expires.
 * Simultaneously the slave broker also checks periodically to see if the lease has expired. If, for whatever reason, the master broker fails to update its
 * lease on the lock the slave will take ownership of the lock becoming the new master in the process. The leased lock can survive a DB replica failover.
 * </p>
 * Each broker in the master/slave pair must have a different leaseHolderId attribute, as it is this value that is used to reserve a lease.
 * <p>
 * In the simplest case, the clocks between master and slave must be in sync for this solution to work properly. If the clocks cannot be in sync, the
 * locker can use the system time from the database CURRENT TIME and adjust the timeouts in accordance with their local variance from the DB system time.
 * If maxAllowableDiffFromDBTime is greater than zero the local periods will be adjusted by any delta that exceeds maxAllowableDiffFromDBTime.
 * </p>
 */
public class LeaseDatabaseLocker implements Locker, AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
    private static final int IM_THE_MASTER_RESULT = 1;

    private int maxAllowableDiffFromDBTime;
    private long diffFromCurrentTime = Long.MAX_VALUE;
    private String leaseHolderId;
    private JdbcTemplate jdbcTemplate;
    private int queryTimeoutInSecs = -1;
    private long lockAcquireSleepInterval;
    private long lockHeldPeriod;

    public LeaseDatabaseLocker(String leaseHolderId, JdbcTemplate jdbcTemplate, int queryTimeout,
                               long lockAcquireSleepInterval, int maxAllowableDiffFromDBTime, long lockHeldPeriod) {
        this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
        this.jdbcTemplate = jdbcTemplate;
        this.queryTimeoutInSecs = queryTimeout;
        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
        this.leaseHolderId = leaseHolderId;
        this.lockHeldPeriod = lockHeldPeriod;
    }

    @Transactional
    @Override
    public void acquireLock() {

        LOG.debug("Attempting to acquire the exclusive lock to become the Master broker '{}'", leaseHolderId);

        String sql = Statements.LEASE_OBTAIN_STATEMENT;

        initTimeDiff();

        long now = System.currentTimeMillis() + diffFromCurrentTime;
        long nextLockCheck = now + lockHeldPeriod;

        PreparedStatementSetter preparedStatementSetter = statement -> {
            setQueryTimeoutInSecs(statement);
            statement.setString(Statements.ACQUIRE_LOCK_BROKER_NAME_COL_POSITION, leaseHolderId);
            statement.setLong(Statements.ACQUIRE_LOCK_NEXT_CHECK_COL_POSITION, nextLockCheck);
            statement.setLong(Statements.ACQUIRE_LOCK_TIME_NOW_POSITION, now);
        };

        LOG.trace("executing: '{}' to acquire lock with values {}, {}, {}", Statements.LEASE_OBTAIN_STATEMENT, leaseHolderId, nextLockCheck, now);
        int result = jdbcTemplate.update(sql, preparedStatementSetter);
        LOG.trace("Locking query result: updated rows count {}", result);

        if (result == IM_THE_MASTER_RESULT) {
            // we got the lease, verify we still have it
            LOG.debug("Lock acquired for '{}'", leaseHolderId);
            if (keepLockAlive()) {
                LOG.info("Becoming the master on dataSource: {}", jdbcTemplate.getDataSource());
                return;
            }
        }
        reportLeaseOwnerShipAndDuration();

        LOG.debug("{} failed to acquire lease.  Sleeping for {} milli(s) before trying again...", leaseHolderId, lockAcquireSleepInterval);
        throw new BrokerException.LockNotAcquiredException(leaseHolderId);
    }

    private void reportLeaseOwnerShipAndDuration() {
        String sql = Statements.LEASE_OWNER_STATEMENT;

        SqlRowSet rowSet = jdbcTemplate.queryForRowSet(sql);
        while (rowSet.next()) {
            LOG.debug("{} -  Lease held by {} till {}", leaseHolderId, rowSet.getString(1),
                     Instant.ofEpochMilli(rowSet.getLong(2)));
        }
    }

    private void setQueryTimeoutInSecs(Statement statement) throws SQLException {
        if (queryTimeoutInSecs > 0) {
            statement.setQueryTimeout(queryTimeoutInSecs);
        }
    }

    private long initTimeDiff() {
        if (Long.MAX_VALUE == diffFromCurrentTime) {
            if (maxAllowableDiffFromDBTime > 0) {
                diffFromCurrentTime = determineTimeDifference();
            } else {
                diffFromCurrentTime = 0L;
            }
        }
        return diffFromCurrentTime;
    }

    protected long determineTimeDifference() {

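        ResultSetExtractor<Timestamp> timestampExtractor = rs -> {
            // Fail with a clear message instead of reading from an empty result set
            if (!rs.next()) {
                throw new SQLException("Database returned no row for the current-time query");
            }
            return rs.getTimestamp(1);
        };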
        Timestamp timestamp = jdbcTemplate.query(Statements.utcTimestamp(jdbcTemplate), timestampExtractor);

        long result = 0L;
        long diff = System.currentTimeMillis() - timestamp.getTime();
        if (Math.abs(diff) > maxAllowableDiffFromDBTime) {
            // off by more than maxAllowableDiffFromDBTime so lets adjust
            result = -diff;
        }
        LOG.info("{} diff adjust from db: {}, db time: {}", leaseHolderId, result, timestamp);
        return result;
    }

    @Transactional
    public boolean keepLockAlive() {
        boolean result;
        final String sql = Statements.LEASE_UPDATE_STATEMENT;

        initTimeDiff();

        final long now = System.currentTimeMillis() + diffFromCurrentTime;
        final long nextLockCheck = now + lockHeldPeriod;

        PreparedStatementSetter statementSetter = statement -> {
            setQueryTimeoutInSecs(statement);
            statement.setString(Statements.KEEP_LOCK_NEW_BROKER_NAME_COL_POSITION, leaseHolderId);
            statement.setLong(Statements.KEEP_LOCK_NEXT_CHECK_COL_POSITION, nextLockCheck);
            statement.setString(Statements.KEEP_LOCK_BROKER_NAME_COL_POSITION, leaseHolderId);
        };

        LOG.trace("executing: '{}' to keep lock alive with values {}, {}", Statements.LEASE_UPDATE_STATEMENT, leaseHolderId, nextLockCheck);
        result = jdbcTemplate.update(sql, statementSetter) == IM_THE_MASTER_RESULT;

        if (!result) {
            reportLeaseOwnerShipAndDuration();
        }

        return result;
    }

    private void releaseLease() {
        String sql = Statements.LEASE_UPDATE_STATEMENT;

        final int lockReleaseTime = 1;
        PreparedStatementSetter statementSetter = statement -> {
            statement.setString(Statements.RELEASE_LOCK_NEW_BROKER_NAME_COL_POSITION, leaseHolderId);
            statement.setLong(Statements.RELEASE_LOCK_NEXT_CHECK_COL_POSITION, lockReleaseTime);
            statement.setString(Statements.RELEASE_LOCK_BROKER_NAME_COL_POSITION, leaseHolderId);
        };

        LOG.trace("executing: '{}' to release lock with values {}, {}, {}", sql, leaseHolderId, 1, leaseHolderId);
        if (jdbcTemplate.update(sql, statementSetter) == IM_THE_MASTER_RESULT) {
            LOG.info("{}, released lease", leaseHolderId);
        }
    }

    @Override
    public void close() throws Exception {
        releaseLease();
    }

    static class Statements {

        public static final String LOCK_TABLE_NAME = "MSG_BROKER_LOCK";

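        // Only MySQL and H2 are mapped here; any other database (for example the Oracle database
        // in the question) needs its own current-time query added to this map.
        public static final Map<DatabaseType, String> CURRENT_DATE_TIME_UTC = ImmutableMap.of(DatabaseType.MYSQL,   "SELECT UTC_TIMESTAMP",
                                                                                              DatabaseType.H2,      "SELECT CURRENT_TIMESTAMP");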

        public static final String LEASE_UPDATE_STATEMENT =
                String.format("UPDATE %s SET BROKER_NAME=?, %s.TIME=?  WHERE BROKER_NAME=? AND ID = 1", LOCK_TABLE_NAME, LOCK_TABLE_NAME);

        public static final String LEASE_OWNER_STATEMENT =
                String.format("SELECT BROKER_NAME, %s.TIME FROM %s WHERE ID = 1", LOCK_TABLE_NAME, LOCK_TABLE_NAME);

        public static final String LEASE_OBTAIN_STATEMENT =
                String.format("UPDATE %s SET BROKER_NAME=?, %s.TIME=? WHERE (%s.TIME IS NULL OR %s.TIME < ?) AND ID = 1",
                              LOCK_TABLE_NAME, LOCK_TABLE_NAME, LOCK_TABLE_NAME, LOCK_TABLE_NAME);

        //Acquire constants
        public static final int ACQUIRE_LOCK_BROKER_NAME_COL_POSITION = 1;
        public static final int ACQUIRE_LOCK_NEXT_CHECK_COL_POSITION = 2;
        public static final int ACQUIRE_LOCK_TIME_NOW_POSITION = 3;

        //Keep lock alive constants
        public static final int KEEP_LOCK_NEW_BROKER_NAME_COL_POSITION = 1;
        public static final int KEEP_LOCK_NEXT_CHECK_COL_POSITION = 2;
        public static final int KEEP_LOCK_BROKER_NAME_COL_POSITION = 3;

        //Release lock constants
        public static final int RELEASE_LOCK_NEW_BROKER_NAME_COL_POSITION = 1;
        public static final int RELEASE_LOCK_NEXT_CHECK_COL_POSITION = 2;
        public static final int RELEASE_LOCK_BROKER_NAME_COL_POSITION = 3;

        private Statements() {}

        private static String utcTimestamp(JdbcTemplate jdbcTemplate) {
            DatabaseType dbType;
            try {
                dbType = DatabaseType.fromMetaData(jdbcTemplate.getDataSource());
            } catch (MetaDataAccessException e) {
                throw new DataAccessResourceFailureException("Unable to determine database type: ", e);
            }
            String query = CURRENT_DATE_TIME_UTC.get(dbType);
            if(query == null) {
                throw new RuntimeException("Unrecognized DatabaseType: " + dbType);
            }
            return query;
        }
    }
}
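
To make the lease logic easier to follow, here is a minimal in-memory sketch of what the three SQL statements above do to the single lock row (`ID = 1`). It is an illustration only, not part of the original class: `LeaseModel` and its method names are hypothetical, and the `synchronized` keyword stands in for the atomicity that the database provides for a single `UPDATE`. It also suggests an answer to the question about node failure: with this approach no database-level lock is held, so nothing has to be released when a node dies; the lease simply stops being renewed and expires.

```java
/**
 * In-memory model of the lock row (ID = 1) in MSG_BROKER_LOCK.
 * Hypothetical illustration; the real locker relies on the database
 * executing each UPDATE atomically.
 */
public class LeaseModel {
    private String holder;   // models the BROKER_NAME column
    private Long expiresAt;  // models the TIME column (epoch millis), null until first use

    /** Mirrors LEASE_OBTAIN_STATEMENT: succeeds only if the lease is unset or expired. */
    public synchronized boolean obtain(String id, long now, long leaseMillis) {
        if (expiresAt == null || expiresAt < now) {
            holder = id;
            expiresAt = now + leaseMillis;
            return true;
        }
        return false;
    }

    /** Mirrors LEASE_UPDATE_STATEMENT in keepLockAlive(): only the current holder may extend. */
    public synchronized boolean renew(String id, long now, long leaseMillis) {
        if (id.equals(holder)) {
            expiresAt = now + leaseMillis;
            return true;
        }
        return false;
    }

    /** Mirrors releaseLease(): the holder sets TIME to 1 so the lease counts as expired. */
    public synchronized boolean release(String id) {
        if (id.equals(holder)) {
            expiresAt = 1L;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        LeaseModel row = new LeaseModel();
        long lease = 10_000;
        System.out.println(row.obtain("node-A", 0, lease));      // node-A tries to become master
        System.out.println(row.obtain("node-B", 5_000, lease));  // node-B tries while the lease is valid
        System.out.println(row.renew("node-A", 8_000, lease));   // node-A extends its lease
        // Assume node-A crashes here and stops renewing.
        System.out.println(row.obtain("node-B", 18_001, lease)); // node-B tries after the lease expired
        System.out.println(row.renew("node-A", 19_000, lease));  // a recovered node-A tries to renew
    }
}
```

The takeover delay after a crash is bounded by the lease length (`lockHeldPeriod` in the class above) plus the interval at which the waiting node retries, so both values are a trade-off between failover speed and database load.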

Source: a similar question about database locks for application synchronization (jpa) found on Stack Overflow: https://stackoverflow.com/questions/19862173/
