2007-03-19 Casey Marshall <csm@soe.ucsc.edu>

* osbdb/OSBDB.cc (CLEANUP, COMMIT): new macros.
	(scoped_lock): new class.
	(getenv): new function.
	The rest replaces Context cleanup/commit with macros; fixes
	some debug output; adds locks to mutator methods.
	* osbdb/OSBDB.h (lock): new member.
	(OSBDB): initialize `lock.'
	(getenv): new function.



git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1264 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
rsdio 2007-03-20 05:58:05 +00:00
parent c8c238eea2
commit eb07ec9646
2 changed files with 327 additions and 61 deletions

View File

@ -9,8 +9,11 @@ License version 2.1, as published by the Free Software
Foundation. See file COPYING. */
#include <map>
#include <string>
#include <cerrno>
#include "OSBDB.h"
#include "common/Timer.h"
using namespace std;
@ -19,6 +22,26 @@ using namespace std;
#undef derr
#define derr(x) if (x <= g_conf.debug || x <= g_conf.debug_bdbstore) cerr << "bdbstore(" << device << ")@" << __LINE__ << "."
#define CLEANUP(onsafe) do { \
dout(6) << "DELETE " << hex << onsafe << dec << endl; \
delete onsafe; \
} while (0)
#define COMMIT(onsafe) do { \
dout(6) << "COMMIT " << hex << onsafe << dec << endl; \
sync(onsafe); \
} while (0)
// Have a lock, already.
class scoped_lock
{
private:
Mutex *m;
public:
scoped_lock(Mutex *m) : m(m) { m->Lock(); }
~scoped_lock() { m->Unlock(); }
};
// Utilities.
// Starting off with my own bsearch; mail reader to follow...
@ -61,33 +84,33 @@ uint32_t binary_search (T *array, size_t size, T key)
// Management.
DbEnv *OSBDB::getenv ()
{
DbEnv *envp = new DbEnv (DB_CXX_NO_EXCEPTIONS);
if (g_conf.debug > 1 || g_conf.debug_bdbstore > 1)
envp->set_error_stream (&std::cerr);
if (g_conf.debug > 2 || g_conf.debug_bdbstore > 2)
envp->set_message_stream (&std::cout);
envp->set_flags (DB_LOG_INMEMORY, 1);
//env->set_flags (DB_DIRECT_DB, 1);
int env_flags = (DB_CREATE
| DB_THREAD
//| DB_INIT_LOCK
| DB_INIT_MPOOL
//| DB_INIT_TXN
//| DB_INIT_LOG
| DB_PRIVATE);
if (envp->open (NULL, env_flags, 0) != 0)
{
std::cerr << "failed to open environment " << std::endl;
assert(0);
}
return envp;
}
int OSBDB::opendb(DBTYPE type, int flags, bool new_env)
{
// BDB transactions require an environment.
if (g_conf.bdbstore_transactional)
{
env = new DbEnv (DB_CXX_NO_EXCEPTIONS);
env->set_error_stream (&std::cerr);
env->set_message_stream (&std::cout);
env->set_flags (DB_LOG_INMEMORY, 1);
//env->set_flags (DB_DIRECT_DB, 1);
int env_flags = (DB_CREATE
| DB_THREAD
| DB_INIT_LOCK
| DB_INIT_MPOOL
| DB_INIT_TXN
| DB_INIT_LOG
| DB_PRIVATE);
//if (new_env)
// env->remove (env_dir.c_str(), 0);
if (env->open (NULL, env_flags, 0) != 0)
{
std::cerr << "failed to open environment " << std::endl;
return -EIO;
}
}
env = getenv();
db = new Db(env, 0);
db->set_error_stream (&std::cerr);
db->set_message_stream (&std::cout);
@ -115,7 +138,7 @@ int OSBDB::opendb(DBTYPE type, int flags, bool new_env)
if ((ret = db->open (NULL, device.c_str(), NULL, type, flags, 0)) != 0)
{
derr(1) << "failed to open database: " << device << ": "
<< strerror(ret) << std::endl;
<< db_strerror(ret) << std::endl;
return -EINVAL;
}
opened = true;
@ -248,7 +271,9 @@ int OSBDB::mkfs()
dout(2) << "mkfs" << endl;
unlink (device.c_str());
string d = env_dir;
d += device;
unlink (d.c_str());
int ret;
if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH),
@ -354,9 +379,13 @@ int OSBDB::remove(object_t oid, Context *onsafe)
if (!mounted)
{
derr(1) << "not mounted!" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
dout(6) << "Context " << hex << onsafe << dec << endl;
scoped_lock __lock(&lock);
dout(2) << "remove " << oid << endl;
DbTxn *txn = NULL;
@ -366,11 +395,28 @@ int OSBDB::remove(object_t oid, Context *onsafe)
oid_t id;
mkoid(id, oid);
Dbt key (&id, sizeof (oid_t));
db->del (NULL, &key, 0);
int ret;
if ((ret = db->del (txn, &key, 0)) != 0)
{
derr(1) << ".del returned error: " << db_strerror (ret) << endl;
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
object_inode_key _ikey = new_object_inode_key (oid);
Dbt ikey (&_ikey, sizeof_object_inode_key());
db->del (txn, &ikey, 0);
if ((ret = db->del (txn, &ikey, 0)) != 0)
{
derr(1) << ".del returned error: " << db_strerror (ret) << endl;
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
attrs_id aids = new_attrs_id (oid);
Dbt askey (&aids, sizeof_attrs_id());
@ -385,32 +431,57 @@ int OSBDB::remove(object_t oid, Context *onsafe)
{
attr_id aid = new_attr_id (oid, sap->names[i].name);
Dbt akey (&aid, sizeof (aid));
db->del (txn, &akey, 0);
if ((ret = db->del (txn, &akey, 0)) != 0)
{
derr(1) << ".del returns error: " << db_strerror (ret) << endl;
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
}
if ((ret = db->del (txn, &askey, 0)) != 0)
{
derr(1) << ".del returns error: " << db_strerror (ret) << endl;
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
db->del (txn, &askey, 0);
}
// XXX check del return value
if (txn)
if (txn != NULL)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..remove OK" << endl;
return 0;
}
int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
{
derr(1) << "not mounted!" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
scoped_lock __lock(&lock);
dout(2) << "truncate " << size << endl;
if (size > 0xFFFFFFFF)
{
derr(1) << "object size too big!" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -ENOSPC;
}
@ -431,6 +502,8 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
{
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
dout(4) << "..returns -ENOENT" << endl;
return -ENOENT;
}
@ -450,6 +523,8 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
{
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".updating object failed" << endl;
return -EIO;
}
@ -460,6 +535,8 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
{
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".updating object info failed" << endl;
return -EIO;
}
@ -474,6 +551,8 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
{
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".updating object info failed" << endl;
return -EIO;
}
@ -488,6 +567,8 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
{
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".updating object failed" << endl;
return -EIO;
}
@ -503,6 +584,8 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
{
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".getting old object failed" << endl;
return -EIO;
}
@ -513,6 +596,8 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
{
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".putting new object failed" << endl;
return -EIO;
}
@ -521,6 +606,8 @@ int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
if (txn)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..truncate OK" << endl;
return 0;
@ -537,6 +624,13 @@ int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
dout(2) << "read " << oid << " " << offset << " "
<< len << endl;
if (bl.length() < len)
{
int remain = len - bl.length();
bufferptr ptr (remain);
bl.push_back(ptr);
}
DbTxn *txn = NULL;
if (transactional)
env->txn_begin (NULL, &txn, 0);
@ -548,7 +642,7 @@ int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
ival.set_flags (DB_DBT_USERMEM);
ival.set_ulen (sizeof(obj));
dout(3) << " get " << _ikey << endl;
dout(3) << "..get " << _ikey << endl;
int ret;
if ((ret = db->get (txn, &ikey, &ival, 0)) != 0)
{
@ -558,20 +652,22 @@ int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
return -ENOENT;
}
dout(3) << "..object has size " << obj.length << endl;
if (offset == 0 && len >= obj.length)
{
len = obj.length;
dout(3) << " doing full read of " << len << endl;
dout(3) << "..doing full read of " << len << endl;
oid_t id;
mkoid (id, oid);
Dbt key (&id, sizeof (oid_t));
Dbt value (bl.c_str(), len);
value.set_ulen (len);
value.set_flags (DB_DBT_USERMEM);
dout(3) << " getting " << oid << endl;
dout(3) << "..getting " << oid << endl;
if ((ret = db->get (txn, &key, &value, 0)) != 0)
{
derr(1) << " get returned " << db_strerror (ret) << endl;
derr(1) << ".get returned " << db_strerror (ret) << endl;
if (txn)
txn->abort();
return -EIO;
@ -586,19 +682,22 @@ int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
}
if (offset + len > obj.length)
len = obj.length - (size_t) offset;
dout(3) << " doing partial read of " << len << endl;
dout(3) << "..doing partial read of " << len << endl;
oid_t id;
mkoid (id, oid);
Dbt key (&id, sizeof (oid));
Dbt value (bl.c_str(), len);
Dbt value;
char *data = bl.c_str();
dout(3) << ".bufferlist c_str returned " << ((void*) data) << endl;
value.set_data (data);
value.set_doff ((size_t) offset);
value.set_dlen (len);
value.set_ulen (len);
value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
dout(3) << " getting " << oid << endl;
dout(3) << "..getting " << oid << endl;
if ((ret = db->get (txn, &key, &value, 0)) != 0)
{
derr(1) << "get returned " << db_strerror (ret) << endl;
derr(1) << ".get returned " << db_strerror (ret) << endl;
if (txn)
txn->abort();
return -EIO;
@ -614,18 +713,24 @@ int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
int OSBDB::write(object_t oid, off_t offset, size_t len,
bufferlist& bl, Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
{
derr(1) << "not mounted!" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
scoped_lock __lock(&lock);
dout(2) << "write " << oid << " " << offset << " "
<< len << endl;
if (offset > 0xFFFFFFFFL || offset + len > 0xFFFFFFFFL)
{
derr(1) << "object too big" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -ENOSPC;
}
@ -655,6 +760,8 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
derr(1) << "..put returned " << db_strerror (ret) << endl;
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
@ -681,11 +788,15 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
derr(1) << "..put returned " << db_strerror (ret) << endl;
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
if (txn)
if (txn != NULL)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..write OK, returning " << len << endl;
return len;
@ -701,6 +812,8 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
derr(1) << " put returned " << db_strerror (ret) << endl;
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
}
@ -712,6 +825,8 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
{
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << "..writing object failed!" << endl;
return -EIO;
}
@ -725,6 +840,8 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
{
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << "..writing object info failed!" << endl;
return -EIO;
}
@ -741,13 +858,17 @@ int OSBDB::write(object_t oid, off_t offset, size_t len,
{
if (txn)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << "..writing object failed!" << endl;
return -EIO;
}
}
if (txn)
if (txn != NULL)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..write OK, returning " << len << endl;
return len;
@ -861,12 +982,16 @@ int OSBDB::list_collections(list<coll_t>& ls)
int OSBDB::create_collection(coll_t c, Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
{
derr(1) << "not mounted" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
scoped_lock __lock(&lock);
dout(2) << "create_collection " << hex << c << dec << endl;
Dbt key (COLLECTIONS_KEY, 1);
@ -897,10 +1022,12 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
int ins = 0;
if (scp->count > 0)
ins = binary_search<coll_t> (scp->colls, scp->count, c);
if (scp->colls[ins] == c)
if (ins < scp->count && scp->colls[ins] == c)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".collection " << c << " already exists " << endl;
return -EEXIST;
}
@ -936,6 +1063,8 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".writing new collections list failed" << endl;
return -EIO;
}
@ -951,6 +1080,8 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".writing new collection failed" << endl;
return -EIO;
}
@ -958,6 +1089,8 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
if (txn != NULL)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..create_collection OK" << endl;
return 0;
@ -965,12 +1098,16 @@ int OSBDB::create_collection(coll_t c, Context *onsafe)
int OSBDB::destroy_collection(coll_t c, Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
{
derr(1) << "not mounted" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
scoped_lock __lock(&lock);
dout(2) << "destroy_collection " << hex << c << dec << endl;
Dbt key (COLLECTIONS_KEY, 1);
@ -985,6 +1122,8 @@ int OSBDB::destroy_collection(coll_t c, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".collection list doesn't exist" << endl;
return -ENOENT; // XXX
}
@ -995,15 +1134,19 @@ int OSBDB::destroy_collection(coll_t c, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".collection " << c << " not listed" << endl;
return -ENOENT;
}
uint32_t ins = binary_search<coll_t> (scp->colls, scp->count, c);
dout(4) << "..insertion point is " << ins << endl;
if (scp->colls[ins] != c)
if (ins >= scp->count || scp->colls[ins] != c)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".collection " << c << " not listed" << endl;
return -ENOENT;
}
@ -1027,6 +1170,8 @@ int OSBDB::destroy_collection(coll_t c, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".putting modified collection list failed" << endl;
return -EIO;
}
@ -1037,12 +1182,16 @@ int OSBDB::destroy_collection(coll_t c, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".deleting collection failed" << endl;
return -EIO;
}
if (txn != NULL)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..destroy_collection OK" << endl;
return 0;
}
@ -1111,12 +1260,16 @@ int OSBDB::collection_stat(coll_t c, struct stat *st)
int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
{
dout(2) << "not mounted" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
scoped_lock __lock(&lock);
dout(2) << "collection_add " << hex << c << dec << " " << o << endl;
Dbt key (&c, sizeof (coll_t));
@ -1131,6 +1284,8 @@ int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << "failed to find collection" << endl;
return -ENOENT;
}
@ -1145,10 +1300,12 @@ int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
{
ins = binary_search<object_t> (scp->objects, scp->count, o);
// Already there?
if (scp->objects[ins] == o)
if (ins < scp->count && scp->objects[ins] == o)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << "collection already has object" << endl;
return -EEXIST;
}
@ -1176,24 +1333,32 @@ int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << "..putting modified collection failed" << endl;
return -EIO;
}
if (txn != NULL)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..collection add OK" << endl;
return 0;
}
int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
{
derr(1) << "not mounted" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
scoped_lock __lock(&lock);
dout(2) << "collection_remove " << hex << c << dec << " " << o << endl;
Dbt key (&c, sizeof (coll_t));
@ -1208,6 +1373,8 @@ int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
dout(1) << "..collection doesn't exist" << endl;
return -ENOENT;
}
@ -1220,15 +1387,19 @@ int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
dout(1) << "..collection is empty" << endl;
return -ENOENT;
}
uint32_t ins = binary_search<object_t> (scp->objects, scp->count, o);
dout(4) << "..insertion point is " << ins << endl;
if (scp->objects[ins] != o)
if (ins >= scp->count || scp->objects[ins] != o)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
dout(1) << "..object not in collection" << endl;
return -ENOENT;
}
@ -1248,12 +1419,16 @@ int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << "..putting modified collection failed" << endl;
return -EIO;
}
if (txn != NULL)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..collection remove OK" << endl;
return 0;
}
@ -1296,11 +1471,23 @@ int OSBDB::_setattr(object_t oid, const char *name,
const void *value, size_t size, Context *onsafe,
DbTxn *txn)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
return -EINVAL;
{
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
if (strlen (name) >= OSBDB_MAX_ATTR_LEN)
return -ENAMETOOLONG;
{
derr(1) << "name too long: " << name << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -ENAMETOOLONG;
}
scoped_lock __lock(&lock);
// Add name to attribute list, if needed.
attrs_id aids = new_attrs_id (oid);
@ -1333,7 +1520,8 @@ int OSBDB::_setattr(object_t oid, const char *name,
if (sap->count > 0)
ins = binary_search<attr_name> (sap->names, sap->count, _name);
dout(3) << "..insertion point is " << ins << endl;
if (sap->count == 0 || strcmp (sap->names[ins].name, name) != 0)
if (sap->count == 0 ||
(ins >= sap->count || strcmp (sap->names[ins].name, name) != 0))
{
sz += sizeof (attr_name);
dout(3) << "..realloc " << ((void *) sap) << " to "
@ -1361,6 +1549,8 @@ int OSBDB::_setattr(object_t oid, const char *name,
if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0)
{
derr(1) << ".writing attributes list failed" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
}
@ -1379,10 +1569,14 @@ int OSBDB::_setattr(object_t oid, const char *name,
if (db->put (txn, &attr_key, &attr_val, 0) != 0)
{
derr(1) << ".writing attribute key failed" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
dout(4) << "..setattr OK" << endl;
if (onsafe != NULL)
COMMIT(onsafe);
return 0;
}
@ -1390,8 +1584,13 @@ int OSBDB::setattr(object_t oid, const char *name,
const void *value, size_t size,
Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
return -EINVAL;
{
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
DbTxn *txn = NULL;
if (transactional)
@ -1416,8 +1615,13 @@ int OSBDB::setattr(object_t oid, const char *name,
int OSBDB::setattrs(object_t oid, map<string,bufferptr>& aset,
Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
return -EINVAL;
{
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
DbTxn *txn = NULL;
@ -1459,9 +1663,10 @@ int OSBDB::_getattr (object_t oid, const char *name, void *value, size_t size)
val.set_dlen (size);
val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
if (db->get (NULL, &key, &val, 0) != 0)
int ret;
if ((ret = db->get (NULL, &key, &val, 0)) != 0)
{
derr(1) << ".getting value failed" << endl;
derr(1) << ".getting value failed: " << db_strerror (ret) << endl;
return -ENOENT;
}
@ -1496,9 +1701,15 @@ int OSBDB::getattrs(object_t oid, map<string,bufferptr>& aset)
int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
return -EINVAL;
{
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
scoped_lock __lock(&lock);
dout(2) << "rmattr " << oid << " " << name << endl;
attrs_id aids = new_attrs_id (oid);
@ -1515,6 +1726,8 @@ int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -ENOENT;
}
@ -1527,6 +1740,8 @@ int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".empty attribute list" << endl;
return -ENOENT;
}
@ -1536,10 +1751,12 @@ int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
int ins = binary_search<attr_name> (sap->names, sap->count, _name);
dout(4) << "..insertion point is " << ins << endl;
if (strcmp (sap->names[ins].name, name) != 0)
if (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".attribute not found in list" << endl;
return -ENOENT;
}
@ -1561,6 +1778,8 @@ int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
@ -1572,11 +1791,15 @@ int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
if (txn != NULL)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..rmattr OK" << endl;
return 0;
}
@ -1626,14 +1849,22 @@ int OSBDB::collection_setattr(coll_t cid, const char *name,
const void *value, size_t size,
Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
return -EINVAL;
{
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
scoped_lock __lock(&lock);
dout(2) << "collection_setattr " << hex << cid << dec << " " << name
<< " (" << size << " bytes)" << endl;
if (strlen (name) >= OSBDB_MAX_ATTR_LEN)
{
derr(1) << "name too long" << endl;
if (onsafe != NULL)
CLEANUP(onsafe);
return -ENAMETOOLONG;
}
@ -1672,7 +1903,7 @@ int OSBDB::collection_setattr(coll_t cid, const char *name,
if (sap->count > 0)
ins = binary_search<attr_name> (sap->names, sap->count, _name);
dout(3) << " insertion point is " << ins << endl;
if (sap->count == 0 || strcmp (sap->names[ins].name, name) != 0)
if (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0)
{
sz += sizeof (attr_name);
dout(3) << " realloc " << hex << ((void *) sap) << " to "
@ -1701,6 +1932,8 @@ int OSBDB::collection_setattr(coll_t cid, const char *name,
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".putting new attributes failed" << endl;
return -EIO;
}
@ -1721,12 +1954,16 @@ int OSBDB::collection_setattr(coll_t cid, const char *name,
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".putting attribute failed" << endl;
return -EIO;
}
if (txn != NULL)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..collection setattr OK" << endl;
return 0;
@ -1735,9 +1972,15 @@ int OSBDB::collection_setattr(coll_t cid, const char *name,
int OSBDB::collection_rmattr(coll_t cid, const char *name,
Context *onsafe)
{
dout(6) << "Context " << hex << onsafe << dec << endl;
if (!mounted)
return -EINVAL;
{
if (onsafe != NULL)
CLEANUP(onsafe);
return -EINVAL;
}
scoped_lock __lock(&lock);
dout(2) << "collection_rmattr " << hex << cid << dec
<< " " << name << endl;
@ -1754,6 +1997,8 @@ int OSBDB::collection_rmattr(coll_t cid, const char *name,
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".no attributes list" << endl;
return -ENOENT;
}
@ -1766,6 +2011,8 @@ int OSBDB::collection_rmattr(coll_t cid, const char *name,
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".empty attributes list" << endl;
return -ENOENT;
}
@ -1774,10 +2021,12 @@ int OSBDB::collection_rmattr(coll_t cid, const char *name,
memset(&_name, 0, sizeof (_name));
strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
int ins = binary_search<attr_name> (sap->names, sap->count, _name);
if (strcmp (sap->names[ins].name, name) != 0)
if (ins >= sap->count || strcmp (sap->names[ins].name, name) != 0)
{
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
derr(1) << ".attribute not listed" << endl;
return -ENOENT;
}
@ -1799,6 +2048,8 @@ int OSBDB::collection_rmattr(coll_t cid, const char *name,
derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
@ -1810,11 +2061,15 @@ int OSBDB::collection_rmattr(coll_t cid, const char *name,
derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
if (txn != NULL)
txn->abort();
if (onsafe != NULL)
CLEANUP(onsafe);
return -EIO;
}
if (txn != NULL)
txn->commit (0);
if (onsafe != NULL)
COMMIT(onsafe);
dout(4) << "..collection rmattr OK" << endl;
return 0;
@ -1893,7 +2148,11 @@ void OSBDB::sync (Context *onsync)
return;
sync();
// huh?
if (onsync != NULL)
{
g_timer.add_event_after(0.1, onsync);
}
}
void OSBDB::sync()
@ -1901,5 +2160,10 @@ void OSBDB::sync()
if (!mounted)
return;
if (transactional)
{
env->log_flush (NULL);
env->lsn_reset (device.c_str(), 0);
}
db->sync(0);
}

View File

@ -389,6 +389,7 @@ public:
class OSBDB : public ObjectStore
{
private:
Mutex lock;
DbEnv *env;
Db *db;
string device;
@ -400,7 +401,7 @@ class OSBDB : public ObjectStore
public:
OSBDB(const char *dev) throw(OSBDBException)
: env(0), db (0), device (dev), mounted(false), opened(false),
: lock(true), env(0), db (0), device (dev), mounted(false), opened(false),
transactional(g_conf.bdbstore_transactional)
{
}
@ -475,4 +476,5 @@ private:
int _setattr(object_t oid, const char *name, const void *value,
size_t size, Context *onsync, DbTxn *txn);
int _getattr(object_t oid, const char *name, void *value, size_t size);
DbEnv *getenv();
};