ReplicatedPG: implement the RWTracker mechanisms for backfill read locking

We want backfill to take read locks on the objects it's pushing. Add
a get_backfill_read(hobject_t) function, a corresponding drop_backfill_read(),
and a backfill_waiting_on_read member in ObjState. Check that member when
getting a write lock, and in put_write(). Tell callers to requeue the recovery
if necessary, and clean up the backfill block when its read lock is dropped.

Signed-off-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Greg Farnum 2013-10-21 14:27:32 -07:00 committed by Samuel Just
parent 96ed5b8c38
commit 87daef76cd
2 changed files with 36 additions and 5 deletions

View File

@ -790,6 +790,7 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
const hobject_t& ioid) :
PG(o, curmap, _pool, p, oid, ioid),
pgbackend(new ReplicatedBackend(this, coll_t(p), o)),
rw_manager(),
snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
temp_seq(0),
snap_trimmer_machine(this)

View File

@ -500,7 +500,10 @@ protected:
uint64_t count; /// number of readers or writers
list<OpRequestRef> waiters; /// ops waiting on state change
ObjState() : state(NONE), count(0) {}
/// if set, restart backfill when we can get a read lock
bool backfill_read_marker;
ObjState() : state(NONE), count(0), backfill_read_marker(false) {}
bool get_read(OpRequestRef op) {
if (get_read_lock()) {
return true;
@ -539,7 +542,8 @@ protected:
}
bool get_write_lock() {
// don't starve anybody!
if (!waiters.empty()) {
if (!waiters.empty() ||
backfill_read_marker) {
return false;
}
switch (state) {
@ -577,8 +581,10 @@ protected:
}
bool empty() const { return state == NONE; }
};
map<hobject_t, ObjState > obj_state;
map<hobject_t, ObjState > obj_state; ///< map of rw_lock states
public:
RWTracker() {}
bool get_read(const hobject_t &hoid, OpRequestRef op) {
return obj_state[hoid].get_read(op);
}
@ -591,12 +597,33 @@ protected:
obj_state.erase(hoid);
}
}
void put_write(const hobject_t &hoid, list<OpRequestRef> *to_wake) {
void put_write(const hobject_t &hoid, list<OpRequestRef> *to_wake,
bool *requeue_recovery) {
obj_state[hoid].put_write(to_wake);
if (obj_state[hoid].empty()) {
if (obj_state[hoid].backfill_read_marker)
*requeue_recovery = true;
obj_state.erase(hoid);
}
}
bool get_backfill_read(const hobject_t &hoid) {
ObjState& obj_locker = obj_state[hoid];
obj_locker.backfill_read_marker = true;
if (obj_locker.get_read_lock()) {
return true;
} // else
return false;
}
void drop_backfill_read(const hobject_t &hoid, list<OpRequestRef> *ls) {
map<hobject_t, ObjState>::iterator i = obj_state.find(hoid);
ObjState& obj_locker = i->second;
assert(obj_locker.backfill_read_marker = true);
obj_locker.put_read(ls);
if (obj_locker.empty())
obj_state.erase(i);
else
obj_locker.backfill_read_marker = false;
}
} rw_manager;
/**
@ -641,9 +668,12 @@ protected:
*/
void release_op_ctx_locks(OpContext *ctx) {
list<OpRequestRef> to_req;
bool requeue_recovery = false;
switch (ctx->lock_to_release) {
case OpContext::W_LOCK:
rw_manager.put_write(ctx->obs->oi.soid, &to_req);
rw_manager.put_write(ctx->obs->oi.soid, &to_req, &requeue_recovery);
if (requeue_recovery)
osd->recovery_wq.queue(this);
break;
case OpContext::R_LOCK:
rw_manager.put_read(ctx->obs->oi.soid, &to_req);