mirror of
https://github.com/ceph/ceph
synced 2025-04-01 14:51:13 +00:00
Merge pull request #32381 from athanatos/sjust/wip-read-from-replica-py2
osd: propagate mlcod to replicas and fix problems with read from replica
Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
commit
bdc3ed252e
@ -0,0 +1,21 @@
|
||||
# rados workload against an erasure-coded pool (ec_pool: true) with reads
# issued under the balance_reads option.
tasks:
- rados:
    clients: [client.0]
    ops: 400000
    max_seconds: 600
    max_in_flight: 64
    objects: 1024
    size: 16384
    ec_pool: true
    # Spelled balance_reads (not balanced_reads): that is the key the rados
    # task looks up before passing --balance-reads to the test binary, so a
    # misspelled key leaves the option silently disabled.
    balance_reads: true
    op_weights:
      read: 100
      write: 0
      append: 100
      delete: 50
      snap_create: 50
      snap_remove: 50
      rollback: 50
      copy_from: 50
      setattr: 25
      rmattr: 25
|
40
qa/suites/rados/thrash/workloads/cache-snaps-balanced.yaml
Normal file
40
qa/suites/rados/thrash/workloads/cache-snaps-balanced.yaml
Normal file
@ -0,0 +1,40 @@
|
||||
# Cache-tier snapshot workload with balance_reads enabled.
overrides:
  ceph:
    log-whitelist:
    - must scrub before tier agent can activate
    conf:
      osd:
        # short_pg_log_entries.yaml sets these under [global]; setting them
        # under [osd] here overrides that.
        osd_min_pg_log_entries: 3000
        osd_max_pg_log_entries: 3000
tasks:
# Create the base and cache pools, stack the cache over the base pool in
# writeback mode, then configure hit sets and promotion on the cache pool.
- exec:
    client.0:
    - sudo ceph osd pool create base 4
    - sudo ceph osd pool application enable base rados
    - sudo ceph osd pool create cache 4
    - sudo ceph osd tier add base cache
    - sudo ceph osd tier cache-mode cache writeback
    - sudo ceph osd tier set-overlay base cache
    - sudo ceph osd pool set cache hit_set_type bloom
    - sudo ceph osd pool set cache hit_set_count 8
    - sudo ceph osd pool set cache hit_set_period 3600
    - sudo ceph osd pool set cache target_max_objects 250
    - sudo ceph osd pool set cache min_read_recency_for_promote 2
# Run the rados workload against the base pool.
- rados:
    clients:
    - client.0
    pools:
    - base
    ops: 4000
    objects: 500
    balance_reads: true
    op_weights:
      read: 100
      write: 100
      delete: 50
      copy_from: 50
      cache_flush: 50
      cache_try_flush: 50
      cache_evict: 50
      snap_create: 50
      snap_remove: 50
      rollback: 50
|
25
qa/suites/rados/thrash/workloads/small-objects-balanced.yaml
Normal file
25
qa/suites/rados/thrash/workloads/small-objects-balanced.yaml
Normal file
@ -0,0 +1,25 @@
|
||||
# Small-object rados workload with balance_reads enabled.
overrides:
  ceph:
    # Both the crush tunables and the mon's initial minimum compat client
    # are set to jewel.
    crush_tunables: jewel
    conf:
      mon:
        mon osd initial require min compat client: jewel
tasks:
- rados:
    clients:
    - client.0
    ops: 400000
    max_seconds: 600
    max_in_flight: 64
    objects: 1024
    size: 16384
    balance_reads: true
    op_weights:
      read: 100
      write: 100
      delete: 50
      snap_create: 50
      snap_remove: 50
      rollback: 50
      copy_from: 50
      setattr: 25
      rmattr: 25
|
@ -0,0 +1,25 @@
|
||||
# Small-object rados workload with localize_reads enabled.
overrides:
  ceph:
    # Both the crush tunables and the mon's initial minimum compat client
    # are set to jewel.
    crush_tunables: jewel
    conf:
      mon:
        mon osd initial require min compat client: jewel
tasks:
- rados:
    clients:
    - client.0
    ops: 400000
    max_seconds: 600
    max_in_flight: 64
    objects: 1024
    size: 16384
    localize_reads: true
    op_weights:
      read: 100
      write: 100
      delete: 50
      snap_create: 50
      snap_remove: 50
      rollback: 50
      copy_from: 50
      setattr: 25
      rmattr: 25
|
@ -0,0 +1,14 @@
|
||||
# Snapshot-heavy rados workload over a small object set (50 objects) with
# balance_reads enabled.
tasks:
- rados:
    clients:
    - client.0
    ops: 4000
    objects: 50
    balance_reads: true
    op_weights:
      read: 100
      write: 100
      delete: 50
      snap_create: 50
      snap_remove: 50
      rollback: 50
      copy_from: 50
|
@ -0,0 +1,14 @@
|
||||
# Snapshot-heavy rados workload over a small object set (50 objects) with
# localize_reads enabled.
tasks:
- rados:
    clients:
    - client.0
    ops: 4000
    objects: 50
    localize_reads: true
    op_weights:
      read: 100
      write: 100
      delete: 50
      snap_create: 50
      snap_remove: 50
      rollback: 50
      copy_from: 50
|
@ -153,6 +153,10 @@ def task(ctx, config):
|
||||
args.extend(['--low_tier_pool', config.get('low_tier_pool', None)])
|
||||
if config.get('pool_snaps', False):
|
||||
args.extend(['--pool-snaps'])
|
||||
if config.get('balance_reads', False):
|
||||
args.extend(['--balance-reads'])
|
||||
if config.get('localize_reads', False):
|
||||
args.extend(['--localize-reads'])
|
||||
args.extend([
|
||||
'--max-ops', str(config.get('ops', 10000)),
|
||||
'--objects', str(config.get('objects', 500)),
|
||||
|
@ -172,6 +172,23 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
/* Clears weakrefs in the interval [from, to] -- note that to is inclusive */
|
||||
void clear_range(
|
||||
const K& from,
|
||||
const K& to) {
|
||||
list<VPtr> vals; // release any refs we have after we drop the lock
|
||||
{
|
||||
std::lock_guard l{lock};
|
||||
auto from_iter = weak_refs.lower_bound(from);
|
||||
auto to_iter = weak_refs.upper_bound(to);
|
||||
for (auto i = from_iter; i != to_iter; ) {
|
||||
vals.push_back(i->second.first.lock());
|
||||
lru_remove((i++)->first);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void purge(const K &key) {
|
||||
VPtr val; // release any ref we have after we drop the lock
|
||||
{
|
||||
|
@ -103,6 +103,7 @@ DEFINE_CEPH_FEATURE(15, 1, MONENC)
|
||||
DEFINE_CEPH_FEATURE_RETIRED(16, 1, QUERY_T, JEWEL, LUMINOUS)
|
||||
|
||||
DEFINE_CEPH_FEATURE(16, 3, SERVER_OCTOPUS)
|
||||
DEFINE_CEPH_FEATURE(16, 3, OSD_REPOP_MLCOD)
|
||||
DEFINE_CEPH_FEATURE_RETIRED(17, 1, INDEP_PG_MAP, JEWEL, LUMINOUS)
|
||||
|
||||
DEFINE_CEPH_FEATURE(17, 3, OS_PERF_STAT_NS)
|
||||
@ -244,6 +245,7 @@ DEFINE_CEPH_FEATURE_DEPRECATED(63, 1, RESERVED_BROKEN, LUMINOUS) // client-facin
|
||||
CEPH_FEATURE_CEPHX_V2 | \
|
||||
CEPH_FEATURE_OSD_PGLOG_HARDLIMIT | \
|
||||
CEPH_FEATUREMASK_SERVER_OCTOPUS | \
|
||||
CEPH_FEATUREMASK_OSD_REPOP_MLCOD | \
|
||||
0ULL)
|
||||
|
||||
#define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL
|
||||
|
@ -24,7 +24,7 @@
|
||||
|
||||
class MOSDRepOp : public MOSDFastDispatchOp {
|
||||
private:
|
||||
static constexpr int HEAD_VERSION = 2;
|
||||
static constexpr int HEAD_VERSION = 3;
|
||||
static constexpr int COMPAT_VERSION = 1;
|
||||
|
||||
public:
|
||||
@ -54,8 +54,7 @@ public:
|
||||
|
||||
// piggybacked osd/pg state
|
||||
eversion_t pg_trim_to; // primary->replica: trim to here
|
||||
eversion_t pg_roll_forward_to; // primary->replica: trim rollback
|
||||
// info to here
|
||||
eversion_t min_last_complete_ondisk; // lower bound on committed version
|
||||
|
||||
hobject_t new_temp_oid; ///< new temp object that we must now start tracking
|
||||
hobject_t discard_temp_oid; ///< previously used temp object that we can now stop tracking
|
||||
@ -110,7 +109,15 @@ public:
|
||||
|
||||
decode(from, p);
|
||||
decode(updated_hit_set_history, p);
|
||||
decode(pg_roll_forward_to, p);
|
||||
|
||||
if (header.version >= 3) {
|
||||
decode(min_last_complete_ondisk, p);
|
||||
} else {
|
||||
/* This field used to mean pg_roll_forward_to, but ReplicatedBackend
|
||||
* simply assumes that we're rolling forward to version. */
|
||||
eversion_t pg_roll_forward_to;
|
||||
decode(pg_roll_forward_to, p);
|
||||
}
|
||||
final_decode_needed = false;
|
||||
}
|
||||
|
||||
@ -137,7 +144,7 @@ public:
|
||||
encode(discard_temp_oid, payload);
|
||||
encode(from, payload);
|
||||
encode(updated_hit_set_history, payload);
|
||||
encode(pg_roll_forward_to, payload);
|
||||
encode(min_last_complete_ondisk, payload);
|
||||
}
|
||||
|
||||
MOSDRepOp()
|
||||
@ -159,6 +166,11 @@ public:
|
||||
version(v) {
|
||||
set_tid(rtid);
|
||||
}
|
||||
|
||||
/// Stores rollback_to in the min_last_complete_ondisk slot and stamps the
/// message header with version 2.
void set_rollback_to(const eversion_t &rollback_to) {
  min_last_complete_ondisk = rollback_to;
  header.version = 2;
}
|
||||
private:
|
||||
~MOSDRepOp() override {}
|
||||
|
||||
@ -171,6 +183,11 @@ public:
|
||||
out << " " << poid << " v " << version;
|
||||
if (updated_hit_set_history)
|
||||
out << ", has_updated_hit_set_history";
|
||||
if (header.version < 3) {
|
||||
out << ", rollback_to(legacy)=" << min_last_complete_ondisk;
|
||||
} else {
|
||||
out << ", mlcod=" << min_last_complete_ondisk;
|
||||
}
|
||||
}
|
||||
out << ")";
|
||||
}
|
||||
|
@ -960,6 +960,7 @@ void ECBackend::handle_sub_write(
|
||||
op.updated_hit_set_history,
|
||||
op.trim_to,
|
||||
op.roll_forward_to,
|
||||
op.roll_forward_to,
|
||||
!op.backfill_or_async_recovery,
|
||||
localt,
|
||||
async);
|
||||
@ -1485,7 +1486,7 @@ void ECBackend::submit_transaction(
|
||||
const eversion_t &at_version,
|
||||
PGTransactionUPtr &&t,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &roll_forward_to,
|
||||
const eversion_t &min_last_complete_ondisk,
|
||||
const vector<pg_log_entry_t> &log_entries,
|
||||
std::optional<pg_hit_set_history_t> &hset_history,
|
||||
Context *on_all_commit,
|
||||
@ -1500,7 +1501,7 @@ void ECBackend::submit_transaction(
|
||||
op->delta_stats = delta_stats;
|
||||
op->version = at_version;
|
||||
op->trim_to = trim_to;
|
||||
op->roll_forward_to = std::max(roll_forward_to, committed_to);
|
||||
op->roll_forward_to = std::max(min_last_complete_ondisk, committed_to);
|
||||
op->log_entries = log_entries;
|
||||
std::swap(op->updated_hit_set_history, hset_history);
|
||||
op->on_all_commit = on_all_commit;
|
||||
|
@ -102,7 +102,7 @@ public:
|
||||
const eversion_t &at_version,
|
||||
PGTransactionUPtr &&t,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &roll_forward_to,
|
||||
const eversion_t &min_last_complete_ondisk,
|
||||
const vector<pg_log_entry_t> &log_entries,
|
||||
std::optional<pg_hit_set_history_t> &hset_history,
|
||||
Context *on_all_commit,
|
||||
|
@ -3449,6 +3449,19 @@ bool PG::can_discard_op(OpRequestRef& op)
|
||||
return true;
|
||||
}
|
||||
|
||||
if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
|
||||
CEPH_OSD_FLAG_LOCALIZE_READS)) &&
|
||||
!is_primary() &&
|
||||
m->get_map_epoch() < info.history.same_interval_since) {
|
||||
// Note: the Objecter will resend on interval change without the primary
|
||||
// changing if it actually sent to a replica. If the primary hasn't
|
||||
// changed since the send epoch, we got it, and we're primary, it won't
|
||||
// have resent even if the interval did change as it sent it to the primary
|
||||
// (us).
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) {
|
||||
// >= luminous client
|
||||
if (m->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS)) {
|
||||
|
@ -235,6 +235,7 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
|
||||
const std::optional<pg_hit_set_history_t> &hset_history,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &roll_forward_to,
|
||||
const eversion_t &min_last_complete_ondisk,
|
||||
bool transaction_applied,
|
||||
ObjectStore::Transaction &t,
|
||||
bool async = false) = 0;
|
||||
@ -448,7 +449,8 @@ typedef std::shared_ptr<const OSDMap> OSDMapRef;
|
||||
const eversion_t &at_version, ///< [in] version
|
||||
PGTransactionUPtr &&t, ///< [in] trans to execute (move)
|
||||
const eversion_t &trim_to, ///< [in] trim log to here
|
||||
const eversion_t &roll_forward_to, ///< [in] trim rollback info to here
|
||||
const eversion_t &min_last_complete_ondisk, ///< [in] lower bound on
|
||||
/// committed version
|
||||
const vector<pg_log_entry_t> &log_entries, ///< [in] log entries for t
|
||||
/// [in] hitset history (if updated with this transaction)
|
||||
std::optional<pg_hit_set_history_t> &hset_history,
|
||||
|
@ -329,6 +329,16 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
bool has_write_since(const hobject_t &oid, const eversion_t &bound) const {
|
||||
for (auto i = log.rbegin(); i != log.rend(); ++i) {
|
||||
if (i->version <= bound)
|
||||
return false;
|
||||
if (i->soid.get_head() == oid.get_head())
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// get a (bounded) list of recent reqids for the given object
|
||||
void get_object_reqids(const hobject_t& oid, unsigned max,
|
||||
mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *pls,
|
||||
|
@ -2334,9 +2334,9 @@ void PeeringState::activate(
|
||||
|
||||
auto &missing = pg_log.get_missing();
|
||||
|
||||
min_last_complete_ondisk = eversion_t(0,0); // we don't know (yet)!
|
||||
if (is_primary()) {
|
||||
last_update_ondisk = info.last_update;
|
||||
min_last_complete_ondisk = eversion_t(0,0); // we don't know (yet)!
|
||||
}
|
||||
last_update_applied = info.last_update;
|
||||
last_rollback_info_trimmed_to_applied = pg_log.get_can_rollback_to();
|
||||
@ -3773,6 +3773,7 @@ void PeeringState::append_log(
|
||||
const vector<pg_log_entry_t>& logv,
|
||||
eversion_t trim_to,
|
||||
eversion_t roll_forward_to,
|
||||
eversion_t mlcod,
|
||||
ObjectStore::Transaction &t,
|
||||
bool transaction_applied,
|
||||
bool async)
|
||||
@ -3836,6 +3837,9 @@ void PeeringState::append_log(
|
||||
// update the local pg, pg log
|
||||
dirty_info = true;
|
||||
write_if_dirty(t);
|
||||
|
||||
if (!is_primary())
|
||||
min_last_complete_ondisk = mlcod;
|
||||
}
|
||||
|
||||
void PeeringState::recover_got(
|
||||
@ -6052,6 +6056,8 @@ void PeeringState::ReplicaActive::exit()
|
||||
pl->cancel_remote_recovery_reservation();
|
||||
utime_t dur = ceph_clock_now() - enter_time;
|
||||
pl->get_peering_perf().tinc(rs_replicaactive_latency, dur);
|
||||
|
||||
ps->min_last_complete_ondisk = eversion_t();
|
||||
}
|
||||
|
||||
/*-------Stray---*/
|
||||
@ -7004,9 +7010,7 @@ ostream &operator<<(ostream &out, const PeeringState &ps) {
|
||||
if (ps.last_complete_ondisk != ps.info.last_complete)
|
||||
out << " lcod " << ps.last_complete_ondisk;
|
||||
|
||||
if (ps.is_primary()) {
|
||||
out << " mlcod " << ps.min_last_complete_ondisk;
|
||||
}
|
||||
out << " mlcod " << ps.min_last_complete_ondisk;
|
||||
|
||||
out << " " << pg_state_string(ps.get_state());
|
||||
if (ps.should_send_notify())
|
||||
|
@ -1770,6 +1770,7 @@ public:
|
||||
const vector<pg_log_entry_t>& logv,
|
||||
eversion_t trim_to,
|
||||
eversion_t roll_forward_to,
|
||||
eversion_t min_last_complete_ondisk,
|
||||
ObjectStore::Transaction &t,
|
||||
bool transaction_applied,
|
||||
bool async);
|
||||
@ -2198,6 +2199,15 @@ public:
|
||||
bool needs_recovery() const;
|
||||
bool needs_backfill() const;
|
||||
|
||||
/**
|
||||
* Returns whether a particular object can be safely read on this replica
|
||||
*/
|
||||
bool can_serve_replica_read(const hobject_t &hoid) {
|
||||
ceph_assert(!is_primary());
|
||||
return !pg_log.get_log().has_write_since(
|
||||
hoid, get_min_last_complete_ondisk());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether all peers which might have unfound objects have been
|
||||
* queried or marked lost.
|
||||
|
@ -491,6 +491,20 @@ void PrimaryLogPG::schedule_recovery_work(
|
||||
osd->queue_recovery_context(this, c);
|
||||
}
|
||||
|
||||
void PrimaryLogPG::replica_clear_repop_obc(
|
||||
const vector<pg_log_entry_t> &logv,
|
||||
ObjectStore::Transaction &t)
|
||||
{
|
||||
for (auto &&e: logv) {
|
||||
/* Have to blast all clones, they share a snapset */
|
||||
object_contexts.clear_range(
|
||||
e.soid.get_object_boundary(), e.soid.get_head());
|
||||
ceph_assert(
|
||||
snapset_contexts.find(e.soid.get_head()) ==
|
||||
snapset_contexts.end());
|
||||
}
|
||||
}
|
||||
|
||||
bool PrimaryLogPG::should_send_op(
|
||||
pg_shard_t peer,
|
||||
const hobject_t &hoid) {
|
||||
@ -2112,6 +2126,19 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
|
||||
return;
|
||||
}
|
||||
|
||||
if (!is_primary()) {
|
||||
if (!recovery_state.can_serve_replica_read(oid)) {
|
||||
dout(20) << __func__ << ": oid " << oid
|
||||
<< " unstable write on replica, bouncing to primary."
|
||||
<< *m << dendl;
|
||||
osd->reply_op_error(op, -EAGAIN);
|
||||
return;
|
||||
} else {
|
||||
dout(20) << __func__ << ": serving replica read on oid" << oid
|
||||
<< dendl;
|
||||
}
|
||||
}
|
||||
|
||||
int r = find_object_context(
|
||||
oid, &obc, can_create,
|
||||
m->has_flag(CEPH_OSD_FLAG_MAP_SNAP_CLONE),
|
||||
@ -11239,14 +11266,19 @@ int PrimaryLogPG::find_object_context(const hobject_t& oid,
|
||||
if (pmissing)
|
||||
*pmissing = soid;
|
||||
put_snapset_context(ssc);
|
||||
if (is_degraded_or_backfilling_object(soid)) {
|
||||
dout(20) << __func__ << " clone is degraded or backfilling " << soid << dendl;
|
||||
return -EAGAIN;
|
||||
} else if (is_degraded_on_async_recovery_target(soid)) {
|
||||
dout(20) << __func__ << " clone is recovering " << soid << dendl;
|
||||
return -EAGAIN;
|
||||
if (is_primary()) {
|
||||
if (is_degraded_or_backfilling_object(soid)) {
|
||||
dout(20) << __func__ << " clone is degraded or backfilling " << soid << dendl;
|
||||
return -EAGAIN;
|
||||
} else if (is_degraded_on_async_recovery_target(soid)) {
|
||||
dout(20) << __func__ << " clone is recovering " << soid << dendl;
|
||||
return -EAGAIN;
|
||||
} else {
|
||||
dout(20) << __func__ << " missing clone " << soid << dendl;
|
||||
return -ENOENT;
|
||||
}
|
||||
} else {
|
||||
dout(20) << __func__ << " missing clone " << soid << dendl;
|
||||
dout(20) << __func__ << " replica missing clone" << soid << dendl;
|
||||
return -ENOENT;
|
||||
}
|
||||
}
|
||||
|
@ -459,6 +459,7 @@ public:
|
||||
const std::optional<pg_hit_set_history_t> &hset_history,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &roll_forward_to,
|
||||
const eversion_t &min_last_complete_ondisk,
|
||||
bool transaction_applied,
|
||||
ObjectStore::Transaction &t,
|
||||
bool async = false) override {
|
||||
@ -473,10 +474,18 @@ public:
|
||||
projected_log.skip_can_rollback_to_to_head();
|
||||
projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
|
||||
}
|
||||
if (!is_primary() && !is_ec_pg()) {
|
||||
replica_clear_repop_obc(logv, t);
|
||||
}
|
||||
recovery_state.append_log(
|
||||
logv, trim_to, roll_forward_to, t, transaction_applied, async);
|
||||
logv, trim_to, roll_forward_to, min_last_complete_ondisk,
|
||||
t, transaction_applied, async);
|
||||
}
|
||||
|
||||
void replica_clear_repop_obc(
|
||||
const vector<pg_log_entry_t> &logv,
|
||||
ObjectStore::Transaction &t);
|
||||
|
||||
void op_applied(const eversion_t &applied_version) override;
|
||||
|
||||
bool should_send_op(
|
||||
|
@ -450,7 +450,7 @@ void ReplicatedBackend::submit_transaction(
|
||||
const eversion_t &at_version,
|
||||
PGTransactionUPtr &&_t,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &roll_forward_to,
|
||||
const eversion_t &min_last_complete_ondisk,
|
||||
const vector<pg_log_entry_t> &_log_entries,
|
||||
std::optional<pg_hit_set_history_t> &hset_history,
|
||||
Context *on_all_commit,
|
||||
@ -498,7 +498,7 @@ void ReplicatedBackend::submit_transaction(
|
||||
tid,
|
||||
reqid,
|
||||
trim_to,
|
||||
at_version,
|
||||
min_last_complete_ondisk,
|
||||
added.size() ? *(added.begin()) : hobject_t(),
|
||||
removed.size() ? *(removed.begin()) : hobject_t(),
|
||||
log_entries,
|
||||
@ -514,6 +514,7 @@ void ReplicatedBackend::submit_transaction(
|
||||
hset_history,
|
||||
trim_to,
|
||||
at_version,
|
||||
min_last_complete_ondisk,
|
||||
true,
|
||||
op_t);
|
||||
|
||||
@ -920,7 +921,7 @@ Message * ReplicatedBackend::generate_subop(
|
||||
ceph_tid_t tid,
|
||||
osd_reqid_t reqid,
|
||||
eversion_t pg_trim_to,
|
||||
eversion_t pg_roll_forward_to,
|
||||
eversion_t min_last_complete_ondisk,
|
||||
hobject_t new_temp_oid,
|
||||
hobject_t discard_temp_oid,
|
||||
const bufferlist &log_entries,
|
||||
@ -956,7 +957,14 @@ Message * ReplicatedBackend::generate_subop(
|
||||
wr->pg_stats = get_info().stats;
|
||||
|
||||
wr->pg_trim_to = pg_trim_to;
|
||||
wr->pg_roll_forward_to = pg_roll_forward_to;
|
||||
|
||||
if (HAVE_FEATURE(parent->min_peer_features(), OSD_REPOP_MLCOD)) {
|
||||
wr->min_last_complete_ondisk = min_last_complete_ondisk;
|
||||
} else {
|
||||
/* Some replicas need this field to be at_version. New replicas
|
||||
* will ignore it */
|
||||
wr->set_rollback_to(at_version);
|
||||
}
|
||||
|
||||
wr->new_temp_oid = new_temp_oid;
|
||||
wr->discard_temp_oid = discard_temp_oid;
|
||||
@ -970,7 +978,7 @@ void ReplicatedBackend::issue_op(
|
||||
ceph_tid_t tid,
|
||||
osd_reqid_t reqid,
|
||||
eversion_t pg_trim_to,
|
||||
eversion_t pg_roll_forward_to,
|
||||
eversion_t min_last_complete_ondisk,
|
||||
hobject_t new_temp_oid,
|
||||
hobject_t discard_temp_oid,
|
||||
const vector<pg_log_entry_t> &log_entries,
|
||||
@ -1003,7 +1011,7 @@ void ReplicatedBackend::issue_op(
|
||||
tid,
|
||||
reqid,
|
||||
pg_trim_to,
|
||||
pg_roll_forward_to,
|
||||
min_last_complete_ondisk,
|
||||
new_temp_oid,
|
||||
discard_temp_oid,
|
||||
logs,
|
||||
@ -1103,7 +1111,8 @@ void ReplicatedBackend::do_repop(OpRequestRef op)
|
||||
log,
|
||||
m->updated_hit_set_history,
|
||||
m->pg_trim_to,
|
||||
m->pg_roll_forward_to,
|
||||
m->version, /* Replicated PGs don't have rollback info */
|
||||
m->min_last_complete_ondisk,
|
||||
update_snaps,
|
||||
rm->localt,
|
||||
async);
|
||||
|
@ -365,7 +365,7 @@ public:
|
||||
const eversion_t &at_version,
|
||||
PGTransactionUPtr &&t,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &roll_forward_to,
|
||||
const eversion_t &min_last_complete_ondisk,
|
||||
const vector<pg_log_entry_t> &log_entries,
|
||||
std::optional<pg_hit_set_history_t> &hset_history,
|
||||
Context *on_all_commit,
|
||||
@ -381,7 +381,7 @@ private:
|
||||
ceph_tid_t tid,
|
||||
osd_reqid_t reqid,
|
||||
eversion_t pg_trim_to,
|
||||
eversion_t pg_roll_forward_to,
|
||||
eversion_t min_last_complete_ondisk,
|
||||
hobject_t new_temp_oid,
|
||||
hobject_t discard_temp_oid,
|
||||
const bufferlist &log_entries,
|
||||
@ -395,7 +395,7 @@ private:
|
||||
ceph_tid_t tid,
|
||||
osd_reqid_t reqid,
|
||||
eversion_t pg_trim_to,
|
||||
eversion_t pg_roll_forward_to,
|
||||
eversion_t min_last_complete_ondisk,
|
||||
hobject_t new_temp_oid,
|
||||
hobject_t discard_temp_oid,
|
||||
const vector<pg_log_entry_t> &log_entries,
|
||||
|
@ -1204,6 +1204,7 @@ public:
|
||||
ObjectDesc old_value;
|
||||
int snap;
|
||||
bool balance_reads;
|
||||
bool localize_reads;
|
||||
|
||||
std::shared_ptr<int> in_use;
|
||||
|
||||
@ -1230,12 +1231,14 @@ public:
|
||||
RadosTestContext *context,
|
||||
const string &oid,
|
||||
bool balance_reads,
|
||||
bool localize_reads,
|
||||
TestOpStat *stat = 0)
|
||||
: TestOp(n, context, stat),
|
||||
completions(3),
|
||||
oid(oid),
|
||||
snap(0),
|
||||
balance_reads(balance_reads),
|
||||
localize_reads(localize_reads),
|
||||
results(3),
|
||||
retvals(3),
|
||||
extent_results(3),
|
||||
@ -1339,6 +1342,8 @@ public:
|
||||
unsigned flags = 0;
|
||||
if (balance_reads)
|
||||
flags |= librados::OPERATION_BALANCE_READS;
|
||||
if (localize_reads)
|
||||
flags |= librados::OPERATION_LOCALIZE_READS;
|
||||
|
||||
ceph_assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[0], &op,
|
||||
flags, NULL));
|
||||
@ -1985,6 +1990,7 @@ public:
|
||||
ObjectDesc tgt_value;
|
||||
int snap;
|
||||
bool balance_reads;
|
||||
bool localize_reads;
|
||||
|
||||
std::shared_ptr<int> in_use;
|
||||
|
||||
@ -2006,12 +2012,14 @@ public:
|
||||
const string &oid,
|
||||
const string &tgt_pool_name,
|
||||
bool balance_reads,
|
||||
bool localize_reads,
|
||||
TestOpStat *stat = 0)
|
||||
: TestOp(n, context, stat),
|
||||
completions(2),
|
||||
oid(oid),
|
||||
snap(0),
|
||||
balance_reads(balance_reads),
|
||||
localize_reads(localize_reads),
|
||||
results(2),
|
||||
retvals(2),
|
||||
waiting_on(0),
|
||||
@ -2106,6 +2114,8 @@ public:
|
||||
unsigned flags = 0;
|
||||
if (balance_reads)
|
||||
flags |= librados::OPERATION_BALANCE_READS;
|
||||
if (localize_reads)
|
||||
flags |= librados::OPERATION_LOCALIZE_READS;
|
||||
|
||||
ceph_assert(!context->io_ctx.aio_operate(context->prefix+oid, completions[0], &op,
|
||||
flags, NULL));
|
||||
|
@ -29,6 +29,7 @@ public:
|
||||
int max_seconds,
|
||||
bool ec_pool,
|
||||
bool balance_reads,
|
||||
bool localize_reads,
|
||||
bool set_redirect,
|
||||
bool set_chunk,
|
||||
bool enable_dedup) :
|
||||
@ -37,6 +38,7 @@ public:
|
||||
m_total_weight(0),
|
||||
m_ec_pool(ec_pool),
|
||||
m_balance_reads(balance_reads),
|
||||
m_localize_reads(localize_reads),
|
||||
m_set_redirect(set_redirect),
|
||||
m_set_chunk(set_chunk),
|
||||
m_enable_dedup(enable_dedup)
|
||||
@ -282,7 +284,8 @@ private:
|
||||
switch (type) {
|
||||
case TEST_OP_READ:
|
||||
oid = *(rand_choose(context.oid_not_in_use));
|
||||
return new ReadOp(m_op, &context, oid, m_balance_reads, m_stats);
|
||||
return new ReadOp(m_op, &context, oid, m_balance_reads, m_localize_reads,
|
||||
m_stats);
|
||||
|
||||
case TEST_OP_WRITE:
|
||||
oid = *(rand_choose(context.oid_not_in_use));
|
||||
@ -455,6 +458,7 @@ private:
|
||||
unsigned int m_total_weight;
|
||||
bool m_ec_pool;
|
||||
bool m_balance_reads;
|
||||
bool m_localize_reads;
|
||||
bool m_set_redirect;
|
||||
bool m_set_chunk;
|
||||
bool m_enable_dedup;
|
||||
@ -511,6 +515,7 @@ int main(int argc, char **argv)
|
||||
bool no_omap = false;
|
||||
bool no_sparse = false;
|
||||
bool balance_reads = false;
|
||||
bool localize_reads = false;
|
||||
bool set_redirect = false;
|
||||
bool set_chunk = false;
|
||||
bool enable_dedup = false;
|
||||
@ -536,8 +541,10 @@ int main(int argc, char **argv)
|
||||
no_omap = true;
|
||||
else if (strcmp(argv[i], "--no-sparse") == 0)
|
||||
no_sparse = true;
|
||||
else if (strcmp(argv[i], "--balance_reads") == 0)
|
||||
else if (strcmp(argv[i], "--balance-reads") == 0)
|
||||
balance_reads = true;
|
||||
else if (strcmp(argv[i], "--localize-reads") == 0)
|
||||
localize_reads = true;
|
||||
else if (strcmp(argv[i], "--pool-snaps") == 0)
|
||||
pool_snaps = true;
|
||||
else if (strcmp(argv[i], "--write-fadvise-dontneed") == 0)
|
||||
@ -665,7 +672,8 @@ int main(int argc, char **argv)
|
||||
WeightedTestGenerator gen = WeightedTestGenerator(
|
||||
ops, objects,
|
||||
op_weights, &stats, max_seconds,
|
||||
ec_pool, balance_reads, set_redirect, set_chunk, enable_dedup);
|
||||
ec_pool, balance_reads, localize_reads,
|
||||
set_redirect, set_chunk, enable_dedup);
|
||||
int r = context.init();
|
||||
if (r < 0) {
|
||||
cerr << "Error initializing rados test context: "
|
||||
|
Loading…
Reference in New Issue
Block a user