ceph/src/streamtest.cc
Colin Patrick McCabe 13aed89e48 Add CephContext
A CephContext represents the context held by a single library user.
There can be multiple CephContexts in the same process.

For daemons and utility programs, there will be only one CephContext.
The CephContext contains the configuration, the dout object, and
anything else that you might want to pass to libcommon with every
function call.

Move some non-config things out of md_config_t and into CephContext.

Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
2011-05-23 17:28:27 -07:00

170 lines
3.6 KiB
C++

// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <iostream>
#include "os/FileStore.h"
#include "common/common_init.h"
#include "common/ceph_argparse.h"
#undef dout_prefix
#define dout_prefix *_dout
/*
 * Timestamps tracked for a single queued write.
 *
 *   start  - stored by set_start() just before the write is queued
 *   ack    - stored by set_ack() from the C_Ack completion
 *   commit - stored by set_commit() from the C_Commit completion
 */
struct io {
  utime_t start;
  utime_t ack;
  utime_t commit;

  // A write counts as finished once both the ack and the commit
  // timestamps carry a non-zero seconds value.
  bool done() {
    if (!ack.sec())
      return false;
    return commit.sec() != 0;
  }
};
// Writes that have been queued but not yet reported, keyed by write offset.
// Entries are created by set_start() and erased by pr().
map<off_t,io> writes;
// Signalled by pr() each time an entry leaves `writes`; throttle() waits on it.
Cond cond;
// Taken by throttle(), set_start(), set_ack() and set_commit() around
// their accesses to `writes`.
Mutex lock("streamtest.cc lock");
// throttle() blocks while writes.size() >= concurrent.
// main() overwrites this from args[4] when that argument is present.
unsigned concurrent = 1;
/*
 * Block the caller until the number of entries in `writes` is below
 * `concurrent`.  Takes `lock` for the duration and sleeps on `cond`,
 * which pr() signals every time it removes an entry.
 */
void throttle()
{
  Mutex::Locker guard(lock);
  for (;;) {
    if (writes.size() < concurrent)
      return;
    //generic_dout(0) << "waiting" << dendl;
    cond.Wait(lock);
  }
}
/*
 * Report one write and retire it.
 *
 * Logs "<off>\t<ack - start>\t<commit - start>" at debug level 0, drops
 * the entry for `off` from `writes`, and signals `cond`.
 *
 * Does not take `lock` itself; both callers in this file (set_ack() and
 * set_commit()) already hold it when they call in.
 */
void pr(off_t off)
{
  const io &rec = writes[off];
  generic_dout(0) << off
		  << "\t" << (rec.ack - rec.start)
		  << "\t" << (rec.commit - rec.start)
		  << dendl;
  // `rec` is invalid past this point; it is not used again.
  writes.erase(off);
  cond.Signal();
}
/*
 * Record `t` as the start time of the write at offset `off`.
 * Creates the entry in `writes` if there is none yet.  Holds `lock`.
 */
void set_start(off_t off, utime_t t)
{
  Mutex::Locker guard(lock);
  io &rec = writes[off];
  rec.start = t;
}
/*
 * Record `t` as the ack time of the write at offset `off`.
 * If the commit time has been recorded as well, the write is reported
 * and removed via pr().  Holds `lock` throughout.
 */
void set_ack(off_t off, utime_t t)
{
  Mutex::Locker guard(lock);
  //generic_dout(0) << "ack " << off << dendl;
  io &rec = writes[off];
  rec.ack = t;
  if (!rec.done())
    return;
  pr(off);  // erases the entry; `rec` must not be touched afterwards
}
/*
 * Record `t` as the commit time of the write at offset `off`.
 * If the ack time has been recorded as well, the write is reported
 * and removed via pr().  Holds `lock` throughout.
 */
void set_commit(off_t off, utime_t t)
{
  Mutex::Locker guard(lock);
  //generic_dout(0) << "commit " << off << dendl;
  io &rec = writes[off];
  rec.commit = t;
  if (!rec.done())
    return;
  pr(off);  // erases the entry; `rec` must not be touched afterwards
}
struct C_Ack : public Context {
off_t off;
C_Ack(off_t o) : off(o) {}
void finish(int r) {
set_ack(off, g_clock.now());
}
};
struct C_Commit : public Context {
off_t off;
C_Commit(off_t o) : off(o) {}
void finish(int r) {
set_commit(off, g_clock.now());
}
};
/*
 * streamtest: queue fixed-size sequential writes against a FileStore for
 * a given number of seconds and log, per write, the time to ack and the
 * time to commit.
 *
 * Positional arguments, read from `args` after common_init():
 *   args[0]  filename    path handed to FileStore
 *   args[1]  seconds     how long to keep issuing writes
 *   args[2]  bytes       size of each write (must be > 0)
 *   args[3]  journal     optional journal path handed to FileStore
 *   args[4]  concurrent  optional cap on in-flight writes (must be >= 1)
 *
 * Returns -1 on bad arguments or if the mount fails, 0 otherwise.
 */
int main(int argc, const char **argv)
{
  vector<const char*> args;
  argv_to_vec(argc, argv, args);
  env_to_vec(args);
  common_init(args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
  common_init_finish(&g_ceph_context);

  // args
  if (args.size() < 3) {
    std::cerr << "usage: streamtest <filename> <seconds> <bytes> [journal] [concurrent]"
	      << std::endl;
    return -1;
  }
  const char *filename = args[0];
  int seconds = atoi(args[1]);
  int bytes = atoi(args[2]);
  if (bytes <= 0) {
    // A zero-length write would never advance the offset, and a negative
    // size would be passed straight to the buffer allocation below.
    std::cerr << "bytes per write must be > 0" << std::endl;
    return -1;
  }
  const char *journal = 0;
  if (args.size() >= 4)
    journal = args[3];
  if (args.size() >= 5) {
    int requested = atoi(args[4]);
    if (requested < 1) {
      // With concurrent == 0 throttle() could never return, and a negative
      // value would wrap to a huge unsigned limit.
      std::cerr << "concurrent must be >= 1" << std::endl;
      return -1;
    }
    concurrent = requested;
  }
  cout << "concurrent = " << concurrent << std::endl;

  // One zero-filled buffer is reused as the payload of every write.
  buffer::ptr bp(bytes);
  bp.zero();
  bufferlist bl;
  bl.push_back(bp);

  cout << "#dev " << filename
       << ", " << seconds << " seconds, " << bytes << " bytes per write" << std::endl;

  ObjectStore *fs = new FileStore(filename, journal);
  if (fs->mount() < 0) {
    cout << "mount failed" << std::endl;
    return -1;
  }

  ObjectStore::Transaction ft;
  ft.create_collection(coll_t());
  fs->apply_transaction(ft);

  utime_t now = g_clock.now();
  utime_t end = now;
  end += seconds;
  off_t pos = 0;

  //cout << "stop at " << end << std::endl;
  cout << "# offset\tack\tcommit" << std::endl;
  while (now < end) {
    sobject_t poid(object_t("streamtest"), 0);
    set_start(pos, g_clock.now());
    ObjectStore::Transaction *t = new ObjectStore::Transaction;
    t->write(coll_t(), poid, pos, bytes, bl);
    fs->queue_transaction(NULL, t, new C_Ack(pos), new C_Commit(pos));
    pos += bytes;

    // The only pacing is the cap on in-flight writes.
    throttle();

    // Refresh the clock so the loop actually stops once `seconds` have
    // elapsed; without this `now` would stay at its initial value forever.
    now = g_clock.now();
  }

  // NOTE(review): with concurrent > 1, up to concurrent-1 writes may still
  // be outstanding here; confirm umount() waits for their callbacks.
  fs->umount();
  return 0;
}