mirror of
https://github.com/ceph/ceph
synced 2025-02-23 02:57:21 +00:00
Merge pull request #746 from ceph/wip-6582
Wip 6582 Reviewed-by: Sage Weil <sage@inktank.com>
This commit is contained in:
commit
ddfe67fe2a
@ -2507,6 +2507,11 @@ void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m)
|
||||
m.clear();
|
||||
}
|
||||
|
||||
void PG::requeue_op(OpRequestRef op)
|
||||
{
|
||||
osd->op_wq.queue_front(make_pair(PGRef(this), op));
|
||||
}
|
||||
|
||||
void PG::requeue_ops(list<OpRequestRef> &ls)
|
||||
{
|
||||
dout(15) << " requeue_ops " << ls << dendl;
|
||||
|
@ -535,6 +535,7 @@ protected:
|
||||
void split_ops(PG *child, unsigned split_bits);
|
||||
|
||||
void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
|
||||
void requeue_op(OpRequestRef op);
|
||||
void requeue_ops(list<OpRequestRef> &l);
|
||||
|
||||
// stats that persist lazily
|
||||
|
@ -4336,7 +4336,7 @@ int ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
|
||||
// FIXME: if the src etc match, we could avoid restarting from the
|
||||
// beginning.
|
||||
CopyOpRef cop = copy_ops[dest];
|
||||
cancel_copy(cop);
|
||||
cancel_copy(cop, false);
|
||||
}
|
||||
|
||||
CopyOpRef cop(new CopyOp(cb, obc, src, oloc, version, temp_dest_oid));
|
||||
@ -4429,6 +4429,7 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
|
||||
|
||||
dout(20) << __func__ << " complete; committing" << dendl;
|
||||
results.get<0>() = r;
|
||||
results.get<4>() = false;
|
||||
cop->cb->complete(results);
|
||||
|
||||
copy_ops.erase(obc->obs.oi.soid);
|
||||
@ -4514,7 +4515,7 @@ int ReplicatedPG::finish_copyfrom(OpContext *ctx)
|
||||
return 0;
|
||||
}
|
||||
|
||||
void ReplicatedPG::cancel_copy(CopyOpRef cop)
|
||||
void ReplicatedPG::cancel_copy(CopyOpRef cop, bool requeue)
|
||||
{
|
||||
dout(10) << __func__ << " " << cop->obc->obs.oi.soid
|
||||
<< " from " << cop->src << " " << cop->oloc << " v" << cop->version
|
||||
@ -4531,16 +4532,18 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop)
|
||||
|
||||
kick_object_context_blocked(cop->obc);
|
||||
bool temp_obj_created = !cop->cursor.is_initial();
|
||||
CopyResults result(-ECANCELED, 0, temp_obj_created, ObjectStore::Transaction());
|
||||
CopyResults result(-ECANCELED, 0, temp_obj_created,
|
||||
ObjectStore::Transaction(), requeue);
|
||||
cop->cb->complete(result);
|
||||
}
|
||||
|
||||
void ReplicatedPG::cancel_copy_ops()
|
||||
void ReplicatedPG::cancel_copy_ops(bool requeue)
|
||||
{
|
||||
dout(10) << __func__ << dendl;
|
||||
map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
|
||||
while (p != copy_ops.end()) {
|
||||
cancel_copy((p++)->second);
|
||||
// requeue this op? can I queue up all of them?
|
||||
cancel_copy((p++)->second, requeue);
|
||||
}
|
||||
}
|
||||
|
||||
@ -7292,7 +7295,7 @@ void ReplicatedPG::on_shutdown()
|
||||
deleting = true;
|
||||
|
||||
unreg_next_scrub();
|
||||
cancel_copy_ops();
|
||||
cancel_copy_ops(false);
|
||||
apply_and_flush_repops(false);
|
||||
context_registry_on_change();
|
||||
|
||||
@ -7329,7 +7332,7 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
|
||||
|
||||
context_registry_on_change();
|
||||
|
||||
cancel_copy_ops();
|
||||
cancel_copy_ops(is_primary());
|
||||
|
||||
// requeue object waiters
|
||||
if (is_primary()) {
|
||||
|
@ -140,8 +140,8 @@ public:
|
||||
* transactions would not allow); if you are doing the copy for a read
|
||||
* op you will have to generate a separate op to finish the copy with.
|
||||
*/
|
||||
/// return code, total object size, data in temp object?, final Transaction
|
||||
typedef boost::tuple<int, size_t, bool, ObjectStore::Transaction> CopyResults;
|
||||
/// return code, total object size, data in temp object?, final Transaction, should requeue Op
|
||||
typedef boost::tuple<int, size_t, bool, ObjectStore::Transaction, bool> CopyResults;
|
||||
class CopyCallback : public GenContext<CopyResults&> {
|
||||
protected:
|
||||
CopyCallback() {}
|
||||
@ -155,6 +155,9 @@ public:
|
||||
* results.get<3>() is a Transaction; if non-empty you need to perform
|
||||
* its results before any other accesses to the object in order to
|
||||
* complete the copy.
|
||||
* results.get<4>() is a bool; if true you must requeue the client Op
|
||||
* after processing the rest of the results (this will only be true
|
||||
* in conjunction with an ECANCELED return code).
|
||||
*/
|
||||
virtual void finish(CopyResults& results_) = 0;
|
||||
|
||||
@ -182,6 +185,8 @@ public:
|
||||
if (r < 0) {
|
||||
if (r != -ECANCELED) { // on cancel just toss it out; client resends
|
||||
ctx->pg->osd->reply_op_error(ctx->op, r);
|
||||
} else if (results_.get<4>()) {
|
||||
ctx->pg->requeue_op(ctx->op);
|
||||
}
|
||||
ctx->pg->close_op_ctx(ctx);
|
||||
}
|
||||
@ -978,8 +983,8 @@ protected:
|
||||
void _build_finish_copy_transaction(CopyOpRef cop,
|
||||
ObjectStore::Transaction& t);
|
||||
int finish_copyfrom(OpContext *ctx);
|
||||
void cancel_copy(CopyOpRef cop);
|
||||
void cancel_copy_ops();
|
||||
void cancel_copy(CopyOpRef cop, bool requeue);
|
||||
void cancel_copy_ops(bool requeue);
|
||||
|
||||
friend class C_Copyfrom;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user