osdc/Objecter: add pmore argument to omap_get_{keys,vals}

Note that the MDS callers have new #warnings indicating that they
are not providing the pmore argument and are thus broken.  (They
were already broken.)

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2017-01-16 13:48:06 -05:00
parent d6dcf417c2
commit b6aef08cc9
4 changed files with 62 additions and 23 deletions

View File

@ -266,7 +266,8 @@ void librados::ObjectReadOperation::omap_get_vals(
int *prval)
{
::ObjectOperation *o = &impl->o;
o->omap_get_vals(start_after, filter_prefix, max_return, out_vals, prval);
o->omap_get_vals(start_after, filter_prefix, max_return, out_vals, nullptr,
prval);
}
void librados::ObjectReadOperation::omap_get_vals(
@ -276,7 +277,7 @@ void librados::ObjectReadOperation::omap_get_vals(
int *prval)
{
::ObjectOperation *o = &impl->o;
o->omap_get_vals(start_after, "", max_return, out_vals, prval);
o->omap_get_vals(start_after, "", max_return, out_vals, nullptr, prval);
}
void librados::ObjectReadOperation::omap_get_keys(
@ -286,7 +287,7 @@ void librados::ObjectReadOperation::omap_get_keys(
int *prval)
{
::ObjectOperation *o = &impl->o;
o->omap_get_keys(start_after, max_return, out_keys, prval);
o->omap_get_keys(start_after, max_return, out_keys, nullptr, prval);
}
void librados::ObjectReadOperation::omap_get_header(bufferlist *bl, int *prval)
@ -5714,11 +5715,13 @@ extern "C" void rados_read_op_omap_get_vals(rados_read_op_t read_op,
RadosOmapIter *omap_iter = new RadosOmapIter;
const char *start = start_after ? start_after : "";
const char *filter = filter_prefix ? filter_prefix : "";
((::ObjectOperation *)read_op)->omap_get_vals(start,
filter,
max_return,
&omap_iter->values,
prval);
((::ObjectOperation *)read_op)->omap_get_vals(
start,
filter,
max_return,
&omap_iter->values,
nullptr,
prval);
((::ObjectOperation *)read_op)->add_handler(new C_OmapIter(omap_iter));
*iter = omap_iter;
tracepoint(librados, rados_read_op_omap_get_vals_exit, *iter);
@ -5747,8 +5750,9 @@ extern "C" void rados_read_op_omap_get_keys(rados_read_op_t read_op,
tracepoint(librados, rados_read_op_omap_get_keys_enter, read_op, start_after, max_return, prval);
RadosOmapIter *omap_iter = new RadosOmapIter;
C_OmapKeysIter *ctx = new C_OmapKeysIter(omap_iter);
((::ObjectOperation *)read_op)->omap_get_keys(start_after ? start_after : "",
max_return, &ctx->keys, prval);
((::ObjectOperation *)read_op)->omap_get_keys(
start_after ? start_after : "",
max_return, &ctx->keys, nullptr, prval);
((::ObjectOperation *)read_op)->add_handler(ctx);
*iter = omap_iter;
tracepoint(librados, rados_read_op_omap_get_keys_exit, *iter);

View File

@ -1515,7 +1515,8 @@ void CDir::_omap_fetch(MDSInternalContextBase *c, const std::set<dentry_key_t>&
rd.omap_get_header(&fin->hdrbl, &fin->ret1);
if (keys.empty()) {
assert(!c);
rd.omap_get_vals("", "", (uint64_t)-1, &fin->omap, &fin->ret2);
#warning use the pmore arg
rd.omap_get_vals("", "", (uint64_t)-1, &fin->omap, nullptr, &fin->ret2);
} else {
assert(c);
std::set<std::string> str_keys;

View File

@ -219,8 +219,9 @@ void SessionMap::_load_finish(
object_locator_t oloc(mds->mdsmap->get_metadata_pool());
C_IO_SM_Load *c = new C_IO_SM_Load(this, false);
ObjectOperation op;
#warning fixme use the pmore arg
op.omap_get_vals(last_key, "", g_conf->mds_sessionmap_keys_per_op,
&c->session_vals, &c->values_r);
&c->session_vals, nullptr, &c->values_r);
mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0,
new C_OnFinisher(c, mds->finisher));
} else {
@ -262,8 +263,9 @@ void SessionMap::load(MDSInternalContextBase *onload)
ObjectOperation op;
op.omap_get_header(&c->header_bl, &c->header_r);
#warning use the pmore arg
op.omap_get_vals("", "", g_conf->mds_sessionmap_keys_per_op,
&c->session_vals, &c->values_r);
&c->session_vals, nullptr, &c->values_r);
mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, mds->finisher));
}

View File

@ -393,15 +393,30 @@ struct ObjectOperation {
struct C_ObjectOperation_decodevals : public Context {
bufferlist bl;
std::map<std::string,bufferlist> *pattrs;
bool *ptruncated;
int *prval;
C_ObjectOperation_decodevals(std::map<std::string,bufferlist> *pa, int *pr)
: pattrs(pa), prval(pr) {}
C_ObjectOperation_decodevals(std::map<std::string,bufferlist> *pa,
bool *pt, int *pr)
: pattrs(pa), ptruncated(pt), prval(pr) {
if (ptruncated) {
*ptruncated = false;
}
}
void finish(int r) {
if (r >= 0) {
bufferlist::iterator p = bl.begin();
try {
if (pattrs)
::decode(*pattrs, p);
if (ptruncated) {
if (!pattrs) {
std::map<std::string,bufferlist> ignore;
::decode(ignore, p);
}
if (!p.end()) {
::decode(*ptruncated, p);
}
}
}
catch (buffer::error& e) {
if (prval)
@ -413,15 +428,30 @@ struct ObjectOperation {
struct C_ObjectOperation_decodekeys : public Context {
bufferlist bl;
std::set<std::string> *pattrs;
bool *ptruncated;
int *prval;
C_ObjectOperation_decodekeys(std::set<std::string> *pa, int *pr)
: pattrs(pa), prval(pr) {}
C_ObjectOperation_decodekeys(std::set<std::string> *pa, bool *pt,
int *pr)
: pattrs(pa), ptruncated(pt), prval(pr) {
if (ptruncated) {
*ptruncated = false;
}
}
void finish(int r) {
if (r >= 0) {
bufferlist::iterator p = bl.begin();
try {
if (pattrs)
::decode(*pattrs, p);
if (ptruncated) {
if (!pattrs) {
std::set<std::string> ignore;
::decode(ignore, p);
}
if (!p.end()) {
::decode(*ptruncated, p);
}
}
}
catch (buffer::error& e) {
if (prval)
@ -505,7 +535,7 @@ struct ObjectOperation {
if (pattrs || prval) {
unsigned p = ops.size() - 1;
C_ObjectOperation_decodevals *h
= new C_ObjectOperation_decodevals(pattrs, prval);
= new C_ObjectOperation_decodevals(pattrs, nullptr, prval);
out_handler[p] = h;
out_bl[p] = &h->bl;
out_rval[p] = prval;
@ -564,6 +594,7 @@ struct ObjectOperation {
void omap_get_keys(const string &start_after,
uint64_t max_to_get,
std::set<std::string> *out_set,
bool *ptruncated,
int *prval) {
OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
bufferlist bl;
@ -572,10 +603,10 @@ struct ObjectOperation {
op.op.extent.offset = 0;
op.op.extent.length = bl.length();
op.indata.claim_append(bl);
if (prval || out_set) {
if (prval || ptruncated || out_set) {
unsigned p = ops.size() - 1;
C_ObjectOperation_decodekeys *h =
new C_ObjectOperation_decodekeys(out_set, prval);
new C_ObjectOperation_decodekeys(out_set, ptruncated, prval);
out_handler[p] = h;
out_bl[p] = &h->bl;
out_rval[p] = prval;
@ -586,6 +617,7 @@ struct ObjectOperation {
const string &filter_prefix,
uint64_t max_to_get,
std::map<std::string, bufferlist> *out_set,
bool *ptruncated,
int *prval) {
OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
bufferlist bl;
@ -595,10 +627,10 @@ struct ObjectOperation {
op.op.extent.offset = 0;
op.op.extent.length = bl.length();
op.indata.claim_append(bl);
if (prval || out_set) {
if (prval || out_set || ptruncated) {
unsigned p = ops.size() - 1;
C_ObjectOperation_decodevals *h =
new C_ObjectOperation_decodevals(out_set, prval);
new C_ObjectOperation_decodevals(out_set, ptruncated, prval);
out_handler[p] = h;
out_bl[p] = &h->bl;
out_rval[p] = prval;
@ -617,7 +649,7 @@ struct ObjectOperation {
if (prval || out_set) {
unsigned p = ops.size() - 1;
C_ObjectOperation_decodevals *h =
new C_ObjectOperation_decodevals(out_set, prval);
new C_ObjectOperation_decodevals(out_set, nullptr, prval);
out_handler[p] = h;
out_bl[p] = &h->bl;
out_rval[p] = prval;