c++ - Berkeley DB,并发队列

标签 c++ berkeley-db

我正在尝试使用 Berkeley DB 实现并发持久队列。作为初学者,我尝试制作两个都附加到数据库的过程:

#include <unistd.h>
#include <sstream>
#include <db_cxx.h>

class Queue : public DbEnv
{
    public:
    Queue ( ) :
        DbEnv(0),
        db(0)
    {
        set_flags(DB_CDB_ALLDB, 1);
        open("/tmp/db", DB_INIT_LOCK  |
                DB_INIT_LOG   |
                DB_INIT_TXN   |
                DB_INIT_MPOOL |
                DB_RECOVER    |
                DB_CREATE     |
                DB_THREAD,
                0);

        db = new Db(this, 0); 
        db->set_flags(DB_RENUMBER);
        db->open(NULL, "db", NULL, DB_RECNO, DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0);
    }
    virtual ~Queue ()
    {
        db->close(0);
        delete db;
        close(0);
    }

    protected:
    Db * db;
};

class Enqueue : public Queue
{
    public:
    Enqueue ( ) : Queue() { }
    virtual ~Enqueue () { }

    bool push(const std::string& s)
    {
        int res;
        DbTxn * txn;

        try {
            txn_begin(NULL, &txn, DB_TXN_SYNC | DB_TXN_WAIT );

            db_recno_t k0[4]; // not sure how mutch data is needs???
            k0[0] = 0;

            Dbt val((void*)s.c_str(), s.length());
            Dbt key((void*)&k0, sizeof(k0[0]));
            key.set_ulen(sizeof(k0));
            key.set_flags(DB_DBT_USERMEM);

            res = db->put(txn, &key, &val, DB_APPEND);

            if( res == 0 ) {
                txn->commit(0);
                return true;

            } else {
                std::cerr << "push failed: " << res << std::endl;
                txn->abort();
                return false;

            }
        } catch( DbException e) {
            std::cerr << "DB What()" << e.what() << std::endl;
            txn->abort();
            return false;
        } catch( std::exception e) {
            std::cerr << "What()" << e.what() << std::endl;
            txn->abort();
            return false;
        } catch(...) {
            std::cerr << "Unknown error" << std::endl;
            txn->abort();
            return false;
        }
    }
};

using namespace std;

int main(int argc, const char *argv[])
{
    fork();

    Enqueue e;

    stringstream ss;
    for(int i = 0; i < 10; i++){
        ss.str("");
        ss << "asdf" << i;
        cout << ss.str() << endl;
        if( ! e.push(ss.str()) )
            break;
    }

    return 0;
}

编译它:

g++ test.cxx -I/usr/include/db4.8 -ldb_cxx-4.8

创建数据库目录

mkdir /tmp/db

当我运行它时,我会遇到各种错误(段错误、分配错误,有时它确实有效)

我确定我错过了一些锁定,但我只是不知道该怎么做。因此,我们非常欢迎任何解决此问题的提示和/或建议。

最佳答案

仅作记录,这是我在多次谷歌搜索和反复试验后确定的解决方案。

该应用程序是一个回拨进程,其中生产者正在添加数据,消费者尝试将其发送回家。如果消费者未能将其寄回家,则必须重试。当消费者尝试接收数据时,数据库不得为生产者阻塞。

代码有一个文件锁,并且只允许一个消费者进程。

代码如下:

#include <db_cxx.h>
#include <sstream>
#include <fstream>
#include <vector>

#include <boost/interprocess/sync/file_lock.hpp>

class Queue : public DbEnv
{
public:
    Queue ( bool sync ) :
        DbEnv(0),
        db(0)
    {
        set_flags(DB_CDB_ALLDB, 1);

        if( sync )
            set_flags(DB_TXN_NOSYNC, 0);
        else
            set_flags(DB_TXN_NOSYNC, 1);

        open("/tmp/db", DB_INIT_LOCK |
             DB_INIT_LOG | DB_INIT_TXN | DB_INIT_MPOOL |
             DB_REGISTER | DB_RECOVER | DB_CREATE | DB_THREAD,
             0);

        db = new Db(this, 0);
        db->set_flags(DB_RENUMBER);
        db->open(NULL, "db", NULL, DB_RECNO, DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0);
    }
    virtual ~Queue ()
    {
        db->close(0);
        delete db;
        close(0);
    }

protected:
    Db * db;
};

struct Transaction
{
    Transaction() : t(0) { }

    bool init(DbEnv * dbenv ){
        try {
            dbenv->txn_begin(NULL, &t, 0);
        } catch( DbException e) {
            std::cerr << "DB What()" << e.what() << std::endl;
            return false;
        } catch( std::exception e) {
            std::cerr << "What()" << e.what() << std::endl;
            return false;
        } catch(...) {
            std::cerr << "Unknown error" << std::endl;
            return false;
        }
        return true;
    }

    ~Transaction(){ if( t!=0) t->abort(); }

    void abort() { t->abort(); t = 0; }
    void commit() { t->commit(0); t = 0; }

    DbTxn * t;
};

struct Cursor
{
    Cursor() : c(0) { }

    bool init( Db * db,  DbTxn * t) {
        try {
            db->cursor(t, &c, 0);
        } catch( DbException e) {
            std::cerr << "DB What()" << e.what() << std::endl;
            return false;
        } catch( std::exception e) {
            std::cerr << "What()" << e.what() << std::endl;
            return false;
        } catch(...) {
            std::cerr << "Unknown error" << std::endl;
            return false;
        }
        return true;
    }

    ~Cursor(){ if( c!=0) c->close(); }
    void close(){ c->close(); c = 0; }
    Dbc * c;
};

class Enqueue : public Queue
{
public:
    Enqueue ( bool sync ) : Queue(sync) { }
    virtual ~Enqueue () { }

    bool push(const std::string& s)
    {
        int res;
        Transaction transaction;

        if( ! transaction.init(this) )
            return false;

        try {
            db_recno_t k0[4]; // not sure how mutch data is needs???
            k0[0] = 0;

            Dbt val((void*)s.c_str(), s.length());
            Dbt key((void*)&k0, sizeof(k0[0]));
            key.set_ulen(sizeof(k0));
            key.set_flags(DB_DBT_USERMEM);

            res = db->put(transaction.t, &key, &val, DB_APPEND);

            if( res == 0 ) {
                transaction.commit();
                return true;

            } else {
                std::cerr << "push failed: " << res << std::endl;
                return false;

            }

        } catch( DbException e) {
            std::cerr << "DB What()" << e.what() << std::endl;
            return false;
        } catch( std::exception e) {
            std::cerr << "What()" << e.what() << std::endl;
            return false;
        } catch(...) {
            std::cerr << "Unknown error" << std::endl;
            return false;
        }
    }
};

const char * create_file(const char * f ){
    std::ofstream _f;
    _f.open(f, std::ios::out);
    _f.close();
    return f;
}

class Dequeue : public Queue
{
public:
    Dequeue ( bool sync ) :
        Queue(sync),
        lock(create_file("/tmp/db-test-pop.lock")),
        number_of_records_(0)
    {
        std::cout << "Trying to get exclusize access to database" << std::endl;
        lock.lock();
    }

    virtual ~Dequeue ()
    {
    }

    bool pop(size_t number_of_records, std::vector<std::string>& records)
    {
        if( number_of_records_ != 0 ) // TODO, warning
            abort();

        Cursor cursor;
        records.clear();

        if( number_of_records_ != 0 )
            abort(); // TODO, warning

        // Get a cursor
        try {
            db->cursor(0, &cursor.c, 0);
        } catch( DbException e) {
            std::cerr << "DB What()" << e.what() << std::endl;
            abort();
            return false;
        }

        // Read and delete
        try {
            Dbt val;

            db_recno_t k0 = 0;
            Dbt key((void*)&k0, sizeof(k0));

            for( size_t i = 0; i < number_of_records; i ++ ) {
                int get_res = cursor.c->get(&key, &val, DB_NEXT);

                if( get_res == 0 )
                    records.push_back(std::string((char *)val.get_data(), val.get_size()));
                else
                    break;
            }

            number_of_records_ = records.size();
            if( number_of_records_ == 0 ) {
                abort();
                return false;
            } else {
                return true;
            }

        } catch( DbException e) {
            std::cerr << "DB read/delete What() " << e.what() << std::endl;
            abort();
            return false;
        } catch( std::exception e) {
            std::cerr << "DB read/delete What() " << e.what() << std::endl;
            abort();
            return false;
        }
    }

    bool commit()
    {
        if( number_of_records_ == 0 )
            return true;

        Transaction transaction;
        Cursor      cursor;

        if( ! transaction.init(this) )
            return false;

        if( ! cursor.init(db, transaction.t) )
            return false;

        // Read and delete
        try {
            Dbt val;

            db_recno_t k0 = 0;
            Dbt key((void*)&k0, sizeof(k0));

            for( size_t i = 0; i < number_of_records_; i ++ ) {
                int get_res = cursor.c->get(&key, &val, DB_NEXT);

                if( get_res == 0 )
                    cursor.c->del(0);
                else
                    break; // this is bad!
            }

            number_of_records_ = 0;
            cursor.close();
            transaction.commit();

            return true;

        } catch( DbException e) {
            std::cerr << "DB read/delete What() " << e.what() << std::endl;
            return false;
        } catch( std::exception e) {
            std::cerr << "DB read/delete What() " << e.what() << std::endl;
            return false;
        }
    }

    void abort()
    {
        number_of_records_ = 0;
    }

private:
    boost::interprocess::file_lock lock;
    size_t  number_of_records_;
    sigset_t orig_mask;
};

如果您发现任何错误,或者知道更简单的方法,请告诉我。

关于c++ - Berkeley DB,并发队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7302268/

相关文章:

c - 无法访问 Berkeley DB 使用 C 创建的数据库中的多个数据

c - Oracle Berkeley DB 中记录概念的查询

sql - 在 Web 应用程序中提供快速 `select count(*)` 功能

c++ - 检测到堆损坏 : after Normal block (#126)

c++ - C++中的流操纵器如何成为函数?

c++ - SFML Sprite 运动未正确更新

java - 将更多记录写入 berkeleyDB 时出现问题

c++ - Berkeley DB:DbEnv::lsn_reset 需要很长时间

c++ - 将指针分配给引用

C++ vector 语法错误