From 3968ca40f0c378d6bed492ff44b3137ef960b2a8 Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 23 Feb 2007 21:57:05 +0000 Subject: [PATCH] merged trunk changes r1107:1121 into branches/riccardo/monitor1 git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1122 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/riccardo/monitor2/Makefile | 42 +- branches/riccardo/monitor2/client/Client.cc | 7 +- .../riccardo/monitor2/client/FileCache.cc | 2 +- branches/riccardo/monitor2/config.cc | 33 + branches/riccardo/monitor2/config.h | 9 + branches/riccardo/monitor2/ebofs/Ebofs.cc | 8 + branches/riccardo/monitor2/osbdb/OSBDB.cc | 1395 +++++++++++++++++ branches/riccardo/monitor2/osbdb/OSBDB.h | 507 ++++++ branches/riccardo/monitor2/osd/OSD.cc | 9 + branches/riccardo/monitor2/test/testos.cc | 308 ++++ 10 files changed, 2306 insertions(+), 14 deletions(-) create mode 100644 branches/riccardo/monitor2/osbdb/OSBDB.cc create mode 100644 branches/riccardo/monitor2/osbdb/OSBDB.h create mode 100644 branches/riccardo/monitor2/test/testos.cc diff --git a/branches/riccardo/monitor2/Makefile b/branches/riccardo/monitor2/Makefile index 923e0f2c5af..67097eab65a 100644 --- a/branches/riccardo/monitor2/Makefile +++ b/branches/riccardo/monitor2/Makefile @@ -29,6 +29,11 @@ endif CC = g++ LIBS = -lpthread +ifeq ($(want_bdb),yes) +CFLAGS += -DUSE_OSBDB +OSBDB_LIBS = -ldb_cxx +endif + #for normal mpich2 machines MPICC = mpicxx MPICFLAGS = ${CFLAGS} @@ -99,7 +104,14 @@ CLIENT_OBJS= \ client/SyntheticClient.o\ client/Trace.o -TARGETS = cmon cosd cmds cfuse csyn newsyn fakesyn +ifeq ($(want_bdb),yes) +OSBDB_OBJS = \ + osbdb/OSBDB.o + +OSBDB_OBJ = osbdb.o +endif + +TARGETS = cmon cosd cmds cfuse csyn newsyn fakesyn mkmonmap SRCS=*.cc */*.cc *.h */*.h */*/*.h @@ -117,8 +129,8 @@ mkmonmap: mkmonmap.cc common.o cmon: cmon.cc mon.o msg/SimpleMessenger.o common.o ${CC} ${CFLAGS} ${LIBS} $^ -o $@ -cosd: cosd.cc osd.o ebofs.o msg/SimpleMessenger.o common.o - ${CC} ${CFLAGS} ${LIBS} $^ -o $@ +cosd: cosd.cc osd.o ebofs.o ${OSBDB_OBJ} msg/SimpleMessenger.o common.o + ${CC} ${CFLAGS} ${LIBS} ${OSBDB_LIBS} $^ -o $@ cmds: cmds.cc mds.o osdc.o msg/SimpleMessenger.o common.o ${CC} ${CFLAGS} ${LIBS} $^ -o $@ @@ -136,19 +148,19 @@ gprof-helper.so: test/gprof-helper.c # fake* -fakefuse: fakefuse.cc mon.o mds.o client.o osd.o osdc.o ebofs.o client/fuse.o msg/FakeMessenger.o common.o - ${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@ +fakefuse: fakefuse.cc mon.o mds.o client.o osd.o osdc.o ebofs.o ${OSBDB_OBJ} client/fuse.o msg/FakeMessenger.o common.o + ${CC} -pg ${CFLAGS} ${LIBS} ${OSBDB_LIBS} -lfuse $^ -o $@ -fakesyn: fakesyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o - ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@ +fakesyn: fakesyn.cc mon.o mds.o client.o osd.o ebofs.o ${OSBDB_OBJ} osdc.o msg/FakeMessenger.o common.o + ${CC} -pg ${CFLAGS} ${LIBS} ${OSBDB_LIBS} $^ -o $@ # mpi startup -newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/SimpleMessenger.o common.o - ${MPICC} -pg ${MPICFLAGS} ${MPILIBS} $^ -o $@ +newsyn: newsyn.cc mon.o mds.o client.o osd.o ebofs.o ${OSBDB_OBJ} osdc.o msg/SimpleMessenger.o common.o + ${MPICC} -pg ${MPICFLAGS} ${MPILIBS} ${OSBDB_LIBS} $^ -o $@ -newsyn.nopg: newsyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/SimpleMessenger.o common.o - ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ +newsyn.nopg: newsyn.cc mon.o mds.o client.o osd.o ebofs.o ${OSBDB_OBJ} osdc.o msg/SimpleMessenger.o common.o + ${MPICC} ${MPICFLAGS} ${MPILIBS} ${OSBDB_LIBS} $^ -o $@ # ebofs @@ -184,6 +196,11 @@ mdtest: bench/mdtest/mdtest.o mdtest.ceph: bench/mdtest/mdtest.o libceph.o ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ +# OSD test + +testos: test/testos.o ebofs.o osbdb.o common.o + ${CC} ${CFLAGS} ${LIBS} ${OSBDB_LIBS} -o $@ $^ + # %.so: %.cc @@ -213,6 +230,9 @@ mds.o: ${MDS_OBJS} mon.o: ${MON_OBJS} ${LDINC} $@ $^ +osbdb.o: ${OSBDB_OBJS} + ${LDINC} $@ $^ + %.o: %.cc ${CC} ${CFLAGS} -c $< -o $@ diff --git a/branches/riccardo/monitor2/client/Client.cc b/branches/riccardo/monitor2/client/Client.cc index b9556623d63..2b85f1a7ad9 100644 --- a/branches/riccardo/monitor2/client/Client.cc +++ b/branches/riccardo/monitor2/client/Client.cc @@ -949,6 +949,7 @@ void Client::release_caps(Inode *in, dout(5) << "releasing caps on ino " << in->inode.ino << dec << " had " << cap_string(in->file_caps()) << " retaining " << cap_string(retain) + << " want " << cap_string(in->file_caps_wanted()) << endl; for (map::iterator it = in->caps.begin(); @@ -2119,13 +2120,15 @@ int Client::open(const char *relpath, int flags) void Client::close_release(Inode *in) { dout(10) << "close_release on " << in->ino() << endl; + dout(10) << " wr " << in->num_open_wr << " rd " << in->num_open_rd + << " dirty " << in->fc.is_dirty() << " cached " << in->fc.is_cached() << endl; if (!in->num_open_rd) in->fc.release_clean(); int retain = 0; - if (in->num_open_wr || in->fc.is_dirty()) retain |= CAP_FILE_WR | CAP_FILE_WRBUFFER; - if (in->num_open_rd || in->fc.is_cached()) retain |= CAP_FILE_WR | CAP_FILE_WRBUFFER; + if (in->num_open_wr || in->fc.is_dirty()) retain |= CAP_FILE_WR | CAP_FILE_WRBUFFER | CAP_FILE_WREXTEND; + if (in->num_open_rd || in->fc.is_cached()) retain |= CAP_FILE_RD | CAP_FILE_RDCACHE; release_caps(in, retain); // release caps now. } diff --git a/branches/riccardo/monitor2/client/FileCache.cc b/branches/riccardo/monitor2/client/FileCache.cc index 36b28dc6003..5d572ab7b67 100644 --- a/branches/riccardo/monitor2/client/FileCache.cc +++ b/branches/riccardo/monitor2/client/FileCache.cc @@ -76,7 +76,7 @@ void FileCache::check_caps() // check callbacks map >::iterator p = caps_callbacks.begin(); while (p != caps_callbacks.end()) { - if (used == 0 || (~(p->first) & used)) { + if (used == 0 || (~(p->first) & used) == 0) { // implemented. dout(10) << "used is " << cap_string(used) << ", caps " << cap_string(p->first) << " implemented, doing callback(s)" << endl; diff --git a/branches/riccardo/monitor2/config.cc b/branches/riccardo/monitor2/config.cc index 44f88663058..9219d3b9d16 100644 --- a/branches/riccardo/monitor2/config.cc +++ b/branches/riccardo/monitor2/config.cc @@ -299,6 +299,17 @@ md_config_t g_conf = { fakeclient_op_truncate: false, fakeclient_op_fsync: false, fakeclient_op_close: 200 + +#ifdef USE_OSBDB + , + bdbstore: false, + debug_bdbstore: 1, + bdbstore_btree: false, + bdbstore_ffactor: 0, + bdbstore_nelem: 0, + bdbstore_pagesize: 0, + bdbstore_cachesize: 0 +#endif // USE_OSBDB }; @@ -772,6 +783,28 @@ void parse_config_options(std::vector& args) g_conf.mds_log = false; } +#ifdef USE_OSBDB + else if (strcmp(args[i], "--bdbstore") == 0) { + g_conf.bdbstore = true; + g_conf.ebofs = 0; + } + else if (strcmp(args[i], "--bdbstore-btree") == 0) { + g_conf.bdbstore_btree = true; + } + else if (strcmp(args[i], "--bdbstore-hash-ffactor") == 0) { + g_conf.bdbstore_ffactor = atoi(args[++i]); + } + else if (strcmp(args[i], "--bdbstore-hash-nelem") == 0) { + g_conf.bdbstore_nelem = atoi(args[++i]); + } + else if (strcmp(args[i], "--bdbstore-hash-pagesize") == 0) { + g_conf.bdbstore_pagesize = atoi(args[++i]); + } + else if (strcmp(args[i], "--bdbstore-cachesize") == 0) { + g_conf.bdbstore_cachesize = atoi(args[++i]); + } +#endif // USE_OSBDB + else { nargs.push_back(args[i]); } diff --git a/branches/riccardo/monitor2/config.h b/branches/riccardo/monitor2/config.h index 0932f7eeea6..b0edea33ffa 100644 --- a/branches/riccardo/monitor2/config.h +++ b/branches/riccardo/monitor2/config.h @@ -289,6 +289,15 @@ struct md_config_t { int fakeclient_op_fsync; int fakeclient_op_close; +#ifdef USE_OSBDB + bool bdbstore; + int debug_bdbstore; + bool bdbstore_btree; + int bdbstore_ffactor; + int bdbstore_nelem; + int bdbstore_pagesize; + int bdbstore_cachesize; +#endif // USE_OSBDB }; extern md_config_t g_conf; diff --git a/branches/riccardo/monitor2/ebofs/Ebofs.cc b/branches/riccardo/monitor2/ebofs/Ebofs.cc index a190b833873..213255cf844 100644 --- a/branches/riccardo/monitor2/ebofs/Ebofs.cc +++ b/branches/riccardo/monitor2/ebofs/Ebofs.cc @@ -16,7 +16,13 @@ #include "Ebofs.h" #include + +#ifndef DARWIN #include +#else +#include +#include +#endif // DARWIN // ******************* @@ -1278,7 +1284,9 @@ int Ebofs::statfs(struct statfs *buf) buf->f_files = nodepool.num_total(); /* total file nodes in file system */ buf->f_ffree = nodepool.num_free(); /* free file nodes in fs */ //buf->f_fsid = 0; /* file system id */ +#ifndef DARWIN buf->f_namelen = 8; /* maximum length of filenames */ +#endif // DARWIN return 0; } diff --git a/branches/riccardo/monitor2/osbdb/OSBDB.cc b/branches/riccardo/monitor2/osbdb/OSBDB.cc new file mode 100644 index 00000000000..c4f4f5a71ac --- /dev/null +++ b/branches/riccardo/monitor2/osbdb/OSBDB.cc @@ -0,0 +1,1395 @@ +/* OSBDB.cc -- ObjectStore on top of Berkeley DB. + Copyright (C) 2007 Casey Marshall + +Ceph - scalable distributed file system + +This is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License version 2.1, as published by the Free Software +Foundation. See file COPYING. */ + + +#include +#include "OSBDB.h" + +using namespace std; + +#undef dout +#define dout(x) if (x <= g_conf.debug_bdbstore) cout << "bdbstore(" << device << ")." +#undef derr +#define derr(x) if (x <= g_conf.debug_bdbstore) cerr << "bdbstore(" << device << ")." + + // Utilities. + +// Starting off with my own bsearch; mail reader to follow... + +// Perform a binary search on a sorted array, returning the insertion +// point for key, or key if it is exactly found. In other words, this +// will return a pointer to the element that will come after key if +// key were to be inserted into the sorted array. +// +// Requires that T have < and > operators defined. +template +uint32_t binary_search (T *array, size_t size, T key) +{ + int low = 0; + int high = size; + int p = (low + high) / 2; + + while (low < high - 1) + { + if (array[p] > key) + { + high = p; + } + else if (array[p] < key) + { + low = p; + } + else + return p; + + p = (low + high) / 2; + } + + if (array[p] < key) + p++; + else if (array[p] > key && p > 0) + p--; + return p; +} + + // Management. + +int OSBDB::opendb(DBTYPE type, int flags) +{ + db = new Db(env, 0); + db->set_error_stream (&std::cerr); + db->set_message_stream (&std::cout); + db->set_flags (0); + if (!g_conf.bdbstore_btree) + { + if (g_conf.bdbstore_pagesize > 0) + db->set_pagesize (g_conf.bdbstore_pagesize); + if (g_conf.bdbstore_ffactor > 0 && g_conf.bdbstore_nelem > 0) + { + db->set_h_ffactor (g_conf.bdbstore_ffactor); + db->set_h_nelem (g_conf.bdbstore_nelem); + } + } + if (g_conf.bdbstore_cachesize > 0) + { + db->set_cachesize (0, g_conf.bdbstore_cachesize, 0); + } + + int ret; + if ((ret = db->open (NULL, device.c_str(), NULL, type, flags, 0)) != 0) + { + derr(1) << "failed to open database: " << device << ": " + << strerror(ret) << std::endl; + return -EINVAL; + } + opened = true; + return 0; +} + +int OSBDB::mount() +{ + dout(2) << "mount " << device << endl; + + if (mounted) + return 0; + + if (!opened) + { + int ret; + if ((ret = opendb ()) != 0) + return ret; + } + + // XXX Do we want anything else in the superblock? + + Dbt key (OSBDB_SUPERBLOCK_KEY, 1); + stored_superblock super; + Dbt value (&super, sizeof (super)); + value.set_dlen (sizeof (super)); + value.set_ulen (sizeof (super)); + value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL); + + if (db->get (NULL, &key, &value, 0) != 0) + return -EINVAL; // XXX how to say "badly formed fs?" + + dout(2) << ".mount " << super << endl; + + if (super.version != OSBDB_THIS_VERSION) + return -EINVAL; + + DBTYPE t; + db->get_type (&t); + + if (t == DB_BTREE) + { + u_int32_t minkey; + u_int32_t flags; + db->get_bt_minkey (&minkey); + db->get_flags (&flags); + dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Btree; " + << "min keys per page: " << minkey << "; flags: " + << hex << flags << endl; + cout << dec; + } + else + { + u_int32_t ffactor; + u_int32_t nelem; + u_int32_t flags; + db->get_h_ffactor (&ffactor); + db->get_h_nelem (&nelem); + db->get_flags (&flags); + dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Hash; " + << "fill factor: " << ffactor + << " table size: " << nelem << "; flags: " + << hex << flags << endl; + cout << dec; + } + + mounted = true; + return 0; +} + +int OSBDB::umount() +{ + if (!mounted) + return -EINVAL; + sync(); + int ret; + if (opened) + { + if ((ret = db->close (0)) != 0) + { + derr(1) << "close: " << db_strerror(ret) << endl; + return -EINVAL; + } + delete db; + db = NULL; + } + mounted = false; + opened = false; + return 0; +} + +int OSBDB::mkfs() +{ + if (mounted) + return -EINVAL; + + dout(2) << "mkfs" << endl; + + unlink (device.c_str()); + int ret; + if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH), DB_CREATE)) != 0) + { + derr(1) << "failed to open database: " << device << ": " + << strerror(ret) << std::endl; + return -EINVAL; + } + opened = true; + dout(3) << "..opened " << device << endl; + + uint32_t c; + ret = db->truncate (NULL, &c, 0); + if (ret != 0) + { + return -EIO; // ??? + } + + Dbt key (OSBDB_SUPERBLOCK_KEY, 1); + struct stored_superblock sb; + sb.version = OSBDB_THIS_VERSION; + Dbt value (&sb, sizeof (sb)); + + dout(3) << "..writing superblock" << endl; + if (db->put (NULL, &key, &value, 0) != 0) + { + return -EIO; // ??? + } + dout(3) << "..wrote superblock" << endl; + + return 0; +} + + // Objects. + +int OSBDB::pick_object_revision_lt(object_t& oid) +{ + if (!mounted) + return -EINVAL; + + // XXX this is pretty lame. Can we do better? + assert(oid.rev > 0); + oid.rev--; + while (oid.rev > 0) + { + if (exists (oid)) + { + return 0; + } + oid.rev--; + } + return -EEXIST; // FIXME +} + +bool OSBDB::exists(object_t oid) +{ + dout(2) << "exists " << oid << endl; + struct stat st; + return (stat (oid, &st) == 0); +} + +int OSBDB::statfs (struct statfs *st) +{ + return -ENOSYS; +} + +int OSBDB::stat(object_t oid, struct stat *st) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "stat " << oid << endl; + + object_inode_key ikey = new_object_inode_key(oid); + stored_object obj; + Dbt key (&ikey, sizeof_object_inode_key()); + Dbt value (&obj, sizeof (obj)); + value.set_flags (DB_DBT_USERMEM); + value.set_ulen (sizeof (obj)); + + dout(3) << " lookup " << ikey << endl; + int ret; + if ((ret = db->get (NULL, &key, &value, 0)) != 0) + { + derr(1) << " get returned " << ret << endl; + return -ENOENT; + } + + st->st_size = obj.length; + dout(3) << "stat length:" << obj.length << endl; + return 0; +} + +int OSBDB::remove(object_t oid, Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "remove " << oid << endl; + + oid_t id; + mkoid(id, oid); + Dbt key (&id, sizeof (oid_t)); + db->del (NULL, &key, 0); + object_inode_key _ikey = new_object_inode_key (oid); + Dbt ikey (&_ikey, sizeof_object_inode_key()); + db->del (NULL, &ikey, 0); + + attrs_id aids = new_attrs_id (oid); + Dbt askey (&aids, sizeof_attrs_id()); + Dbt asval; + asval.set_flags (DB_DBT_MALLOC); + if (db->get (NULL, &askey, &asval, 0) == 0) + { + // We have attributes; remove them. + stored_attrs *sap = (stored_attrs *) asval.get_data(); + auto_ptr sa (sap); + for (unsigned i = 0; i < sap->count; i++) + { + attr_id aid = new_attr_id (oid, sap->names[i].name); + Dbt akey (&aid, sizeof (aid)); + db->del (NULL, &akey, 0); + } + db->del (NULL, &askey, 0); + } + + return 0; +} + +int OSBDB::truncate(object_t oid, off_t size, Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "truncate " << size << endl; + + if (size > 0xFFFFFFFF) + return -ENOSPC; + + object_inode_key ikey = new_object_inode_key (oid); + stored_object obj; + Dbt key (&ikey, sizeof_object_inode_key()); + Dbt value (&obj, sizeof (obj)); + value.set_dlen (sizeof (obj)); + value.set_ulen (sizeof (obj)); + value.set_flags (DB_DBT_USERMEM); + + if (db->get (NULL, &key, &value, 0) != 0) + return -ENOENT; + + if (obj.length < size) + { + oid_t id; + mkoid (id, oid); + Dbt okey (&id, sizeof (oid_t)); + char b[] = { '\0' }; + Dbt newVal (b, 1); + newVal.set_doff ((size_t) size); + newVal.set_dlen (1); + newVal.set_ulen (1); + newVal.set_flags (DB_DBT_PARTIAL); + if (db->put (NULL, &okey, &newVal, 0) != 0) + return -EIO; + + obj.length = size; + value.set_ulen (sizeof (obj)); + if (db->put (NULL, &key, &value, 0) != 0) + return -EIO; + } + else if (obj.length > size) + { + obj.length = size; + Dbt tval (&obj, sizeof (obj)); + tval.set_ulen (sizeof (obj)); + tval.set_flags (DB_DBT_USERMEM); + if (db->put (NULL, &key, &tval, 0) != 0) + return -EIO; + if (size == 0) + { + char x[1]; + oid_t id; + mkoid (id, oid); + Dbt okey (&id, sizeof (oid_t)); + Dbt oval (&x, 0); + if (db->put (NULL, &okey, &oval, 0) != 0) + return -EIO; + } + else + { + oid_t id; + mkoid (id, oid); + Dbt okey (&id, sizeof (oid_t)); + Dbt oval; + oval.set_flags (DB_DBT_MALLOC); + if (db->get (NULL, &okey, &oval, 0) != 0) + return -EIO; + auto_ptr ovalPtr ((char *) oval.get_data()); + oval.set_size ((size_t) size); + oval.set_ulen ((size_t) size); + if (db->put (NULL, &okey, &oval, 0) != 0) + return -EIO; + } + } + + return 0; +} + +int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "read " << oid << " " << offset << " " + << len << endl; + + DbTxn *txn = NULL; + //env->txn_begin (NULL, &txn, 0); + + object_inode_key _ikey = new_object_inode_key (oid); + stored_object obj; + Dbt ikey (&_ikey, sizeof_object_inode_key()); + Dbt ival (&obj, sizeof (obj)); + ival.set_flags (DB_DBT_USERMEM); + ival.set_ulen (sizeof(obj)); + + dout(3) << " get " << _ikey << endl; + int ret; + if ((ret = db->get (txn, &ikey, &ival, 0)) != 0) + { + //txn->abort(); + derr(1) << "get returned " << db_strerror (ret) << endl; + return -ENOENT; + } + + if (offset == 0 && len >= obj.length) + { + len = obj.length; + dout(3) << " doing full read of " << len << endl; + oid_t id; + mkoid (id, oid); + Dbt key (&id, sizeof (oid_t)); + Dbt value (bl.c_str(), len); + value.set_ulen (len); + value.set_flags (DB_DBT_USERMEM); + dout(3) << " getting " << oid << endl; + if ((ret = db->get (txn, &key, &value, 0)) != 0) + { + derr(1) << " get returned " << db_strerror (ret) << endl; + //txn->abort(); + return -EIO; + } + } + else + { + if (offset > obj.length) + return 0; + if (offset + len > obj.length) + len = obj.length - (size_t) offset; + dout(3) << " doing partial read of " << len << endl; + oid_t id; + mkoid (id, oid); + Dbt key (&id, sizeof (oid)); + Dbt value (bl.c_str(), len); + value.set_doff ((size_t) offset); + value.set_dlen (len); + value.set_ulen (len); + value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL); + dout(3) << " getting " << oid << endl; + if ((ret = db->get (NULL, &key, &value, 0)) != 0) + { + derr(1) << "get returned " << db_strerror (ret) << endl; + //txn->abort(); + return -EIO; + } + } + + //txn->commit (0); + return len; +} + +int OSBDB::write(object_t oid, off_t offset, size_t len, + bufferlist& bl, Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "write " << oid << " " << offset << " " + << len << endl; + + if (offset > 0xFFFFFFFFL || offset + len > 0xFFFFFFFFL) + return -ENOSPC; + + DbTxn *txn = NULL; + //env->txn_begin (NULL, &txn, 0); + + object_inode_key _ikey = new_object_inode_key (oid); + stored_object obj; + Dbt ikey (&_ikey, sizeof_object_inode_key()); + Dbt ival (&obj, sizeof (obj)); + ival.set_ulen (sizeof (obj)); + ival.set_flags (DB_DBT_USERMEM); + + int ret; + dout(3) << " getting " << _ikey << endl; + if (db->get (txn, &ikey, &ival, 0) != 0) + { + dout(3) << " writing new object" << endl; + + // New object. + obj.length = (size_t) offset + len; + dout(3) << " mapping " << _ikey << " => " + << obj << endl; + if ((ret = db->put (txn, &ikey, &ival, 0)) != 0) + { + derr(1) << " put returned " << db_strerror (ret) << endl; + return -EIO; + } + + oid_t id; + mkoid (id, oid); + Dbt key (&id, sizeof (oid_t)); + Dbt value (bl.c_str(), len); + if (offset == 0) // whole object + { + value.set_flags (DB_DBT_USERMEM); + value.set_ulen (len); + } + else + { + value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL); + value.set_ulen (len); + value.set_doff ((size_t) offset); + value.set_dlen (len); + } + dout(3) << " mapping " << oid << " => (" + << obj.length << " bytes)" << endl; + if ((ret = db->put (txn, &key, &value, 0)) != 0) + { + derr(1) << " put returned " << db_strerror (ret) << endl; + return -EIO; + } + return len; + } + + if (offset == 0 && len >= obj.length) + { + if (len != obj.length) + { + obj.length = len; + if ((ret = db->put (txn, &ikey, &ival, 0)) != 0) + { + derr(1) << " put returned " << db_strerror (ret) << endl; + return -EIO; + } + } + oid_t id; + mkoid(id, oid); + Dbt key (&id, sizeof (oid_t)); + Dbt value (bl.c_str(), len); + if (db->put (txn, &key, &value, 0) != 0) + { + return -EIO; + } + } + else + { + if (offset + len > obj.length) + { + obj.length = (size_t) offset + len; + if (db->put (NULL, &ikey, &ival, 0) != 0) + { + return -EIO; + } + } + oid_t id; + mkoid(id, oid); + Dbt key (&id, sizeof (oid_t)); + Dbt value (bl.c_str(), len); + value.set_doff ((size_t) offset); + value.set_dlen (len); + value.set_ulen (len); + value.set_flags (DB_DBT_PARTIAL); + if (db->put (NULL, &key, &value, 0) != 0) + { + return -EIO; + } + } + + return len; +} + +int OSBDB::clone(object_t oid, object_t noid) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "clone " << oid << ", " << noid << endl; + + if (exists (noid)) + return -EEXIST; + + object_inode_key _ikey = new_object_inode_key (oid); + object_inode_key _nikey = new_object_inode_key (noid); + stored_object obj; + Dbt ikey (&_ikey, sizeof_object_inode_key()); + Dbt ival (&obj, sizeof (obj)); + Dbt nikey (&_nikey, sizeof_object_inode_key()); + ival.set_ulen (sizeof (obj)); + ival.set_flags (DB_DBT_USERMEM); + + oid_t id, nid; + mkoid(id, oid); + mkoid(nid, noid); + Dbt key (&id, sizeof (oid_t)); + Dbt nkey (&oid, sizeof (oid_t)); + Dbt value; + value.set_flags (DB_DBT_MALLOC); + + if (db->get (NULL, &ikey, &ival, 0) != 0) + return -ENOENT; + if (db->get (NULL, &key, &value, 0) != 0) + return -ENOENT; + auto_ptr valueptr ((char *) value.get_data()); + + if (db->put (NULL, &nikey, &ival, 0) != 0) + return -EIO; + if (db->put (NULL, &nkey, &value, 0) != 0) + return -EIO; + + return 0; +} + + // Collections + +int OSBDB::list_collections(list& ls) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "list_collections" << endl; + + Dbt key (COLLECTIONS_KEY, 1); + Dbt value; + value.set_flags (DB_DBT_MALLOC); + + if (db->get (NULL, &key, &value, 0) != 0) + return 0; // no collections. + + auto_ptr sc ((stored_colls *) value.get_data()); + stored_colls *scp = sc.get(); + for (uint32_t i = 0; i < sc->count; i++) + ls.push_back (scp->colls[i]); + + return scp->count; +} + +int OSBDB::create_collection(coll_t c, Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "create_collection " << c << endl; + + Dbt key (COLLECTIONS_KEY, 1); + Dbt value; + value.set_flags (DB_DBT_MALLOC); + + stored_colls *scp = NULL; + size_t sz = 0; + bool created = false; + if (db->get (NULL, &key, &value, 0) != 0) + { + sz = sizeof (stored_colls) + sizeof (coll_t); + scp = (stored_colls *) malloc (sz); + scp->count = 0; + created = true; + } + else + { + scp = (stored_colls *) value.get_data(); + sz = value.get_size(); + } + + auto_ptr sc (scp); + int ins = 0; + if (scp->count > 0) + ins = binary_search (scp->colls, scp->count, c); + if (scp->colls[ins] == c) + return -EEXIST; + + dout(3) << "..insertion point: " << ins << endl; + + // Make room for a new collection ID. + if (!created) + { + sz += sizeof (coll_t); + dout(3) << "..increase size to " << sz << endl; + stored_colls *scp2 = (stored_colls *) realloc (scp, sz); + sc.release (); + sc.reset (scp2); + scp = scp2; + } + + int n = (scp->count - ins) * sizeof (coll_t); + if (n > 0) + { + dout(3) << "..moving " << n << " bytes up" << endl; + memmove (&scp->colls[ins + 1], &scp->colls[ins], n); + } + scp->count++; + scp->colls[ins] = c; + + dout(3) << "..collections: " << scp << endl; + + // Put the modified collection list back. + { + Dbt value2 (scp, sz); + if (db->put (NULL, &key, &value2, 0) != 0) + { + return -EIO; + } + } + + // Create the new collection. + { + stored_coll new_coll; + new_coll.count = 0; + Dbt coll_key (&c, sizeof (coll_t)); + Dbt coll_value (&new_coll, sizeof (stored_coll)); + if (db->put (NULL, &coll_key, &coll_value, 0) != 0) + { + return -EIO; + } + } + + return 0; +} + +int OSBDB::destroy_collection(coll_t c, Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "destroy_collection " << c << endl; + + Dbt key (COLLECTIONS_KEY, 1); + Dbt value; + value.set_flags (DB_DBT_MALLOC); + + if (db->get (NULL, &key, &value, 0) != 0) + { + return -ENOENT; // XXX + } + + stored_colls *scp = (stored_colls *) value.get_data(); + auto_ptr valueBuf (scp); + if (scp->count == 0) + { + return -ENOENT; + } + uint32_t ins = binary_search (scp->colls, scp->count, c); + if (scp->colls[ins] != c) + { + return -ENOENT; + } + + // Move the rest of the list down in memory, if needed. + if (ins < scp->count - 1) + { + size_t n = scp->count - ins - 1; + memmove (&scp->colls[ins], &scp->colls[ins + 1], n); + } + + // Modify the record size to be one less. + Dbt nvalue (scp, value.get_size() - sizeof (coll_t)); + nvalue.set_flags (DB_DBT_USERMEM); + if (db->put (NULL, &key, &nvalue, 0) != 0) + { + return -EIO; + } + + // Delete the collection. + Dbt collKey (&c, sizeof (coll_t)); + if (db->del (NULL, &collKey, 0) != 0) + { + return -EIO; + } + + return 0; +} + +bool OSBDB::collection_exists(coll_t c) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "collection_exists " << c << endl; + + Dbt key (COLLECTIONS_KEY, 1); + Dbt value; + value.set_flags (DB_DBT_MALLOC); + + if (db->get (NULL, &key, &value, 0) != 0) + return false; + + stored_colls *scp = (stored_colls *) value.get_data(); + auto_ptr sc (scp); + if (scp->count == 0) + return false; + uint32_t ins = binary_search (scp->colls, scp->count, c); + + return (scp->colls[ins] == c); +} + +int OSBDB::collection_stat(coll_t c, struct stat *st) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "collection_stat " << c << endl; + return -ENOSYS; +} + +int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "collection_add " << c << " " << o << endl; + + Dbt key (&c, sizeof (coll_t)); + Dbt value; + value.set_flags (DB_DBT_MALLOC); + + if (db->get (NULL, &key, &value, 0) != 0) + { + return -ENOENT; + } + + size_t sz = value.get_size(); + stored_coll *scp = (stored_coll *) value.get_data(); + auto_ptr sc (scp); + + // Find the insertion point for the new object ID. + uint32_t ins = 0; + if (scp->count > 0) + { + ins = binary_search (scp->objects, scp->count, o); + // Already there? + if (scp->objects[ins] == o) + { + return -EEXIST; + } + } + + // Make room for the new value, and add it. + sz += sizeof (object_t); + scp = (stored_coll *) realloc (scp, sz); + sc.release(); + sc.reset (scp); + if (ins < scp->count) + { + size_t n = (scp->count - ins) * sizeof (object_t); + memmove (&scp->objects[ins + 1], &scp->objects[ins], n); + } + scp->count++; + scp->objects[ins] = o; + + dout(3) << "..collection: " << scp << endl; + + Dbt nvalue (scp, sz); + if (db->put (NULL, &key, &nvalue, 0) != 0) + { + return -EIO; + } + + return 0; +} + +int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "collection_remove " << c << " " << o << endl; + + Dbt key (&c, sizeof (coll_t)); + Dbt value; + value.set_flags (DB_DBT_MALLOC); + + if (db->get (NULL, &key, &value, 0) != 0) + { + return -ENOENT; + } + + stored_coll *scp = (stored_coll *) value.get_data(); + auto_ptr sc (scp); + + if (scp->count == 0) + { + return -ENOENT; + } + uint32_t ins = binary_search (scp->objects, scp->count, o); + if (scp->objects[ins] != o) + { + return -ENOENT; + } + + if (ins < scp->count - 1) + { + size_t n = (scp->count - ins - 1) * sizeof (object_t); + memmove (&scp->objects[ins], &scp->objects[ins + 1], n); + } + scp->count--; + + dout(3) << "..collection " << scp << endl; + + Dbt nval (scp, value.get_size() - sizeof (object_t)); + if (db->put (NULL, &key, &nval, 0) != 0) + { + return -EIO; + } + + return 0; +} + +int OSBDB::collection_list(coll_t c, list& o) +{ + if (!mounted) + return -EINVAL; + + Dbt key (&c, sizeof (coll_t)); + Dbt value; + if (db->get (NULL, &key, &value, 0) != 0) + return -ENOENT; + + stored_coll *scp = (stored_coll *) value.get_data(); + auto_ptr sc (scp); + for (uint32_t i = 0; i < scp->count; i++) + o.push_back (scp->objects[i]); + + return 0; +} + + // Attributes + +int OSBDB::_setattr(object_t oid, const char *name, + const void *value, size_t size, Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + if (strlen (name) >= OSBDB_MAX_ATTR_LEN) + return -ENAMETOOLONG; + + // Add name to attribute list, if needed. + attrs_id aids = new_attrs_id (oid); + Dbt attrs_key (&aids, sizeof_attrs_id()); + Dbt attrs_val; + attrs_val.set_flags (DB_DBT_MALLOC); + stored_attrs *sap = NULL; + size_t sz = 0; + + dout(3) << " getting " << aids << endl; + if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0) + { + dout(2) << " first attribute" << endl; + sz = sizeof (stored_attrs); + sap = (stored_attrs *) malloc(sz); + sap->count = 0; + } + else + { + sz = attrs_val.get_size(); + sap = (stored_attrs *) attrs_val.get_data(); + dout(2) << " add to list of " << sap->count << " attrs" << endl; + } + auto_ptr sa (sap); + + attr_name _name; + strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN); + + int ins = 0; + if (sap->count > 0) + ins = binary_search (sap->names, sap->count, _name); + dout(3) << " insertion point is " << ins << endl; + if (sap->count == 0 || strcmp (sap->names[ins].name, name) != 0) + { + sz += sizeof (attr_name); + dout(3) << " realloc 0x" << hex << ((void *) sap) << " to " + << dec << sz << endl; + sap = (stored_attrs *) realloc (sap, sz); + dout(3) << " returns 0x" << hex << ((void *) sap) << endl; + sa.release (); + sa.reset (sap); + int n = (sap->count - ins) * sizeof (attr_name); + if (n > 0) + { + dout(3) << " move " << n << " bytes from 0x" + << hex << (&sap->names[ins]) << " to 0x" + << hex << (&sap->names[ins+1]) << endl; + memmove (&sap->names[ins+1], &sap->names[ins], n); + } + memset (&sap->names[ins], 0, sizeof (attr_name)); + strncpy (sap->names[ins].name, name, OSBDB_MAX_ATTR_LEN); + sap->count++; + + Dbt newAttrs_val (sap, sz); + newAttrs_val.set_ulen (sz); + newAttrs_val.set_flags (DB_DBT_USERMEM); + dout(3) << " putting " << aids << endl; + if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0) + return -EIO; + } + else + { + dout(3) << " attribute " << name << " already exists" << endl; + } + + dout(3) << " attributes list: " << sap << endl; + + // Add the attribute. + attr_id aid = new_attr_id (oid, name); + Dbt attr_key (&aid, sizeof (aid)); + Dbt attr_val ((void *) value, size); + dout(3) << " writing attribute key " << aid << endl; + if (db->put (NULL, &attr_key, &attr_val, 0) != 0) + return -EIO; + + return 0; +} + +int OSBDB::setattr(object_t oid, const char *name, + const void *value, size_t size, + Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "setattr " << oid << ":" << name << " => (" + << size << " bytes)" << endl; + int ret = _setattr (oid, name, value, size, onsafe); + return ret; +} + +int OSBDB::setattrs(object_t oid, map& aset, + Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + map::iterator it; + for (it = aset.begin(); it != aset.end(); it++) + { + string name = it->first; + bufferptr value = it->second; + int ret = _setattr (oid, name.c_str(), value.c_str(), + value.length(), onsafe); + if (ret != 0) + { + return ret; + } + } + return 0; +} + +int OSBDB::_getattr (object_t oid, const char *name, void *value, size_t size) +{ + if (!mounted) + return -EINVAL; + + attr_id aid = new_attr_id (oid, name); + Dbt key (&aid, sizeof (aid)); + Dbt val (value, size); + val.set_ulen (size); + val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL); + + if (db->get (NULL, &key, &val, 0) != 0) + { + return -ENOENT; + } + + return val.get_size(); +} + +int OSBDB::getattr(object_t oid, const char *name, void *value, size_t size) +{ + if (!mounted) + return -EINVAL; + + return _getattr (oid, name, value, size); +} + +int OSBDB::getattrs(object_t oid, map& aset) +{ + if (!mounted) + return -EINVAL; + + int count = 0; + for (map::iterator it = aset.begin(); + it != aset.end(); it++) + { + int ret = _getattr (oid, (*it).first.c_str(), + (*it).second.c_str(), + (*it).second.length()); + if (ret < 0) + return ret; + count += ret; + } + return count; +} + +int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe) +{ + if (!mounted) + return -EINVAL; + attrs_id aids = new_attrs_id (oid); + Dbt askey (&aids, sizeof_attrs_id()); + Dbt asvalue; + asvalue.set_flags (DB_DBT_MALLOC); + + if (db->get (NULL, &askey, &asvalue, 0) != 0) + return -ENOENT; + + stored_attrs *sap = (stored_attrs *) asvalue.get_data(); + auto_ptr sa (sap); + + if (sap->count == 0) + return -ENOENT; + + attr_name _name; + memset(&name, 0, sizeof (_name)); + strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN); + int ins = binary_search (sap->names, sap->count, _name); + if (strcmp (sap->names[ins].name, name) != 0) + return -ENOENT; + + // Shift the later elements down by one, if needed. + int n = (sap->count - ins) * sizeof (attr_name); + if (n > 0) + memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n); + sap->count--; + asvalue.set_size(asvalue.get_size() - sizeof (attr_name)); + int ret; + if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0) + { + derr(1) << "put stored_attrs " << db_strerror (ret) << endl; + return -EIO; + } + + // Remove the attribute. + attr_id aid = new_attr_id (oid, name); + Dbt key (&aid, sizeof (aid)); + if ((ret = db->del (NULL, &key, 0)) != 0) + derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl; + + return 0; +} + +int OSBDB::listattr(object_t oid, char *attrs, size_t size) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "listattr " << oid << endl; + + attrs_id aids = new_attrs_id (oid); + Dbt key (&aids, sizeof_attrs_id()); + Dbt value; + value.set_flags (DB_DBT_MALLOC); + + int ret; + if ((ret = db->get (NULL, &key, &value, 0)) != 0) + { + derr(1) << "fetching " << aids << ": " << db_strerror (ret) + << endl; + return -ENOENT; + } + + stored_attrs *attrsp = (stored_attrs *) value.get_data(); + auto_ptr _attrs (attrsp); + size_t s = 0; + char *p = attrs; + for (unsigned i = 0; i < attrsp->count && s < size; i++) + { + int n = MIN (OSBDB_MAX_ATTR_LEN, + MIN (strlen (attrsp->names[i].name), size - s - 1)); + strncpy (p, attrsp->names[i].name, n); + p[n] = '\0'; + p = p + n + 1; + } + return 0; +} + + // Collection attributes. + +int OSBDB::collection_setattr(coll_t cid, const char *name, + const void *value, size_t size, + Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "collection_setattr" << cid << " " << name + << " (" << size << " bytes)" << endl; + if (strlen (name) >= OSBDB_MAX_ATTR_LEN) + return -ENAMETOOLONG; + + // Add name to attribute list, if needed. + coll_attrs_id aids = new_coll_attrs_id (cid); + Dbt attrs_key (&aids, sizeof_coll_attrs_id()); + Dbt attrs_val; + attrs_val.set_flags (DB_DBT_MALLOC); + stored_attrs *sap = NULL; + size_t sz = 0; + + dout(3) << " getting " << aids << endl; + if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0) + { + dout(2) << " first attribute" << endl; + sz = sizeof (stored_attrs); + sap = (stored_attrs *) malloc(sz); + sap->count = 0; + } + else + { + sz = attrs_val.get_size(); + sap = (stored_attrs *) attrs_val.get_data(); + dout(2) << " add to list of " << sap->count << " attrs" << endl; + } + auto_ptr sa (sap); + + attr_name _name; + strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN); + + int ins = 0; + if (sap->count > 0) + ins = binary_search (sap->names, sap->count, _name); + dout(3) << " insertion point is " << ins << endl; + if (sap->count == 0 || strcmp (sap->names[ins].name, name) != 0) + { + sz += sizeof (attr_name); + dout(3) << " realloc 0x" << hex << ((void *) sap) << " to " + << dec << sz << endl; + sap = (stored_attrs *) realloc (sap, sz); + dout(3) << " returns 0x" << hex << ((void *) sap) << endl; + sa.release (); + sa.reset (sap); + int n = (sap->count - ins) * sizeof (attr_name); + if (n > 0) + { + dout(3) << " move " << n << " bytes from 0x" + << hex << (&sap->names[ins]) << " to 0x" + << hex << (&sap->names[ins+1]) << endl; + memmove (&sap->names[ins+1], &sap->names[ins], n); + } + memset (&sap->names[ins], 0, sizeof (attr_name)); + strncpy (sap->names[ins].name, name, OSBDB_MAX_ATTR_LEN); + sap->count++; + + Dbt newAttrs_val (sap, sz); + newAttrs_val.set_ulen (sz); + newAttrs_val.set_flags (DB_DBT_USERMEM); + dout(3) << " putting " << aids << endl; + if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0) + return -EIO; + } + else + { + dout(3) << " attribute " << name << " already exists" << endl; + } + + dout(3) << " attributes list: " << sap << endl; + + // Add the attribute. + coll_attr_id aid = new_coll_attr_id (cid, name); + Dbt attr_key (&aid, sizeof (aid)); + Dbt attr_val ((void *) value, size); + dout(3) << " writing attribute key " << aid << endl; + if (db->put (NULL, &attr_key, &attr_val, 0) != 0) + return -EIO; + + return 0; +} + +int OSBDB::collection_rmattr(coll_t cid, const char *name, + Context *onsafe) +{ + if (!mounted) + return -EINVAL; + + coll_attrs_id aids = new_coll_attrs_id (cid); + Dbt askey (&aids, sizeof_coll_attrs_id()); + Dbt asvalue; + asvalue.set_flags (DB_DBT_MALLOC); + + if (db->get (NULL, &askey, &asvalue, 0) != 0) + return -ENOENT; + + stored_attrs *sap = (stored_attrs *) asvalue.get_data(); + auto_ptr sa (sap); + + if (sap->count == 0) + return -ENOENT; + + attr_name _name; + memset(&name, 0, sizeof (_name)); + strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN); + int ins = binary_search (sap->names, sap->count, _name); + if (strcmp (sap->names[ins].name, name) != 0) + return -ENOENT; + + // Shift the later elements down by one, if needed. + int n = (sap->count - ins) * sizeof (attr_name); + if (n > 0) + memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n); + sap->count--; + asvalue.set_size(asvalue.get_size() - sizeof (attr_name)); + int ret; + if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0) + { + derr(1) << "put stored_attrs " << db_strerror (ret) << endl; + return -EIO; + } + + // Remove the attribute. + coll_attr_id aid = new_coll_attr_id (cid, name); + Dbt key (&aid, sizeof (aid)); + if ((ret = db->del (NULL, &key, 0)) != 0) + derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl; + + return 0; +} + +int OSBDB::collection_getattr(coll_t cid, const char *name, + void *value, size_t size) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "collection_getattr " << cid << " " << name << endl; + + coll_attr_id caid = new_coll_attr_id (cid, name); + Dbt key (&caid, sizeof (caid)); + Dbt val (value, size); + val.set_ulen (size); + val.set_dlen (size); + val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL); + + if (db->get (NULL, &key, &val, 0) != 0) + return -ENOENT; + + return val.get_size(); +} + +int OSBDB::collection_listattr(coll_t cid, char *attrs, size_t size) +{ + if (!mounted) + return -EINVAL; + + dout(2) << "collection_listattr " << cid << endl; + + coll_attrs_id caids = new_coll_attrs_id (cid); + Dbt key (&caids, sizeof_coll_attrs_id()); + Dbt value; + value.set_flags (DB_DBT_MALLOC); + + int ret; + if ((ret = db->get (NULL, &key, &value, 0)) != 0) + { + derr(1) << "fetching " << caids << ": " << db_strerror (ret) + << endl; + return -ENOENT; + } + + stored_attrs *attrsp = (stored_attrs *) value.get_data(); + auto_ptr _attrs (attrsp); + size_t s = 0; + char *p = attrs; + for (unsigned i = 0; i < attrsp->count && s < size; i++) + { + int n = MIN (OSBDB_MAX_ATTR_LEN, + MIN (strlen (attrsp->names[i].name), size - s - 1)); + strncpy (p, attrsp->names[i].name, n); + p[n] = '\0'; + p = p + n + 1; + } + return 0; +} + + // Sync. + +void OSBDB::sync (Context *onsync) +{ + if (!mounted) + return; + + sync(); + // huh? +} + +void OSBDB::sync() +{ + if (!mounted) + return; + + db->sync(0); +} diff --git a/branches/riccardo/monitor2/osbdb/OSBDB.h b/branches/riccardo/monitor2/osbdb/OSBDB.h new file mode 100644 index 00000000000..9ba42d206d2 --- /dev/null +++ b/branches/riccardo/monitor2/osbdb/OSBDB.h @@ -0,0 +1,507 @@ +/* OSBDB.h -- ObjectStore on Berkeley DB. -*- c++ -*- + Copyright (C) 2007 Casey Marshall + +Ceph - scalable distributed file system + +This is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License version 2.1, as published by the Free Software +Foundation. See file COPYING. */ + + +#include +#include "osd/ObjectStore.h" + +// Redefine this to use a different BDB access type. DB_BTREE is +// probably the only other one that makes sense. +#ifndef OSBDB_DB_TYPE +#define OSBDB_DB_TYPE DB_HASH +#endif // OSBDB_DB_TYPE + +/* + * Maximum length of an attribute name. + */ +#define OSBDB_MAX_ATTR_LEN 256 + +#define OSBDB_THIS_VERSION 1 + +#define OSBDB_SUPERBLOCK_KEY ((void *) "s") + +/* + * The "superblock" of the BDB object store. We store one of these in + * the DB, to store version and other information. We don't record + * anything special here, just the version number the database was + * written with. + * + * In principle, this structure is variable-length, depending on the + * software version writing the superblock. + */ +struct stored_superblock +{ + uint32_t version; +}; + +inline ostream& operator<<(ostream& out, const stored_superblock sb) +{ + out << "osbdb.super(" << sb.version << ")" << endl; + return out; +} + +/** + * An object identifier; we define this so we can have a POD object to + * work with. + */ +struct oid_t // POD +{ + char id[16]; +}; + +inline void mkoid (oid_t& id, object_t& oid) +{ + // XXX byte order? + memcpy (id.id, &oid, sizeof (oid_t)); +} + +inline ostream& operator<<(ostream& out, const oid_t id) +{ + for (int i = 0; i < 16; i++) + { + out.fill('0'); + out << setw(2) << hex << (id.id[i] & 0xFF); + if ((i & 3) == 3) + out << ':'; + } + out.unsetf(ios::right); + out << dec; + return out; +} + +/** + * An "inode" key. We map a 'stored_object' struct to this key for + * every object. + */ +struct object_inode_key // POD +{ + oid_t oid; + char tag; +}; + +/** + * "Constructor" for an object_inode_key. + */ +inline object_inode_key new_object_inode_key (object_t& oid) +{ + object_inode_key key; + memset(&key, 0, sizeof (object_inode_key)); + mkoid (key.oid, oid); + key.tag = 'i'; + return key; +} + +/* + * We use this, instead of sizeof(), to try and guarantee that we + * don't include the structure padding, if any. + * + * This *should* return 17: sizeof (oid_t) == 16; sizeof (char) == 1. + */ +inline size_t sizeof_object_inode_key() +{ + return offsetof(object_inode_key, tag) + sizeof (char); +} + + // Frank Poole: Unfortunately, that sounds a little + // like famous last words. + // -- 2001: A Space Odyssey + +inline ostream& operator<<(ostream& out, const object_inode_key o) +{ + out << o.tag << "/" << o.oid; + return out; +} + +/** + * A stored object. This is essentially the "inode" of the object, + * containing things like the object's length. The object itself is + * stored as-is, mapped by the 128-bit object ID. + */ +struct stored_object +{ + uint32_t length; +}; + +inline ostream& operator<<(ostream& out, const stored_object s) +{ + out << "inode(l:" << s.length << ")"; + return out; +} + +/* + * Key referencing the list of attribute names for an object. This is + * simply the object's ID, with an additional character 'a' appended. + */ +struct attrs_id // POD +{ + oid_t oid; + char tag; +}; + +/* + * "Construtor" for attrs_id. + */ +inline struct attrs_id new_attrs_id (object_t& oid) +{ + attrs_id aid; + memset (&aid, 0, sizeof (attrs_id)); + mkoid(aid.oid, oid); + aid.tag = 'a'; + return aid; +} + +/* + * See explanation for sizeof_object_inode_id. + */ +inline size_t sizeof_attrs_id() +{ + return offsetof(struct attrs_id, tag) + sizeof (char); +} + +inline ostream& operator<<(ostream& out, const attrs_id id) +{ + out << id.tag << "/" << id.oid; + return out; +} + +/* + * Encapsulation of a single attribute name. + */ +struct attr_name // POD +{ + char name[OSBDB_MAX_ATTR_LEN]; +}; + +inline ostream& operator<<(ostream& out, const attr_name n) +{ + out << n.name; + return out; +} + +inline bool operator<(const attr_name n1, const attr_name n2) +{ + return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) < 0); +} + +inline bool operator>(const attr_name n1, const attr_name n2) +{ + return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) > 0); +} + +inline bool operator==(const attr_name n1, const attr_name n2) +{ + std::cerr << n1.name << " == " << n2.name << "?" << endl; + return (strncmp (n1.name, n2.name, OSBDB_MAX_ATTR_LEN) == 0); +} + +inline bool operator!=(const attr_name n1, const attr_name n2) +{ + return !(n1 == n2); +} + +inline bool operator>=(const attr_name n1, const attr_name n2) +{ + return !(n1 < n2); +} + +inline bool operator<=(const attr_name n1, const attr_name n2) +{ + return !(n1 > n2); +} + +/* + * A list of an object or collection's attribute names. + */ +struct stored_attrs +{ + uint32_t count; + attr_name names[0]; // actually variable-length +}; + +inline ostream& operator<<(ostream& out, const stored_attrs *sa) +{ + out << sa->count << " [ "; + for (unsigned i = 0; i < sa->count; i++) + out << sa->names[i] << (i == sa->count - 1 ? " " : ", "); + out << "]"; + return out; +} + +/* + * An object attribute key. An object attribute is mapped simply by + * the object ID appended with the attribute name. Attribute names + * may not be empty, and must be less than 256 characters, in this + * implementation. + */ +struct attr_id // POD +{ + oid_t oid; + attr_name name; +}; + +inline attr_id new_attr_id (object_t& oid, const char *name) +{ + attr_id aid; + memset(&aid, 0, sizeof (attr_id)); + mkoid (aid.oid, oid); + strncpy (aid.name.name, name, OSBDB_MAX_ATTR_LEN); + return aid; +} + +inline ostream& operator<<(ostream &out, const attr_id id) +{ + out << id.oid << ":" << id.name; + return out; +} + +/* + * A key for a collection attributes list. + */ +struct coll_attrs_id // POD +{ + coll_t cid; + char tag; +}; + +inline coll_attrs_id new_coll_attrs_id (coll_t cid) +{ + coll_attrs_id catts; + memset(&catts, 0, sizeof (coll_attrs_id)); + catts.cid = cid; + catts.tag = 'C'; + return catts; +} + +inline size_t sizeof_coll_attrs_id() +{ + return offsetof(coll_attrs_id, tag) + sizeof (char); +} + +inline ostream& operator<<(ostream& out, coll_attrs_id id) +{ + out << id.tag << "/" << id.cid; + return out; +} + +/* + * A collection attribute key. Similar to + */ +struct coll_attr_id // POD +{ + coll_t cid; + attr_name name; +}; + +inline coll_attr_id new_coll_attr_id (coll_t cid, const char *name) +{ + coll_attr_id catt; + memset(&catt, 0, sizeof (coll_attr_id)); + catt.cid = cid; + strncpy (catt.name.name, name, OSBDB_MAX_ATTR_LEN); + return catt; +} + +inline ostream& operator<<(ostream& out, coll_attr_id id) +{ + out << id.cid << ":" << id.name; + return out; +} + +/* + * This is the key we store the master collections list under. + */ +#define COLLECTIONS_KEY ((void *) "c") + +/* + * The master list of collections. There should be one of these per + * OSD. The sole reason for this structure is to have the ability + * to enumerate all collections stored on this OSD. + */ +struct stored_colls +{ + // The number of collections. + uint32_t count; + + // The collection identifiers. This is a sorted list of coll_t + // values. + coll_t colls[0]; // actually variable-length +}; + +inline ostream& operator<<(ostream& out, stored_colls *c) +{ + out << c->count << " [ "; + for (unsigned i = 0; i < c->count; i++) + { + out << hex << c->colls[i]; + if (i < c->count - 1) + out << ", "; + } + out << " ]" << dec; + return out; +} + +/* + * A stored collection (a bag of object IDs). These are referenced by + * the bare collection identifier type, a coll_t (thus, a 32-bit + * integer). Internally this is stored as a sorted list of object IDs. + * + * Note, this structure places all collection items in a single + * record; this may be a memory burden for large collections. + */ +struct stored_coll +{ + // The size of this collection. + uint32_t count; + + // The object IDs in this collection. This is a sorted list of all + // object ID's in this collection. + object_t objects[0]; // actually variable-length +}; + +inline ostream& operator<<(ostream& out, stored_coll *c) +{ + out << c->count << " [ "; + for (unsigned i = 0; i < c->count; i++) + { + out << c->objects[i]; + if (i < c->count - 1) + out << ", "; + } + out << " ]"; + return out; +} + +/* + * The object store interface for Berkeley DB. + */ +class OSBDB : public ObjectStore +{ + private: + DbEnv *env; + Db *db; + string device; + bool mounted; + bool opened; + + public: + + OSBDB(const char *dev) + : env(0), db (0), device (dev), mounted(false), opened(false) + { + /*env = new DbEnv (DB_CXX_NO_EXCEPTIONS); + env->set_error_stream (&std::cerr); + // WTF? You can't open an env if you set this flag here, but BDB + // says you also can't set it after you open the env. + //env->set_flags (DB_LOG_INMEMORY, 1); + char *p = strrchr (dev, '/'); + int env_flags = (DB_CREATE | DB_THREAD | DB_INIT_LOCK + | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG); + if (p != NULL) + { + *p = '\0'; + if (env->open (dev, env_flags, 0) != 0) + { + std::cerr << "failed to open environment: " + << dev << std::endl; + ::abort(); + } + *p = '/'; + dev = p+1; + } + else + { + if (env->open (NULL, env_flags, 0) != 0) + { + std::cerr << "failed to open environment: ." << std::endl; + ::abort(); + } + } + + // Double WTF: if you remove the DB_LOG_INMEMORY bit, db->open + // fails, inexplicably, with EINVAL!*/ + // env->set_flags (DB_DIRECT_DB | /*DB_AUTO_COMMIT |*/ DB_LOG_INMEMORY, 1); + } + + ~OSBDB() + { + if (mounted) + { + umount(); + } + if (env != NULL) + { + env->close (0); + delete env; + } + } + + int mount(); + int umount(); + int mkfs(); + + int statfs(struct statfs *buf); + + int pick_object_revision_lt(object_t& oid); + + bool exists(object_t oid); + int stat(object_t oid, struct stat *st); + + int remove(object_t oid, Context *onsafe=0); + + int truncate(object_t oid, off_t size, Context *onsafe=0); + + int read(object_t oid, off_t offset, size_t len, + bufferlist& bl); + int write(object_t oid, off_t offset, size_t len, + bufferlist& bl, Context *onsafe); + + int setattr(object_t oid, const char *name, + const void *value, size_t size, Context *onsafe=0); + int setattrs(object_t oid, map& aset, + Context *onsafe=0); + int getattr(object_t oid, const char *name, + void *value, size_t size); + int getattrs(object_t oid, map& aset); + int rmattr(object_t oid, const char *name, + Context *onsafe=0); + int listattr(object_t oid, char *attrs, size_t size); + + int clone(object_t oid, object_t noid); + + // Collections. + + int list_collections(list& ls); + int create_collection(coll_t c, Context *onsafe=0); + int destroy_collection(coll_t c, Context *onsafe=0); + bool collection_exists(coll_t c); + int collection_stat(coll_t c, struct stat *st); + int collection_add(coll_t c, object_t o, Context *onsafe=0); + int collection_remove(coll_t c, object_t o, Context *onsafe=0); + int collection_list(coll_t c, list& o); + + int collection_setattr(coll_t cid, const char *name, + const void *value, size_t size, + Context *onsafe=0); + int collection_rmattr(coll_t cid, const char *name, + Context *onsafe=0); + int collection_getattr(coll_t cid, const char *name, + void *value, size_t size); + int collection_listattr(coll_t cid, char *attrs, size_t size); + + void sync(Context *onsync); + void sync(); + +private: + int opendb (DBTYPE type=DB_UNKNOWN, int flags=0); + + int _setattr(object_t oid, const char *name, const void *value, + size_t size, Context *onsync); + int _getattr(object_t oid, const char *name, void *value, size_t size); +}; diff --git a/branches/riccardo/monitor2/osd/OSD.cc b/branches/riccardo/monitor2/osd/OSD.cc index e2b1c1e4fdb..7e3bcc5568e 100644 --- a/branches/riccardo/monitor2/osd/OSD.cc +++ b/branches/riccardo/monitor2/osd/OSD.cc @@ -26,6 +26,10 @@ #include "ebofs/Ebofs.h" +#ifdef USE_OSBDB +#include "osbdb/OSBDB.h" +#endif // USE_OSBDB + #include "Ager.h" @@ -157,6 +161,11 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : timer(osd_lock) store = new OBFSStore(whoami, NULL, dev_path); } #endif +#ifdef USE_OSBDB + else if (g_conf.bdbstore) { + store = new OSBDB(dev_path); + } +#endif // USE_OSBDB else { store = new FakeStore(osd_base_path, whoami); } diff --git a/branches/riccardo/monitor2/test/testos.cc b/branches/riccardo/monitor2/test/testos.cc new file mode 100644 index 00000000000..0296f05a493 --- /dev/null +++ b/branches/riccardo/monitor2/test/testos.cc @@ -0,0 +1,308 @@ +/* testos.cc -- simple ObjectStore test harness. + Copyright (C) 2007 Casey Marshall + +Ceph - scalable distributed file system + +This is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License version 2.1, as published by the Free Software +Foundation. See file COPYING. */ + + +#include "osd/ObjectStore.h" +#include "ebofs/Ebofs.h" +#include "osbdb/OSBDB.h" +#include "include/buffer.h" + +#include +#include + +#include +#include + +using namespace std; + +static inline unsigned long long +to_usec (struct timeval &time) +{ + return (((unsigned long long) time.tv_sec * 1000000) + + ((unsigned long long) time.tv_usec)); +} + +static inline unsigned long long +to_msec (struct timeval &time) +{ + return (((unsigned long long) time.tv_sec * 1000) + + ((unsigned long long) time.tv_usec / 1000)); +} + +int main (int argc, char **argv) +{ + char *osd_name = "ebofs"; + unsigned object_size = 1024; + unsigned object_count = 1024; + unsigned write_iter = 64; + unsigned random_seed = ::time(NULL); + char *device = "/tmp/testos"; + char *mountcmd = "mount /tmp/testos"; + char *umountcmd = "umount /tmp/testos"; + + bool inhibit_remount = (getenv("TESTOS_INHIBIT_REMOUNT") != NULL); + + if (argc > 1 + && (strcmp (argv[1], "-h") == 0 + || strcmp (argv[1], "-help") == 0 + || strcmp (argv[1], "--help") == 0 + || argc > 6)) + { + cout << "usage: " << argv[0] << " [store [object-size [object-count [iterations [seed]]]]]" << endl; + cout << endl; + cout << "Where the arguments are:" << endl << endl; + cout << " store -- store type; default \"ebofs\"" << endl; + cout << " object-size -- size of objects; default 1024" << endl; + cout << " object-count -- number of objects to write; default 1024" + << endl; + cout << " iterations -- write the objects that many times; default 5" + << endl; + cout << " seed -- random seed; default current time" << endl; + exit (0); + } + + if (argc > 1) + osd_name = argv[1]; + if (argc > 2) + object_size = (unsigned) atol (argv[2]); + if (argc > 3) + object_count = (unsigned) atol (argv[3]); + if (argc > 4) + write_iter = (unsigned) atol (argv[4]); + if (argc > 5) + random_seed = (unsigned) atol (argv[5]); + + // algin object size to 'long' + object_size = ((object_size + (sizeof (long) - 1)) / sizeof (long)) * sizeof (long); + + char *osd_file = new char[32]; + strcpy (osd_file, "/tmp/testos/testos.XXXXXX"); + mktemp (osd_file); + + if (!inhibit_remount) + { + if (system (mountcmd) != 0) + { + cerr << "mount failed" << endl; + exit (1); + } + } + + ObjectStore *os = NULL; + if (strcasecmp (osd_name, "ebofs") == 0) + { + FILE *f = fopen (osd_file, "w"); + if (f == NULL) + { + cerr << "failed to open " << osd_file << ": " << strerror (errno) + << endl; + exit (1); + } + // 1G file. + fseek (f, 1024 * 1024 * 1024, SEEK_SET); + fputc ('\0', f); + fclose (f); + // 20K cache + g_conf.ebofs_bc_size = 5; // times 4K + os = new Ebofs (osd_file); + } + else if (strcasecmp (osd_name, "osbdb") == 0) + { + char *e = getenv ("OSBDB_FFACTOR"); + if (e != NULL) + g_conf.bdbstore_ffactor = atol(e); + e = getenv ("OSBDB_NELEM"); + if (e != NULL) + g_conf.bdbstore_nelem = atol(e); + e = getenv ("OSBDB_PAGESIZE"); + if (e != NULL) + g_conf.bdbstore_pagesize = atol(e); + g_conf.debug_bdbstore = 1; + // 20K cache + g_conf.bdbstore_cachesize = 20 * 1024; + os = new OSBDB (osd_file); + } + else if (strcasecmp (osd_name, "osbdb-btree") == 0) + { + g_conf.bdbstore_btree = true; + // 20K cache + g_conf.bdbstore_cachesize = 20 * 1024; + os = new OSBDB (osd_file); + } + else + { + cerr << "I don't know about object store \"" << osd_name << "\"" + << endl; + exit (1); + } + + cout << "Writing " << object_count << " objects of size " + << object_size << " to " << osd_name << endl; + + char *val = (char *) malloc (object_size); + char *val2 = (char *) malloc (object_size); + auto_ptr valptr (val); + auto_ptr valptr2(val2); + if (getenv ("TESTOS_UNALIGNED") != NULL) + { + val = val + 1; + val2 = val2 + 1; + } + + for (unsigned i = 0; i < object_size; i++) + { + val[i] = (char) i; + val2[i] = (char) i; + } + object_t *oids = new object_t[object_count]; + + utime_t writes[write_iter]; + utime_t total_write; + utime_t reads[write_iter]; + utime_t total_read; + for (unsigned i = 0; i < write_iter; i++) + { + cerr << "Iteration " << i << endl; + + int ret = os->mkfs(); + if (ret != 0) + { + cerr << "mkfs(" << osd_file << "): " << strerror (-ret) << endl; + exit (1); + } + ret = os->mount(); + if (ret != 0) + { + cerr << "mount(): " << strerror (-ret) << endl; + exit (1); + } + + srandom (random_seed + i); + + for (unsigned j = 0; j < object_count; j++) + { + oids[j].ino = (uint64_t) random() << 32 | random(); + oids[j].bno = random(); + } + + utime_t begin = g_clock.now(); + for (unsigned o = 0; o < object_count; o++) + { + bufferptr bp (val, object_size); + bufferlist bl; + bl.push_back (bp); + int ret; + if ((ret = os->write (oids[o], 0L, object_size, bl, NULL)) < 0) + cerr << "write " << oids[o] << " failed: " + << strerror (-ret) << endl; + } + utime_t end = g_clock.now() - begin; + + cerr << "Write finished in " << end << endl; + total_write += end; + writes[i] = end; + + os->sync(); + os->umount(); + sync(); + + if (!inhibit_remount) + { + if (system (umountcmd) != 0) + { + cerr << "umount failed" << endl; + exit (1); + } + + if (system (mountcmd) != 0) + { + cerr << "mount(2) failed" << endl; + exit (1); + } + } + + os->mount(); + + begin = g_clock.now(); + for (unsigned o = 0; o < object_count; o++) + { + bufferptr bp (val2, object_size); + bufferlist bl; + bl.push_back (bp); + + if (os->read (oids[o], 0L, object_size, bl) < 0) + { + cerr << "object " << oids[o] << " not found!" << endl; + } + } + end = g_clock.now() - begin; + + cerr << "Read finished in " << end << endl; + total_read += end; + reads[i] = end; + + os->umount(); + sync(); + + if (!inhibit_remount) + { + if (system (umountcmd) != 0) + { + cerr << "umount(2) failed" << endl; + exit (1); + } + + if (system (mountcmd) != 0) + { + cerr << "mount(3) failed" << endl; + exit (1); + } + } + } + + cerr << "Finished in " << (total_write + total_read) << endl; + + double write_mean = (double) total_write / write_iter; + double write_sd = 0.0; + for (unsigned i = 0; i < write_iter; i++) + { + double x = (double) writes[i] - write_mean; + write_sd += x * x; + } + write_sd = sqrt (write_sd / write_iter); + + double read_mean = (double) total_read / write_iter; + double read_sd = 0.0; + for (unsigned i = 0; i < write_iter; i++) + { + double x = (double) reads[i] - read_mean; + write_sd += x * x; + } + read_sd = sqrt (read_sd / write_iter); + + cout << "TESTOS: write " << osd_name << ":" << object_size << ":" + << object_count << ":" << write_iter << ":" << random_seed + << " -- " << write_mean << " " << write_sd << endl; + + cout << "TESTOS: read " << osd_name << ":" << object_size << ":" + << object_count << ":" << write_iter << ":" << random_seed + << " -- " << read_mean << " " << read_sd << endl; + + unlink (osd_file); + if (!inhibit_remount) + { + if (system (umountcmd) != 0) + { + cerr << "umount(3) failed" << endl; + exit (1); + } + } + exit (0); +}