cassandra - 如何以不同于其他列的方式映射静态列?

标签 cassandra cassandra-2.0 datastax-java-driver

如何映射 the example given here使用 datastax Java 对象映射到下面的类?

public class User {
    private int user;
    private int balance;

    private List<Bill> bills;
}

public class Bill {
    private String description;
    private int amount;
}

最佳答案

对于 java 驱动程序中的映射模块,静态列不需要与非静态列有任何不同的对待。但您可能会担心的一个问题是您希望保持一定的一致性,因为仅当余额达到预期值时才会更新,因此单独使用 Mapper 的 save 方法是不够的。相反,您将执行一批有条件的余额更新,并在同一批处理中更新费用。

为了方便起见并仍然使用映射器,您可以使用 Accessor-annotated interface定义您的查询并将它们映射回您的对象。然后,您可以使用映射器对象和其他一些方法创建一个数据访问对象,以便与 Cassandra 进行交互。

这需要一些工作,但我认为它为您提供了一种很好的干净方法,可以将您的解决方案从 Cassandra 中抽象出来,同时仍然以惯用的方式使用它。另一种选择是查看 Achilles这是 Cassandra 更高级的对象持久化管理器。 KunderaSpring Data还有其他可能的选择。

首先,让我们看看您的类并将它们映射到博客示例中定义的表:

  CREATE TABLE bills (
     user text,
     balance int static,
     expense_id int,
     amount int,
     description text,
     paid boolean,
     PRIMARY KEY (user, expense_id)
  );

从您的示例中,我怀疑您可能希望使用用户定义类型而不是帐单的单独列,但由于您将这篇文章标记为“cassandra-2.0”并且直到 2.1 才引入 UDT,因此我不会涵盖这一点,但如果您希望我对此进行更多说明,我可以。

让我们定义我们的类 Bill:

@Table(name="bills")
public class Bill {

    @PartitionKey
    private String user;

    private int balance;

    @ClusteringColumn
    @Column(name="expense_id")
    private int expenseId;

    private int amount;

    private String description;

    private boolean paid;

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public int getBalance() {
        return balance;
    }

    public void setBalance(int balance) {
        this.balance = balance;
    }

    public int getExpenseId() {
        return expenseId;
    }

    public void setExpenseId(int expenseId) {
        this.expenseId = expenseId;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public boolean isPaid() {
        return paid;
    }

    public void setPaid(boolean paid) {
        this.paid = paid;
    }
}

我们还可以定义一个 BillAccessor 来与 cassandra 中的 Bill 进行交互,将它们映射回 Bill 对象。这应该涵盖博客文章中的所有查询:

@Accessor
public interface BillAccessor {

    @Query("INSERT INTO bills (user, balance) VALUES (?, ?) IF NOT EXISTS")
    BoundStatement addUser(String user, int balance);

    @Query("UPDATE bills SET balance = :newBalance WHERE user = :user IF balance = :currentBalance")
    BoundStatement updateBalance(@Param("user") String user, @Param("currentBalance") int currentBalance,
                            @Param("newBalance") int newBalance);

    @Query("SELECT balance from bills where user=?")
    ResultSet getBalance(String user);

    @Query("INSERT INTO bills (user, expense_id, amount, description, paid) values (?, ?, ?, ?, false) IF NOT EXISTS")
    BoundStatement addBill(String user, int expenseId, int amount, String description);

    @Query("UPDATE bills set paid=true where user=? and expense_id=? IF paid=false")
    BoundStatement markBillPaid(String user, int expenseId);

    @Query("SELECT * from bills where user=?")
    Result<Bill> getBills(String user);
}

接下来,我们将使用 Bill 类和 BillAccessor 创建一个用于与您的账单交互的 DAO:

public class BillDao {

    private final Session session;

    private final Mapper<Bill> mapper;

    private final BillAccessor accessor;

    public BillDao(Session session) {
        this.session = session;
        MappingManager manager = new MappingManager(session);
        this.mapper = manager.mapper(Bill.class);
        this.accessor = manager.createAccessor(BillAccessor.class);
    }

    public Integer getBalance(String user) {
        ResultSet result = accessor.getBalance(user);
        Row row = result.one();
        if(row == null) {
            return null;
        } else {
            return row.getInt(0);
        }
    }

    public Iterable<Bill> getBills(String user) {
        return accessor.getBills(user);
    }

    public Bill getBill(String user, int expenseId) {
        return mapper.get(user, expenseId);
    }

    public int addBill(String user, int expenseId, int amount, String description) throws UpdateException {
        BatchStatement batch = new BatchStatement();

        Integer balance = getBalance(user);
        if (balance == null) {
            balance = 0;
            // we need to create the user.
            batch.add(accessor.addUser(user, balance - amount));
        } else {
            // we need to update the users balance.
            batch.add(accessor.updateBalance(user, balance, balance - amount));
        }
        batch.add(accessor.addBill(user, expenseId, amount, description));
        ResultSet result = session.execute(batch);

        if (result.wasApplied()) {
            return balance - amount;
        } else {
            throw new UpdateException("Failed applying bill, conditional update failed.");
        }
    }

    public int payForBill(Bill bill) throws UpdateException {
        Integer balance = getBalance(bill.getUser());
        if(balance == null) {
            throw new UpdateException("Failed paying for bill, user doesn't exist!");
        }
        BatchStatement batch = new BatchStatement();
        batch.add(accessor.updateBalance(bill.getUser(), balance, bill.getAmount() + balance));
        batch.add(accessor.markBillPaid(bill.getUser(), bill.getExpenseId()));

        ResultSet result = session.execute(batch);

        if(result.wasApplied()) {
            return bill.getAmount() + balance;
        } else {
            throw new UpdateException("Failed paying for bill, conditional update failed.");
        }
    }

    public class UpdateException extends Exception {
        public UpdateException(String msg) {
            super(msg);
        }
    }
}

请注意,我们通过检查 ResultSet.wasApplied() 来检查是否应用了更改。 。由于我们正在进行条件更新,因此如果我们的条件不成立,则可能不会应用更改。如果未应用更改,DAO 将简单地抛出 UpdateException,但您可以选择不同的策略,例如在 DAO 中重试任意次数。

最后让我们编写一些代码来练习 DAO:

Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
try {
    Session session = cluster.connect("readtest");
    BillDao billDao = new BillDao(session);

    String user = "chandru";

    // Create a bill, should exercise user create logic.
    int balance = billDao.addBill(user, 1, 10, "Sandwich");
    System.out.format("Bill %s/%d created, current balance is %d.%n", user, 1, balance);

    // Create another bill, should exercise balance update logic.
    balance = billDao.addBill(user, 2, 6, "Salad");
    System.out.format("Bill %s/%d created, current balance is %d.%n", user, 2, balance);

    // Pay for all the bills!
    for(Bill bill : billDao.getBills(user)) {
        balance = billDao.payForBill(bill);
        System.out.format("Paid for %s/%d, current balance is %d.%n", user, bill.getExpenseId(), balance);

        // Ensure bill was paid.
        Bill newBill = billDao.getBill(user, bill.getExpenseId());
        System.out.format("Is %s/%d paid for?: %b.%n", user, newBill.getExpenseId(), newBill.isPaid());
    }

    // Try to add another bill with an already used expense id.
    try {
        billDao.addBill(user, 1, 1, "Diet Coke");
    } catch(BillDao.UpdateException ex) {
        System.err.format("Could not add bill %s/%d: %s", user, 1, ex.getMessage());
    }

} finally {
    cluster.close();
}

如果一切顺利,您应该观察到以下输出:

Bill chandru/1 created, current balance is -10.
Bill chandru/2 created, current balance is -16.
Paid for chandru/1, current balance is -6.
Is chandru/1 paid for?: true.
Paid for chandru/2, current balance is 0.
Is chandru/2 paid for?: true.
Could not add bill chandru/1: Failed applying bill, conditional update failed.

你的表的状态将是:

cqlsh:readtest> select * from bills;

 user    | expense_id | balance | amount | description | paid
---------+------------+---------+--------+-------------+------
 chandru |          1 |       0 |     10 |    Sandwich | True
 chandru |          2 |       0 |      6 |       Salad | True

关于cassandra - 如何以不同于其他列的方式映射静态列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29575571/

相关文章:

java - cassandra单节点连接错误

hadoop - 从 Cassandra 加载数据

java - Cassandra 在更新另一个表时触发更新另一个表

cassandra - 如何仅在特定数据中心内运行修复?

cassandra - 有没有办法查看 cassandra 中使用 vnode 的每个节点的 token 范围?

java - 写C*超时以不可预测的方式出现

cassandra - Cassandra 集群中出现意外的 UnavailableException

java - Java 项目中的 CouchDB、Project Voldemort、Cassandra

centos - 在 CentOS 6.5 64 位和 Cassandra PDO 上安装 thrift

cassandra - 从 cassandra 读取时一致性级别发生变化