osd/PrimaryLogPG: tell client if we truncate results

If we truncate the results of the omap read commands,
provide a flag so that the caller knows there is more
to be read.  We don't need to provide the name of the
next key because the interface is defined as
"start_after" (not "start_with"), allowing them to use
the last key they received.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2017-01-14 23:23:11 -05:00
parent 5db320ce93
commit d6dcf417c2

View File

@ -5747,22 +5747,25 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
bufferlist bl;
uint32_t num = 0;
bool truncated = false;
if (oi.is_omap()) {
ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(
coll, ghobject_t(soid)
);
assert(iter);
iter->upper_bound(start_after);
for (num = 0;
num < max_return &&
bl.length() < cct->_conf->osd_max_omap_bytes_per_request &&
iter->valid();
++num, iter->next(false)) {
for (num = 0; iter->valid(); ++num, iter->next(false)) {
if (num >= max_return ||
bl.length() >= cct->_conf->osd_max_omap_bytes_per_request) {
truncated = true;
break;
}
::encode(iter->key(), bl);
}
} // else return empty out_set
::encode(num, osd_op.outdata);
osd_op.outdata.claim_append(bl);
::encode(truncated, osd_op.outdata);
ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10);
ctx->delta_stats.num_rd++;
}
@ -5790,6 +5793,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
tracepoint(osd, do_osd_op_pre_omapgetvals, soid.oid.name.c_str(), soid.snap.val, start_after.c_str(), max_return, filter_prefix.c_str());
uint32_t num = 0;
bool truncated = false;
bufferlist bl;
if (oi.is_omap()) {
ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(
@ -5802,18 +5806,22 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
iter->upper_bound(start_after);
if (filter_prefix > start_after) iter->lower_bound(filter_prefix);
for (num = 0;
num < max_return &&
bl.length() < cct->_conf->osd_max_omap_bytes_per_request &&
iter->valid() &&
iter->valid() &&
iter->key().substr(0, filter_prefix.size()) == filter_prefix;
++num, iter->next(false)) {
dout(20) << "Found key " << iter->key() << dendl;
if (num >= max_return ||
bl.length() >= cct->_conf->osd_max_omap_bytes_per_request) {
truncated = true;
break;
}
::encode(iter->key(), bl);
::encode(iter->value(), bl);
}
} // else return empty out_set
::encode(num, osd_op.outdata);
osd_op.outdata.claim_append(bl);
::encode(truncated, osd_op.outdata);
ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(osd_op.outdata.length(), 10);
ctx->delta_stats.num_rd++;
}