ceph/src/streamtest.cc

171 lines
3.7 KiB
C++
Raw Normal View History

2008-01-23 18:49:54 +00:00
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <iostream>
2008-05-20 19:11:56 +00:00
#include "os/FileStore.h"
#include "global/global_init.h"
#include "common/ceph_argparse.h"
#include "common/debug.h"
2008-01-23 18:49:54 +00:00
2008-11-05 22:53:53 +00:00
// Drop any earlier definition of dout_prefix and redefine it as the bare
// *_dout expression, with no extra prefix text appended.
// NOTE(review): presumably expanded by the dout/generic_dout logging macros
// pulled in via common/debug.h -- confirm.
#undef dout_prefix
#define dout_prefix *_dout
2008-11-05 22:53:53 +00:00
2008-01-25 05:59:38 +00:00
// Timestamps tracked for one queued write.
struct io {
  utime_t start;   // stored by set_start()
  utime_t ack;     // stored by set_ack()
  utime_t commit;  // stored by set_commit()

  // True once both the ack and the commit timestamp have a nonzero
  // seconds component.
  bool done() {
    if (!ack.sec())
      return false;
    return commit.sec() != 0;
  }
};
// Per-write timing records, keyed by the write's offset. Entries are
// created through writes[off] in the set_*() helpers and erased by pr().
map<off_t,io> writes;
// Signalled by pr() each time a record is erased; throttle() waits on it.
Cond cond;
2008-10-31 22:01:09 +00:00
// Taken by throttle() and the set_*() helpers around every access to
// `writes`; also the mutex handed to cond.Wait() in throttle().
Mutex lock("streamtest.cc lock");
// throttle() blocks while writes.size() >= concurrent. Defaults to 1;
// main() overwrites it from the fifth argument when one is given.
unsigned concurrent = 1;
// Block the caller until fewer than `concurrent` writes are being tracked
// in `writes`. Holds `lock` for the duration and sleeps on `cond` between
// checks.
void throttle()
{
  Mutex::Locker guard(lock);
  for (;;) {
    if (writes.size() < concurrent)
      break;
    cond.Wait(lock);
  }
}
2008-01-25 05:59:38 +00:00
// Log one line for the write at `off` -- offset, (ack - start) and
// (commit - start), tab separated -- then drop its record from `writes`
// and signal `cond`.
//
// Does not take `lock` itself; the callers visible in this file
// (set_ack / set_commit) invoke it while already holding the lock.
void pr(off_t off)
{
  io &entry = writes[off];

  generic_dout(0) << off
		  << "\t" << (entry.ack - entry.start)
		  << "\t" << (entry.commit - entry.start)
		  << dendl;

  writes.erase(off);
  cond.Signal();
}
// Record `t` as the start time of the write at offset `off`, creating the
// record in `writes` if it does not exist yet. Takes `lock`.
void set_start(off_t off, utime_t t)
{
  Mutex::Locker guard(lock);
  io &entry = writes[off];
  entry.start = t;
}
// Record `t` as the ack time of the write at offset `off`. If the record is
// then done() (ack and commit both set), report and retire it via pr().
// Takes `lock`.
void set_ack(off_t off, utime_t t)
{
  Mutex::Locker guard(lock);
  io &entry = writes[off];
  entry.ack = t;
  if (!entry.done())
    return;
  pr(off);
}
// Record `t` as the commit time of the write at offset `off`. If the record
// is then done() (ack and commit both set), report and retire it via pr().
// Takes `lock`.
void set_commit(off_t off, utime_t t)
{
  Mutex::Locker guard(lock);
  io &entry = writes[off];
  entry.commit = t;
  if (!entry.done())
    return;
  pr(off);
}
struct C_Ack : public Context {
off_t off;
C_Ack(off_t o) : off(o) {}
void finish(int r) {
set_ack(off, ceph_clock_now(g_ceph_context));
}
};
2008-01-23 18:49:54 +00:00
struct C_Commit : public Context {
off_t off;
C_Commit(off_t o) : off(o) {}
void finish(int r) {
set_commit(off, ceph_clock_now(g_ceph_context));
2008-01-23 18:49:54 +00:00
}
};
int main(int argc, const char **argv)
{
vector<const char*> args;
argv_to_vec(argc, argv, args);
2008-11-05 22:53:53 +00:00
env_to_vec(args);
global_init(args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
common_init_finish(g_ceph_context);
2008-01-23 18:49:54 +00:00
// args
if (args.size() < 3) return -1;
2008-01-23 18:49:54 +00:00
const char *filename = args[0];
int seconds = atoi(args[1]);
int bytes = atoi(args[2]);
const char *journal = 0;
if (args.size() >= 4)
journal = args[3];
if (args.size() >= 5)
concurrent = atoi(args[4]);
cout << "concurrent = " << concurrent << std::endl;
2008-01-23 18:49:54 +00:00
buffer::ptr bp(bytes);
bp.zero();
bufferlist bl;
bl.push_back(bp);
//float interval = 1.0 / 1000;
2008-01-23 18:49:54 +00:00
cout << "#dev " << filename
<< ", " << seconds << " seconds, " << bytes << " bytes per write" << std::endl;
2008-01-23 18:49:54 +00:00
ObjectStore *fs = new FileStore(filename, journal);
if (fs->mount() < 0) {
2008-01-23 18:49:54 +00:00
cout << "mount failed" << std::endl;
return -1;
}
ObjectStore::Transaction ft;
ft.create_collection(coll_t());
fs->apply_transaction(ft);
utime_t now = ceph_clock_now(g_ceph_context);
2008-01-23 18:49:54 +00:00
utime_t end = now;
end += seconds;
off_t pos = 0;
//cout << "stop at " << end << std::endl;
cout << "# offset\tack\tcommit" << std::endl;
while (now < end) {
sobject_t poid(object_t("streamtest"), 0);
set_start(pos, ceph_clock_now(g_ceph_context));
ObjectStore::Transaction *t = new ObjectStore::Transaction;
t->write(coll_t(), poid, pos, bytes, bl);
fs->queue_transaction(NULL, t, new C_Ack(pos), new C_Commit(pos));
2008-01-23 18:49:54 +00:00
pos += bytes;
throttle();
// wait?
/*
utime_t next = start;
next += interval;
if (now < next) {
float s = next - now;
s *= 1000 * 1000; // s -> us
//cout << "sleeping for " << s << " us" << std::endl;
usleep((int)s);
}
*/
2008-01-23 18:49:54 +00:00
}
fs->umount();
2008-01-23 18:49:54 +00:00
}