Merge branch 'wip-1796'

Reviewed-by: Greg Farnum <gregory.farnum@dreamhost.com>
This commit is contained in:
Sage Weil 2012-03-06 11:03:01 -08:00
commit b52d408758
4 changed files with 90 additions and 14 deletions

View File

@ -84,6 +84,18 @@ void MDLog::init_journaler()
logger, l_mdl_jlat,
&mds->timer);
assert(journaler->is_readonly());
journaler->set_write_error_handler(new C_MDL_WriteError(this));
}
/**
 * React to a write error reported by the journaler.
 *
 * -EBLACKLISTED is logged and answered with mds->respawn(); any other
 * error code is logged (decoded via cpp_strerror) and answered with
 * mds->suicide().
 *
 * @param r error code handed to the write error callback
 */
void MDLog::handle_journaler_write_error(int r)
{
  if (r != -EBLACKLISTED) {
    // anything other than being fenced is not handled here
    derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl;
    mds->suicide();
    return;
  }

  derr << "we have been blacklisted (fenced), respawning..." << dendl;
  mds->respawn();
}
void MDLog::write_head(Context *c)

View File

@ -127,7 +127,16 @@ public:
private:
void init_journaler();
struct C_MDL_WriteError : public Context {
MDLog *mdlog;
C_MDL_WriteError(MDLog *m) : mdlog(m) {}
void finish(int r) {
mdlog->handle_journaler_write_error(r);
}
};
void handle_journaler_write_error(int r);
public:
void create_logger();

View File

@ -358,14 +358,16 @@ void Journaler::write_head(Context *oncommit)
void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit)
{
if (r < 0)
if (r < 0) {
lderr(cct) << "_finish_write_head got " << cpp_strerror(r) << dendl;
assert(r >= 0); // we can't really recover from write errors here
handle_write_error(r);
return;
}
assert(!readonly);
ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
last_committed = wrote;
if (oncommit) {
oncommit->finish(0);
oncommit->finish(r);
delete oncommit;
}
@ -389,9 +391,11 @@ public:
void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp)
{
assert(!readonly);
if (r < 0)
if (r < 0) {
lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
assert(r >= 0);
handle_write_error(r);
return;
}
assert(start >= safe_pos);
assert(start < flush_pos);
@ -412,10 +416,11 @@ void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp)
safe_pos = *pending_safe.begin();
ldout(cct, 10) << "_finish_flush safe from " << start
<< ", pending_safe " << pending_safe
<< ", (prezeroing/prezero)/write/flush/safe positions now "
<< "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
<< dendl;
<< ", pending_safe " << pending_safe
<< ", (prezeroing/prezero)/write/flush/safe positions now "
<< "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos
<< "/" << flush_pos << "/" << safe_pos
<< dendl;
// kick waiters <= safe_pos
while (!waitfor_safe.empty()) {
@ -664,8 +669,12 @@ void Journaler::_prezeroed(int r, uint64_t start, uint64_t len)
<< ", prezeroing/prezero was " << prezeroing_pos << "/" << prezero_pos
<< ", pending " << pending_zero
<< dendl;
if (r < 0 && r != -ENOENT)
if (r < 0 && r != -ENOENT) {
lderr(cct) << "_prezeroed got " << cpp_strerror(r) << dendl;
handle_write_error(r);
return;
}
assert(r == 0 || r == -ENOENT);
if (start == prezero_pos) {
@ -722,7 +731,7 @@ void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl)
if (on_readable) {
Context *f = on_readable;
on_readable = 0;
f->finish(0);
f->finish(r);
delete f;
}
return;
@ -1020,8 +1029,12 @@ void Journaler::_trim_finish(int r, uint64_t to)
<< ", trimmed/trimming/expire now "
<< to << "/" << trimming_pos << "/" << expire_pos
<< dendl;
if (r < 0 && r != -ENOENT)
if (r < 0 && r != -ENOENT) {
lderr(cct) << "_trim_finish got " << cpp_strerror(r) << dendl;
handle_write_error(r);
return;
}
assert(r >= 0 || r == -ENOENT);
assert(to <= trimming_pos);
@ -1036,5 +1049,17 @@ void Journaler::_trim_finish(int r, uint64_t to)
}
}
void Journaler::handle_write_error(int r)
{
lderr(cct) << "handle_write_error " << cpp_strerror(r) << dendl;
if (on_write_error) {
on_write_error->finish(r);
delete on_write_error;
on_write_error = NULL;
} else {
assert(0 == "unhandled write error");
}
}
// eof.

View File

@ -197,6 +197,8 @@ private:
// for wait_for_readable()
Context *on_readable;
Context *on_write_error;
void _finish_read(int r, uint64_t offset, bufferlist &bl); // read completion callback
void _assimilate_prefetch();
void _issue_read(uint64_t len); // read some more
@ -228,6 +230,15 @@ private:
last_written = last_committed = h;
}
/**
* handle a write error
*
* called when we get an objecter error on a write.
*
* @param r error code
*/
void handle_write_error(int r);
public:
Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim) :
cct(obj->cct), last_written(mag), last_committed(mag),
@ -239,7 +250,7 @@ public:
waiting_for_zero(false),
read_pos(0), requested_pos(0), received_pos(0),
fetch_len(0), temp_fetch_len(0), prefetch_from(0),
on_readable(0),
on_readable(0), on_write_error(NULL),
expire_pos(0), trimming_pos(0), trimmed_pos(0)
{
}
@ -322,6 +333,25 @@ public:
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p;
}
/**
 * install the write error callback
 *
 * The given context is triggered when the journaler gets a write
 * error from the objecter, whether the write came from an explicit
 * request (e.g., flush) or from something async the journaler did on
 * its own (e.g., journal header update).
 *
 * The callback is one-shot: after it has fired it is deleted and no
 * longer registered, so a caller that keeps using the Journaler and
 * still wants to hear about errors must install a new one.  No
 * handler may already be installed when this is called.
 *
 * @param c callback/context to trigger on error
 */
void set_write_error_handler(Context *c) {
  assert(on_write_error == NULL);  // only one handler at a time
  on_write_error = c;
}
// trim
void set_expire_pos(int64_t ep) { expire_pos = ep; }
void set_trimmed_pos(int64_t p) { trimming_pos = trimmed_pos = p; }