
Merge branch 'master' of github.com:NewDreamNetwork/ceph

Greg Farnum 2011-10-10 10:55:10 -07:00
commit 968b09095d
20 changed files with 337 additions and 113 deletions


@@ -21,10 +21,16 @@ Group: System Environment/Base
URL: http://ceph.newdream.net/
Source: http://ceph.newdream.net/download/%{name}-%{version}.tar.gz
BuildRequires: fuse-devel, libtool, libtool-ltdl-devel, boost-devel,
BuildRequires: gcc-c++,
BuildRequires: fuse-devel, libtool, boost-devel,
%if %{defined suse_version}
BuildRequires: mozilla-nss-devel, libatomic-ops-devel, keyutils-devel, libtool,
%else
BuildRequires: nss-devel, libatomic_ops-devel, keyutils-libs-devel,
BuildRequires: libtool-ltdl-devel,
%endif
BuildRequires: libedit-devel, fuse-devel, git, perl, gdbm,
BuildRequires: nss-devel, libatomic_ops-devel
BuildRequires: pkgconfig, python, keyutils-libs-devel
BuildRequires: pkgconfig, python
%if %{with tcmalloc}
# use isa so this will not be satisfied by
# google-perftools-devel.i686 on a x86_64 box
@@ -32,8 +38,14 @@ BuildRequires: pkgconfig, python, keyutils-libs-devel
BuildRequires: google-perftools-devel%{?_isa}
%endif
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
Requires(post): chkconfig, binutils, libedit
%if %{defined suse_version}
Requires(post): aaa_base
Requires(preun): aaa_base
%else
Requires(post): chkconfig
Requires(preun): chkconfig
%endif
Requires(post): binutils, libedit
Requires(preun): initscripts
%description
@@ -116,6 +128,7 @@ MY_CONF_OPT="$MY_CONF_OPT --without-gtk2"
# default differs from what's needed for rpm
%{configure} --prefix=/usr --sbindir=/sbin \
--localstatedir=/var --sysconfdir=/etc \
--docdir=%{_docdir}/ceph \
--without-hadoop $MY_CONF_OPT \
%{?with_tcmalloc:--with-tcmalloc} %{!?with_tcmalloc:--without-tcmalloc}
@@ -127,8 +140,6 @@ make install DESTDIR=$RPM_BUILD_ROOT
find $RPM_BUILD_ROOT -type f -name "*.la" -exec rm -f {} ';'
find $RPM_BUILD_ROOT -type f -name "*.a" -exec rm -f {} ';'
install -D src/init-ceph $RPM_BUILD_ROOT%{_initrddir}/ceph
install -D src/init-radosgw $RPM_BUILD_ROOT%{_initrddir}/radosgw
chmod 0644 $RPM_BUILD_ROOT%{_docdir}/ceph/sample.ceph.conf
install -m 0644 -D src/logrotate.conf $RPM_BUILD_ROOT%{_sysconfdir}/logrotate.d/ceph
mkdir -p $RPM_BUILD_ROOT%{_localstatedir}/lib/ceph/tmp/
mkdir -p $RPM_BUILD_ROOT%{_localstatedir}/log/ceph/
@@ -156,7 +167,7 @@ fi
%files
%defattr(-,root,root,-)
%doc README COPYING
%doc README COPYING src/sample.ceph.conf src/sample.fetch_config
%{_bindir}/ceph
%{_bindir}/cephfs
%{_bindir}/ceph-conf
@@ -186,8 +197,6 @@ fi
/sbin/mkcephfs
/sbin/mount.ceph
%{_libdir}/ceph
%{_docdir}/ceph/sample.ceph.conf
%{_docdir}/ceph/sample.fetch_config
%{_sysconfdir}/bash_completion.d/ceph
%{_sysconfdir}/bash_completion.d/rados
%{_sysconfdir}/bash_completion.d/radosgw-admin


@@ -724,7 +724,9 @@ EXTRA_DIST = $(srcdir)/verify-mds-journal.sh $(srcdir)/vstart.sh $(srcdir)/stop.
$(ceph_tool_gui_DATA)
# work around old versions of automake that don't define $docdir
docdir = ${datadir}/doc/ceph
# NOTE: this won't work on suse, where docdir is /usr/share/doc/packages/$package.
docdir ?= ${datadir}/doc/ceph
doc_DATA = $(srcdir)/sample.ceph.conf
doc_SCRIPTS = sample.fetch_config


@@ -19,30 +19,33 @@
#include "AuthSupported.h"
#include "common/Mutex.h"
static bool _initialized = false;
static Mutex _lock("auth_service_handler_init");
static map<int, AuthAuthorizeHandler *> authorizers;
static void _init_authorizers(CephContext *cct)
AuthAuthorizeHandler *AuthAuthorizeHandlerRegistry::get_handler(int protocol)
{
if (is_supported_auth(CEPH_AUTH_NONE, cct)) {
authorizers[CEPH_AUTH_NONE] = new AuthNoneAuthorizeHandler();
if (!is_supported_auth(protocol, cct)) {
return NULL;
}
if (is_supported_auth(CEPH_AUTH_CEPHX, cct)) {
authorizers[CEPH_AUTH_CEPHX] = new CephxAuthorizeHandler();
}
_initialized = true;
}
AuthAuthorizeHandler *get_authorize_handler(int protocol, CephContext *cct)
{
Mutex::Locker l(_lock);
if (!_initialized) {
_init_authorizers(cct);
}
map<int, AuthAuthorizeHandler *>::iterator iter = authorizers.find(protocol);
if (iter != authorizers.end())
Mutex::Locker l(m_lock);
map<int,AuthAuthorizeHandler*>::iterator iter = m_authorizers.find(protocol);
if (iter != m_authorizers.end())
return iter->second;
switch (protocol) {
case CEPH_AUTH_NONE:
m_authorizers[protocol] = new AuthNoneAuthorizeHandler();
return m_authorizers[protocol];
case CEPH_AUTH_CEPHX:
m_authorizers[protocol] = new CephxAuthorizeHandler();
return m_authorizers[protocol];
}
return NULL;
}
AuthAuthorizeHandlerRegistry::~AuthAuthorizeHandlerRegistry()
{
for (map<int,AuthAuthorizeHandler*>::iterator iter = m_authorizers.begin();
iter != m_authorizers.end();
++iter)
delete iter->second;
}


@@ -30,6 +30,18 @@ struct AuthAuthorizeHandler {
AuthCapsInfo& caps_info, uint64_t *auid = NULL) = 0;
};
extern AuthAuthorizeHandler *get_authorize_handler(int protocol, CephContext *cct);
class AuthAuthorizeHandlerRegistry {
Mutex m_lock;
map<int,AuthAuthorizeHandler*> m_authorizers;
CephContext *cct;
public:
AuthAuthorizeHandlerRegistry(CephContext *cct_)
: m_lock("AuthAuthorizeHandlerRegistry::m_lock"), cct(cct_)
{}
~AuthAuthorizeHandlerRegistry();
AuthAuthorizeHandler *get_handler(int protocol);
};
#endif
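
The registry declared here replaces the file-scope `_lock`, `_initialized`, and `authorizers` map from the old implementation: each daemon owns one registry, which builds a handler the first time a protocol is requested (under its own mutex) and frees all of them in its destructor. A minimal self-contained sketch of that lazy-construction pattern, using stand-in types, bare protocol ids, and std::mutex rather than the actual Ceph classes and constants:

#include <cstdio>
#include <map>
#include <memory>
#include <mutex>

struct Handler { virtual ~Handler() = default; };   // stand-in for AuthAuthorizeHandler
struct NoneHandler  : Handler {};                    // stand-in for AuthNoneAuthorizeHandler
struct CephxHandler : Handler {};                    // stand-in for CephxAuthorizeHandler

class HandlerRegistry {
  std::mutex m_lock;
  std::map<int, std::unique_ptr<Handler>> m_handlers;   // protocol id -> handler, built on demand
public:
  Handler *get_handler(int protocol) {
    std::lock_guard<std::mutex> l(m_lock);
    auto it = m_handlers.find(protocol);
    if (it != m_handlers.end())
      return it->second.get();                          // already constructed
    switch (protocol) {                                 // construct lazily on first request
    case 1: return (m_handlers[protocol] = std::make_unique<NoneHandler>()).get();
    case 2: return (m_handlers[protocol] = std::make_unique<CephxHandler>()).get();
    }
    return nullptr;                                     // unsupported protocol
  }
};                                                      // unique_ptr frees the handlers on destruction

int main() {
  HandlerRegistry reg;
  std::printf("same handler returned twice: %s\n",
              reg.get_handler(2) == reg.get_handler(2) ? "yes" : "no");
}

Tying the map to an object owned by each MDS/OSD (see the constructor and destructor changes further down) gives the handlers a clear lifetime instead of a never-freed global.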


@@ -415,7 +415,7 @@ reject:
if (collide && flocal < 3)
/* retry locally a few times */
retry_bucket = 1;
else if (flocal < in->size + orig_tries)
else if (flocal <= in->size + orig_tries)
/* exhaustive bucket search */
retry_bucket = 1;
else if (ftotal < 20)
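
Widening the bound from < to <= buys exactly one more local retry; with the exclusive bound the "exhaustive bucket search" stops one attempt short, so the linear scan can miss the last item in the bucket and return a wrong mapping (the off-by-one behind bug #1594, reproduced by the crushtool test added at the end of this commit). A tiny illustration of the boundary, with made-up values for in->size and orig_tries rather than CRUSH's real retry state:

#include <cstdio>

int main() {
  int size = 5, orig_tries = 3;                 // assumed example values only
  int with_lt = 0, with_le = 0;
  for (int flocal = 1; flocal <  size + orig_tries; ++flocal) ++with_lt;
  for (int flocal = 1; flocal <= size + orig_tries; ++flocal) ++with_le;
  // The inclusive bound allows exactly one additional pass, enough for a
  // linear search to reach the final candidate in the bucket.
  std::printf("local retries: %d with '<', %d with '<='\n", with_lt, with_le);
  return 0;
}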


@@ -833,6 +833,7 @@ int main(int argc, const char **argv)
int num_rep = 2;
int min_x = 0, max_x = 10000-1;
int min_rule = 0, max_rule = 1000;
int force = -1;
map<int, int> device_weight;
vector<const char *> empty_args; // we use -c, don't confuse the generic arg parsing
@@ -918,6 +919,11 @@
exit(EXIT_FAILURE);
}
max_x = min_x;
} else if (ceph_argparse_withint(args, i, &force, &err, "--force", (char*)NULL)) {
if (!err.str().empty()) {
cerr << err.str() << std::endl;
exit(EXIT_FAILURE);
}
} else if (ceph_argparse_withint(args, i, &max_rule, &err, "--max_rule", (char*)NULL)) {
if (!err.str().empty()) {
cerr << err.str() << std::endl;
@@ -943,6 +949,7 @@
if (i == args.end())
usage();
float f = atof(*i);
i = args.erase(i);
int w = (int)(f * 0x10000);
if (w < 0)
w = 0;
@@ -1222,7 +1229,7 @@ int main(int argc, const char **argv)
map<int,int> sizes;
for (int x = min_x; x <= max_x; x++) {
vector<int> out;
crush.do_rule(r, x, out, num_rep, -1, weight);
crush.do_rule(r, x, out, num_rep, force, weight);
if (verbose)
cout << "rule " << r << " x " << x << " " << out << std::endl;
for (unsigned i = 0; i < out.size(); i++)


@@ -1107,13 +1107,12 @@ struct CopyProgressCtx {
int do_copy_extent(uint64_t offset, size_t len, const char *buf, void *data)
{
CopyProgressCtx *cp = reinterpret_cast<CopyProgressCtx*>(data);
cp->prog_ctx.update_progress(offset, cp->src_size);
int ret = 0;
if (buf) {
int ret = write(cp->destictx, offset, len, buf);
if (ret) {
return ret;
}
ret = write(cp->destictx, offset, len, buf);
}
return cp->prog_ctx.update_progress(offset, cp->src_size);
return ret;
}
ProgressContext::~ProgressContext()
@@ -1153,11 +1152,11 @@ int copy(ImageCtx& ictx, IoCtx& dest_md_ctx, const char *destname,
{
CephContext *cct = dest_md_ctx.cct();
CopyProgressCtx cp(prog_ctx);
uint64_t src_size = ictx.get_image_size();
int64_t r;
int order = ictx.header.options.order;
int r = create(dest_md_ctx, destname, src_size, &order);
r = create(dest_md_ctx, destname, src_size, &order);
if (r < 0) {
lderr(cct) << "header creation failed" << dendl;
return r;


@@ -94,6 +94,7 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
Dispatcher(m->cct),
mds_lock("MDS::mds_lock"),
timer(m->cct, mds_lock),
authorize_handler_registry(new AuthAuthorizeHandlerRegistry(m->cct)),
name(n),
whoami(-1), incarnation(0),
standby_for_rank(MDSMap::MDS_NO_STANDBY_PREF),
@@ -155,6 +156,8 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
MDS::~MDS() {
Mutex::Locker lock(mds_lock);
delete authorize_handler_registry;
if (mdcache) { delete mdcache; mdcache = NULL; }
if (mdlog) { delete mdlog; mdlog = NULL; }
if (balancer) { delete balancer; balancer = NULL; }
@@ -2027,7 +2030,7 @@ bool MDS::ms_verify_authorizer(Connection *con, int peer_type,
Mutex::Locker l(mds_lock);
AuthAuthorizeHandler *authorize_handler =
get_authorize_handler(protocol, g_ceph_context);
authorize_handler_registry->get_handler(protocol);
if (!authorize_handler) {
dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
is_valid = false;


@@ -137,11 +137,15 @@ class AnchorClient;
class MDSTableServer;
class MDSTableClient;
class AuthAuthorizeHandlerRegistry;
class MDS : public Dispatcher {
public:
Mutex mds_lock;
SafeTimer timer;
AuthAuthorizeHandlerRegistry *authorize_handler_registry;
string name;
int whoami;
int incarnation;


@@ -222,6 +222,10 @@ public:
pipe->put();
pipe = p->get();
}
bool is_connected() {
Mutex::Locker l(lock);
return pipe != NULL;
}
int get_peer_type() { return peer_type; }
void set_peer_type(int t) { peer_type = t; }


@@ -514,6 +514,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
Dispatcher(external_messenger->cct),
osd_lock("OSD::osd_lock"),
timer(external_messenger->cct, osd_lock),
authorize_handler_registry(new AuthAuthorizeHandlerRegistry(external_messenger->cct)),
cluster_messenger(internal_messenger),
client_messenger(external_messenger),
monc(mc),
@@ -575,6 +576,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
OSD::~OSD()
{
delete authorize_handler_registry;
delete map_in_progress_cond;
delete class_handler;
g_ceph_context->GetPerfCountersCollection()->logger_remove(logger);
@@ -1890,6 +1892,7 @@ void OSD::complete_notify(void *_notif, void *_obc)
MWatchNotify *reply = notif->reply;
client_messenger->send_message(reply, notif->session->con);
notif->session->put();
notif->session->con->put();
watch->remove_notification(notif);
if (notif->timeout)
watch_timer.cancel_event(notif->timeout);
@@ -1910,15 +1913,19 @@ void OSD::ack_notification(entity_name_t& name, void *_notif, void *_obc, Replic
}
}
bool OSD::ms_handle_reset(Connection *con)
void OSD::handle_watch_timeout(void *obc,
ReplicatedPG *pg,
entity_name_t entity,
utime_t expire)
{
dout(0) << "OSD::ms_handle_reset()" << dendl;
OSD::Session *session = (OSD::Session *)con->get_priv();
if (!session)
return false;
dout(0) << "OSD::ms_handle_reset() s=" << (void *)session << dendl;
pg->lock();
pg->handle_watch_timeout(obc, entity, expire);
pg->unlock();
pg->put();
}
void OSD::disconnect_session_watches(Session *session)
{
// get any watched obc's
map<ReplicatedPG::ObjectContext *, pg_t> obcs;
watch_lock.Lock();
@@ -1932,6 +1939,8 @@ bool OSD::ms_handle_reset(Connection *con)
ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)oiter->first;
dout(0) << "obc=" << (void *)obc << dendl;
ReplicatedPG *pg = static_cast<ReplicatedPG *>(lookup_lock_raw_pg(oiter->second));
assert(pg);
obc->lock.Lock();
watch_lock.Lock();
/* NOTE! fix this one, should be able to just lookup entity name,
@@ -1946,10 +1955,11 @@ bool OSD::ms_handle_reset(Connection *con)
watch_info_t& w = obc->obs.oi.watchers[entity];
utime_t expire = ceph_clock_now(g_ceph_context);
expire += w.timeout_seconds;
obc->unconnected_watchers[entity] = expire;
pg->register_unconnected_watcher(obc, entity, expire);
dout(10) << " disconnected watch " << w << " by " << entity << " session " << session
<< ", expires " << expire << dendl;
obc->watchers.erase(witer++);
session->put();
}
if (witer == obc->watchers.end())
break;
@@ -1957,27 +1967,20 @@ bool OSD::ms_handle_reset(Connection *con)
}
watch_lock.Unlock();
obc->lock.Unlock();
pg->put_object_context(obc);
/* now drop a reference to that obc */
put_object_context(obc, oiter->second);
pg->unlock();
}
}
#if 0
// FIXME: do we really want to _cancel_ notifications here?
// shouldn't they time out in the usual way? because this person
// might/should immediately reconnect...
watch_lock.Lock();
for (map<void *, entity_name_t>::iterator notif_iter = session->notifs.begin();
notif_iter != session->notifs.end();
++notif_iter) {
Watch::Notification *notif = (Watch::Notification *)notif_iter->first;
entity_name_t& dest = notif_iter->second;
dout(0) << "ms_handle_reset: ack notification for notif=" << (void *)notif << " entity=" << dest << dendl;
ack_notification(dest, notif);
}
session->notifs.clear();
watch_lock.Unlock();
#endif
bool OSD::ms_handle_reset(Connection *con)
{
dout(0) << "OSD::ms_handle_reset()" << dendl;
OSD::Session *session = (OSD::Session *)con->get_priv();
if (!session)
return false;
dout(0) << "OSD::ms_handle_reset() s=" << (void *)session << dendl;
disconnect_session_watches(session);
session->put();
return true;
}
@@ -2595,8 +2598,7 @@ bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
bool& isvalid)
{
AuthAuthorizeHandler *authorize_handler =
get_authorize_handler(protocol, g_ceph_context);
AuthAuthorizeHandler *authorize_handler = authorize_handler_registry->get_handler(protocol);
if (!authorize_handler) {
dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
isvalid = false;
@@ -5005,6 +5007,13 @@ void OSD::handle_misdirected_op(PG *pg, MOSDOp *op)
void OSD::handle_op(MOSDOp *op)
{
if (!op->get_connection()->is_connected()) {
dout(10) << "handle_op sender " << op->get_connection()->get_peer_addr()
<< " not connected, dropping " << *op << dendl;
op->put();
return;
}
// require same or newer map
if (!require_same_or_newer_map(op, op->get_map_epoch()))
return;
@@ -5322,15 +5331,21 @@ void OSD::dequeue_op(PG *pg)
}
osd_lock.Unlock();
// do it
if (op->get_type() == CEPH_MSG_OSD_OP)
pg->do_op((MOSDOp*)op); // do it now
else if (op->get_type() == MSG_OSD_SUBOP)
pg->do_sub_op((MOSDSubOp*)op);
else if (op->get_type() == MSG_OSD_SUBOPREPLY)
pg->do_sub_op_reply((MOSDSubOpReply*)op);
else
assert(0);
if (!op->get_connection()->is_connected()) {
dout(10) << "dequeue_op sender " << op->get_connection()->get_peer_addr()
<< " not connected, dropping " << *op << dendl;
op->put();
} else {
// do it
if (op->get_type() == CEPH_MSG_OSD_OP)
pg->do_op((MOSDOp*)op); // do it now
else if (op->get_type() == MSG_OSD_SUBOP)
pg->do_sub_op((MOSDSubOp*)op);
else if (op->get_type() == MSG_OSD_SUBOPREPLY)
pg->do_sub_op_reply((MOSDSubOpReply*)op);
else
assert(0);
}
// unlock and put pg
pg->unlock();
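
Both guards matter because an op can sit in the queue for a while: handle_op() drops requests whose sender is already gone, and dequeue_op() re-checks in case the client disconnected between enqueue and dequeue. A small self-contained sketch of that check-twice pattern, with stand-in types rather than the Ceph Connection/MOSDOp classes (which refcount with get()/put() instead of delete):

#include <cstdio>
#include <deque>

struct Connection { bool connected = true; bool is_connected() const { return connected; } };
struct Op { Connection *con; int type; };

static std::deque<Op*> op_queue;

void handle_op(Op *op) {
  if (!op->con->is_connected()) { delete op; return; }  // sender already gone: drop before queueing
  op_queue.push_back(op);
}

void dequeue_op() {
  Op *op = op_queue.front();
  op_queue.pop_front();
  if (!op->con->is_connected()) { delete op; return; }  // sender vanished while the op was queued
  std::printf("processing op type %d\n", op->type);
  delete op;
}

int main() {
  Connection c;
  handle_op(new Op{&c, 42});
  c.connected = false;   // client disconnects while its op is still queued
  dequeue_op();          // the stale op is dropped instead of being executed
}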


@@ -112,9 +112,10 @@ class MOSDPGMissing;
class Watch;
class Notification;
class ObjectContext;
class ReplicatedPG;
class AuthAuthorizeHandlerRegistry;
extern const coll_t meta_coll;
class OSD : public Dispatcher {
@@ -123,6 +124,8 @@ protected:
Mutex osd_lock; // global lock
SafeTimer timer; // safe timer (osd_lock)
AuthAuthorizeHandlerRegistry *authorize_handler_registry;
Messenger *cluster_messenger;
Messenger *client_messenger;
MonClient *monc;
@@ -1034,6 +1037,11 @@ public:
Mutex watch_lock;
SafeTimer watch_timer;
void handle_notify_timeout(void *notif);
void disconnect_session_watches(Session *session);
void handle_watch_timeout(void *obc,
ReplicatedPG *pg,
entity_name_t entity,
utime_t expire);
};
//compatibility of the executable


@@ -1685,6 +1685,8 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
if (!is_replay()) {
osd->take_waiters(waiting_for_active);
}
on_activate();
}


@@ -1664,8 +1664,18 @@ public:
virtual void on_osd_failure(int osd) = 0;
virtual void on_role_change() = 0;
virtual void on_change() = 0;
virtual void on_activate() = 0;
virtual void on_shutdown() = 0;
virtual void remove_watchers_and_notifies() = 0;
virtual void register_unconnected_watcher(void *obc,
entity_name_t entity,
utime_t expire) = 0;
virtual void unregister_unconnected_watcher(void *obc,
entity_name_t entity) = 0;
virtual void handle_watch_timeout(void *obc,
entity_name_t entity,
utime_t expire) = 0;
};
//WRITE_CLASS_ENCODER(PG::Info::History)


@@ -1128,6 +1128,7 @@ void ReplicatedPG::remove_notify(ObjectContext *obc, Watch::Notification *notif)
assert(niter != obc->notifs.end());
niter->first->session->put();
niter->first->session->con->put();
obc->notifs.erase(niter);
put_object_context(obc);
@@ -1145,12 +1146,20 @@ void ReplicatedPG::remove_watchers_and_notifies()
) {
map<hobject_t, ObjectContext *>::iterator iter = oiter++;
ObjectContext *obc = iter->second;
obc->ref++;
for (map<entity_name_t, OSD::Session *>::iterator witer = obc->watchers.begin();
witer != obc->watchers.end();
remove_watcher(obc, (witer++)->first));
for (map<entity_name_t, Context *>::iterator iter = obc->unconnected_watchers.begin();
iter != obc->unconnected_watchers.end();
) {
map<entity_name_t, Context *>::iterator i = iter++;
unregister_unconnected_watcher(obc, i->first);
}
for (map<Watch::Notification *, bool>::iterator niter = obc->notifs.begin();
niter != obc->notifs.end();
remove_notify(obc, (niter++)->first));
put_object_context(obc);
}
osd->watch_lock.Unlock();
}
@@ -2420,10 +2429,12 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
session->get();
session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
}
map<entity_name_t, utime_t>::iterator un_iter = obc->unconnected_watchers.find(entity);
if (un_iter != obc->unconnected_watchers.end())
obc->unconnected_watchers.erase(un_iter);
map<entity_name_t, Context *>::iterator un_iter =
obc->unconnected_watchers.find(entity);
if (un_iter != obc->unconnected_watchers.end()) {
unregister_unconnected_watcher(obc, un_iter->first);
}
map<Watch::Notification *, bool>::iterator niter;
for (niter = obc->notifs.begin(); niter != obc->notifs.end(); ++niter) {
Watch::Notification *notif = niter->first;
@@ -2442,7 +2453,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
remove_watcher(obc, entity);
} else {
assert(obc->unconnected_watchers.count(entity));
obc->unconnected_watchers.erase(entity);
unregister_unconnected_watcher(obc, entity);
}
}
@@ -2454,34 +2465,32 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl);
session->get(); // notif got a reference
session->con->get();
notif->pgid = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
osd->watch->add_notification(notif);
// connected
for (map<entity_name_t, OSD::Session*>::iterator q = obc->watchers.begin();
q != obc->watchers.end();
q++) {
entity_name_t name = q->first;
OSD::Session *s = q->second;
watch_info_t& w = obc->obs.oi.watchers[q->first];
for (map<entity_name_t, watch_info_t>::iterator i = obc->obs.oi.watchers.begin();
i != obc->obs.oi.watchers.end();
++i) {
map<entity_name_t, OSD::Session*>::iterator q = obc->watchers.find(i->first);
if (q != obc->watchers.end()) {
entity_name_t name = q->first;
OSD::Session *s = q->second;
watch_info_t& w = obc->obs.oi.watchers[q->first];
notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
s->add_notif(notif, name);
notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
s->add_notif(notif, name);
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
osd->client_messenger->send_message(notify_msg, s->con);
}
// unconnected
utime_t now = ceph_clock_now(g_ceph_context);
for (map<entity_name_t, utime_t>::iterator q = obc->unconnected_watchers.begin();
q != obc->unconnected_watchers.end();
q++) {
entity_name_t name = q->first;
utime_t expire = q->second;
if (now < expire)
notif->add_watcher(name, Watch::WATCHER_PENDING); /* FIXME: should we remove expired unconnected? probably yes */
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
osd->client_messenger->send_message(notify_msg, s->con);
} else {
// unconnected
utime_t now = ceph_clock_now(g_ceph_context);
entity_name_t name = i->first;
notif->add_watcher(name, Watch::WATCHER_PENDING);
}
}
notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl);
@@ -3020,7 +3029,14 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
{
if (!is_active() || is_degraded_object(obc->obs.oi.soid) ||
is_missing_object(obc->obs.oi.soid))
return;
if (!obc->obs.oi.watchers.empty()) {
Mutex::Locker l(osd->watch_lock);
assert(obc->unconnected_watchers.size() == 0);
assert(obc->watchers.size() == 0);
// populate unconnected_watchers
utime_t now = ceph_clock_now(g_ceph_context);
for (map<entity_name_t, watch_info_t>::iterator p = obc->obs.oi.watchers.begin();
@@ -3029,11 +3045,89 @@ void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
utime_t expire = now;
expire += p->second.timeout_seconds;
dout(10) << " unconnected watcher " << p->first << " will expire " << expire << dendl;
obc->unconnected_watchers[p->first] = expire;
register_unconnected_watcher(obc, p->first, expire);
}
}
}
void ReplicatedPG::unregister_unconnected_watcher(void *_obc,
entity_name_t entity)
{
ObjectContext *obc = static_cast<ObjectContext *>(_obc);
osd->watch_timer.cancel_event(obc->unconnected_watchers[entity]);
obc->unconnected_watchers.erase(entity);
put_object_context(obc);
put();
}
void ReplicatedPG::register_unconnected_watcher(void *_obc,
entity_name_t entity,
utime_t expire)
{
ObjectContext *obc = static_cast<ObjectContext *>(_obc);
pg_t pgid = info.pgid;
pgid.set_ps(obc->obs.oi.soid.hash);
get();
obc->ref++;
Context *cb = new Watch::C_WatchTimeout(osd,
static_cast<void *>(obc),
this,
entity, expire);
osd->watch_timer.add_event_at(expire, cb);
obc->unconnected_watchers[entity] = cb;
}
void ReplicatedPG::handle_watch_timeout(void *_obc,
entity_name_t entity,
utime_t expire)
{
ObjectContext *obc = static_cast<ObjectContext *>(_obc);
obc->unconnected_watchers.erase(entity);
obc->obs.oi.watchers.erase(entity);
vector<OSDOp> ops;
tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, obc->ssc, this);
ctx->mtime = ceph_clock_now(g_ceph_context);
ctx->at_version.epoch = osd->osdmap->get_epoch();
ctx->at_version.version = log.head.version + 1;
entity_inst_t nobody;
/* Currently, mode.try_write always returns true. If this changes, we will
* need to delay the repop accordingly */
assert(mode.try_write(nobody));
RepGather *repop = new_repop(ctx, obc, rep_tid);
ObjectStore::Transaction *t = &ctx->op_t;
ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, obc->obs.oi.soid,
ctx->at_version,
obc->obs.oi.version,
osd_reqid_t(), ctx->mtime));
eversion_t old_last_update = log.head;
bool old_exists = repop->obc->obs.exists;
uint64_t old_size = repop->obc->obs.oi.size;
eversion_t old_version = repop->obc->obs.oi.version;
obc->obs.oi.prior_version = old_version;
obc->obs.oi.version = ctx->at_version;
bufferlist bl;
::encode(obc->obs.oi, bl);
t->setattr(coll, obc->obs.oi.soid, OI_ATTR, bl);
ctx->at_version.version++;
append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
// obc ref swallowed by repop!
issue_repop(repop, repop->ctx->mtime, old_last_update, old_exists,
old_size, old_version);
eval_repop(repop);
}
ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
const object_locator_t& oloc,
@@ -3954,6 +4048,11 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
osd->take_waiters(waiting_for_degraded_object[soid]);
waiting_for_degraded_object.erase(soid);
}
map<hobject_t, ObjectContext *>::iterator i =
object_contexts.find(soid);
if (i != object_contexts.end()) {
populate_obc_watchers(i->second);
}
} else {
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid].size() << " others" << dendl;
@@ -4291,8 +4390,6 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
obc->obs.exists = true;
obc->obs.oi.decode(oibl);
populate_obc_watchers(obc);
// suck in snapset context?
SnapSetContext *ssc = obc->ssc;
@@ -4446,6 +4543,15 @@ void ReplicatedPG::on_shutdown()
remove_watchers_and_notifies();
}
void ReplicatedPG::on_activate()
{
for (map<hobject_t, ObjectContext *>::iterator i = object_contexts.begin();
i != object_contexts.end();
++i) {
populate_obc_watchers(i->second);
}
}
void ReplicatedPG::on_change()
{
dout(10) << "on_change" << dendl;
@@ -4462,6 +4568,8 @@ void ReplicatedPG::on_change()
state_clear(PG_STATE_REPAIR);
}
context_registry_on_change();
// take object waiters
take_object_waiters(waiting_for_missing_object);
take_object_waiters(waiting_for_degraded_object);
@@ -4485,8 +4593,6 @@ void ReplicatedPG::on_role_change()
p++)
osd->take_waiters(p->second);
waiting_for_ondisk.clear();
context_registry_on_change();
}


@@ -273,7 +273,7 @@ public:
// any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
map<entity_name_t, OSD::Session *> watchers;
map<entity_name_t, utime_t> unconnected_watchers;
map<entity_name_t, Context *> unconnected_watchers;
map<Watch::Notification *, bool> notifs;
ObjectContext(const object_info_t &oi_, bool exists_, SnapSetContext *ssc_)
@@ -487,6 +487,14 @@ protected:
map<object_t, SnapSetContext*> snapset_contexts;
void populate_obc_watchers(ObjectContext *obc);
void register_unconnected_watcher(void *obc,
entity_name_t entity,
utime_t expire);
void unregister_unconnected_watcher(void *obc,
entity_name_t entity);
void handle_watch_timeout(void *obc,
entity_name_t entity,
utime_t expire);
ObjectContext *lookup_object_context(const hobject_t& soid) {
if (object_contexts.count(soid)) {
@@ -801,6 +809,7 @@ public:
void on_acker_change();
void on_role_change();
void on_change();
void on_activate();
void on_shutdown();
};


@@ -28,3 +28,9 @@ void Watch::C_NotifyTimeout::finish(int r)
osd->handle_notify_timeout(notif);
}
void Watch::C_WatchTimeout::finish(int r)
{
osd->handle_watch_timeout(obc, static_cast<ReplicatedPG *>(pg), entity,
expire);
}


@@ -60,6 +60,19 @@ public:
void finish(int r);
};
class C_WatchTimeout : public Context {
OSD *osd;
void *obc;
void *pg;
entity_name_t entity;
utime_t expire;
public:
C_WatchTimeout(OSD *_osd, void *_obc, void *_pg,
entity_name_t _entity, utime_t _expire) :
osd(_osd), obc(_obc), pg(_pg), entity(_entity), expire(_expire) {}
void finish(int r);
};
private:
std::map<uint64_t, Notification *> notifs; /* notif_id to notifications */
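
Taken together, these pieces move watcher expiry onto the OSD's watch_timer: register_unconnected_watcher() takes references on the PG and obc, schedules a C_WatchTimeout at the expiry time, and remembers the Context* so a reconnecting client can cancel it; when it fires, OSD::handle_watch_timeout() locks the PG and hands off to ReplicatedPG::handle_watch_timeout(), which drops the watcher and persists the object update. A much-simplified sketch of that register/cancel/fire life cycle, with stand-in types rather than the real SafeTimer, obc, and PG classes, and a hypothetical entity name:

#include <cstdio>
#include <functional>
#include <map>
#include <mutex>
#include <string>

struct Timer {                                   // stand-in for SafeTimer
  using Event = std::function<void()>;
  Event *add_event_at(double /*when*/, Event cb) { return new Event(std::move(cb)); }
  void cancel_event(Event *e) { delete e; }      // a real timer would also unschedule it
};

struct ObjectState {                             // stand-in for the object context (obc)
  std::map<std::string, Timer::Event*> unconnected_watchers;  // entity -> pending timeout
};

struct PlacementGroup {                          // stand-in for ReplicatedPG
  std::mutex lock;
  Timer watch_timer;

  void register_unconnected_watcher(ObjectState *obc, const std::string &entity, double expire) {
    Timer::Event *cb = watch_timer.add_event_at(expire,
        [this, obc, entity] { handle_watch_timeout(obc, entity); });
    obc->unconnected_watchers[entity] = cb;      // kept so a reconnect can cancel the event
  }

  void unregister_unconnected_watcher(ObjectState *obc, const std::string &entity) {
    watch_timer.cancel_event(obc->unconnected_watchers[entity]);
    obc->unconnected_watchers.erase(entity);
  }

  void handle_watch_timeout(ObjectState *obc, const std::string &entity) {
    std::lock_guard<std::mutex> l(lock);         // the real callback relocks the PG first
    obc->unconnected_watchers.erase(entity);     // the real code also erases the watcher from the
                                                 // object_info and persists that via a repop
    std::printf("watcher %s expired\n", entity.c_str());
  }
};

int main() {
  PlacementGroup pg;
  ObjectState obc;
  pg.register_unconnected_watcher(&obc, "client.4242", 30.0);   // hypothetical entity name
  pg.unregister_unconnected_watcher(&obc, "client.4242");       // client reconnected in time
}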


@@ -0,0 +1,12 @@
# This detects the incorrect mapping (due to off by one error in
# linear search) that caused #1594
$ crushtool -i "$TESTDIR/five-devices.crushmap" --test --x 3 --rule 2 --force 3 -v --weight 1 0 --weight 2 0 --weight 4 0
devices weights (hex): [10000,0,0,10000,0]
rule 2 (rbd), x = 3..3
rule 2 x 3 [3,0]
device 0:\t1 (esc)
device 1:\t0 (esc)
device 2:\t0 (esc)
device 3:\t1 (esc)
device 4:\t0 (esc)
result size 2x:\t1 (esc)

Binary file not shown: 368 B.