diff --git a/src/common/Mutex.cc b/src/common/Mutex.cc
index 064c3b30efc..cedba098128 100644
--- a/src/common/Mutex.cc
+++ b/src/common/Mutex.cc
@@ -43,7 +43,7 @@ Mutex::Mutex(const std::string &n, bool r, bool ld,
     pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
     pthread_mutex_init(&_m,&attr);
     pthread_mutexattr_destroy(&attr);
-    if (g_lockdep)
+    if (lockdep && g_lockdep)
       _register();
   }
   else if (lockdep) {
@@ -76,7 +76,7 @@ Mutex::~Mutex() {
     cct->get_perfcounters_collection()->remove(logger);
     delete logger;
   }
-  if (g_lockdep) {
+  if (lockdep && g_lockdep) {
     lockdep_unregister(id);
   }
 }
diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc
index a8a7c2114d8..e0e73c3bff0 100644
--- a/src/librbd/ImageWatcher.cc
+++ b/src/librbd/ImageWatcher.cc
@@ -75,7 +75,7 @@ bool ImageWatcher::is_lock_owner() const {
 }
 
 int ImageWatcher::register_watch() {
-  ldout(m_image_ctx.cct, 10) << "registering image watcher" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " registering image watcher" << dendl;
 
   RWLock::WLocker l(m_watch_lock);
   assert(m_watch_state == WATCH_STATE_UNREGISTERED);
@@ -91,7 +91,7 @@ int ImageWatcher::register_watch() {
 }
 
 int ImageWatcher::unregister_watch() {
-  ldout(m_image_ctx.cct, 10)  << "unregistering image watcher" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " unregistering image watcher" << dendl;
 
   {
     Mutex::Locker l(m_aio_request_lock);
@@ -152,25 +152,26 @@ int ImageWatcher::try_lock() {
                    iter->addr, sizeof(iter->addr)) == 0) &&
 	  (locker_handle == iter->cookie)) {
 	Mutex::Locker l(m_owner_client_id_lock);
-	m_owner_client_id = ClientId(iter->watcher_id, locker_handle);
+        set_owner_client_id(ClientId(iter->watcher_id, locker_handle));
 	return 0;
       }
     }
 
     if (m_image_ctx.blacklist_on_break_lock) {
-      ldout(m_image_ctx.cct, 1) << "blacklisting client: " << locker << "@"
-				<< locker_address << dendl;
+      ldout(m_image_ctx.cct, 1) << this << " blacklisting client: " << locker
+                                << "@" << locker_address << dendl;
       librados::Rados rados(m_image_ctx.md_ctx);
       r = rados.blacklist_add(locker_address,
 			      m_image_ctx.blacklist_expire_seconds);
       if (r < 0) {
-        lderr(m_image_ctx.cct) << "unable to blacklist client: "
+        lderr(m_image_ctx.cct) << this << " unable to blacklist client: "
 			       << cpp_strerror(r) << dendl;
         return r;
       }
     }
 
-    ldout(m_image_ctx.cct, 5) << "breaking exclusive lock: " << locker << dendl;
+    ldout(m_image_ctx.cct, 5) << this << " breaking exclusive lock: " << locker
+                              << dendl;
     r = rados::cls::lock::break_lock(&m_image_ctx.md_ctx,
                                      m_image_ctx.header_oid, RBD_LOCK_NAME,
                                      locker_cookie, locker);
@@ -189,7 +190,7 @@ void ImageWatcher::request_lock(
   {
     Mutex::Locker l(m_aio_request_lock);
     bool request_pending = !m_aio_requests.empty();
-    ldout(m_image_ctx.cct, 15) << "queuing aio request: " << c
+    ldout(m_image_ctx.cct, 15) << this << " queuing aio request: " << c
 			       << dendl;
 
     c->get();
@@ -201,7 +202,7 @@ void ImageWatcher::request_lock(
 
   RWLock::RLocker l(m_watch_lock);
   if (m_watch_state == WATCH_STATE_REGISTERED) {
-    ldout(m_image_ctx.cct, 10) << "requesting exclusive lock" << dendl;
+    ldout(m_image_ctx.cct, 15) << this << " requesting exclusive lock" << dendl;
 
     // run notify request in finisher to avoid blocking aio path
     FunctionContext *ctx = new FunctionContext(
@@ -227,17 +228,18 @@ bool ImageWatcher::try_request_lock() {
   m_image_ctx.owner_lock.get_read();
 
   if (r < 0) {
-    ldout(m_image_ctx.cct, 5) << "failed to acquire exclusive lock:"
+    ldout(m_image_ctx.cct, 5) << this << " failed to acquire exclusive lock:"
 			      << cpp_strerror(r) << dendl;
     return false;
   }
 
   if (is_lock_owner()) {
-    ldout(m_image_ctx.cct, 15) << "successfully acquired exclusive lock"
+    ldout(m_image_ctx.cct, 15) << this << " successfully acquired exclusive lock"
 			       << dendl;
   } else {
-    ldout(m_image_ctx.cct, 15) << "unable to acquire exclusive lock, retrying"
-			       << dendl;
+    ldout(m_image_ctx.cct, 15) << this
+                               << " unable to acquire exclusive lock, retrying"
+                               << dendl;
   }
   return is_lock_owner();
 }
@@ -257,34 +259,34 @@ int ImageWatcher::get_lock_owner_info(entity_name_t *locker, std::string *cookie
   }
 
   if (lockers.empty()) {
-    ldout(m_image_ctx.cct, 20) << "no lockers detected" << dendl;
+    ldout(m_image_ctx.cct, 20) << this << " no lockers detected" << dendl;
     return 0;
   }
 
   if (lock_tag != WATCHER_LOCK_TAG) {
-    ldout(m_image_ctx.cct, 5) << "locked by external mechanism: tag="
+    ldout(m_image_ctx.cct, 5) << this << " locked by external mechanism: tag="
 			      << lock_tag << dendl;
     return -EBUSY;
   }
 
   if (lock_type == LOCK_SHARED) {
-    ldout(m_image_ctx.cct, 5) << "shared lock type detected" << dendl;
+    ldout(m_image_ctx.cct, 5) << this << " shared lock type detected" << dendl;
     return -EBUSY;
   }
 
   std::map<rados::cls::lock::locker_id_t,
            rados::cls::lock::locker_info_t>::iterator iter = lockers.begin();
   if (!decode_lock_cookie(iter->first.cookie, handle)) {
-    ldout(m_image_ctx.cct, 5) << "locked by external mechanism: cookie="
-			      << iter->first.cookie << dendl;
+    ldout(m_image_ctx.cct, 5) << this << " locked by external mechanism: "
+                              << "cookie=" << iter->first.cookie << dendl;
     return -EBUSY;
   }
 
   *locker = iter->first.locker;
   *cookie = iter->first.cookie;
   *address = stringify(iter->second.addr);
-  ldout(m_image_ctx.cct, 10) << "retrieved exclusive locker: " << *locker
-			     << "@" << *address << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " retrieved exclusive locker: "
+                             << *locker << "@" << *address << dendl;
   return 0;
 }
 
@@ -297,13 +299,13 @@ int ImageWatcher::lock() {
     return r;
   }
 
-  ldout(m_image_ctx.cct, 10) << "acquired exclusive lock" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " acquired exclusive lock" << dendl;
   m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
 
   ClientId owner_client_id = get_client_id();
   {
     Mutex::Locker l(m_owner_client_id_lock);
-    m_owner_client_id = owner_client_id;
+    set_owner_client_id(owner_client_id);
   }
 
   if (m_image_ctx.object_map.enabled()) {
@@ -348,12 +350,12 @@ int ImageWatcher::unlock()
     return 0;
   }
 
-  ldout(m_image_ctx.cct, 10) << "releasing exclusive lock" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " releasing exclusive lock" << dendl;
   m_lock_owner_state = LOCK_OWNER_STATE_NOT_LOCKED;
   int r = rados::cls::lock::unlock(&m_image_ctx.md_ctx, m_image_ctx.header_oid,
 				   RBD_LOCK_NAME, encode_lock_cookie());
   if (r < 0 && r != -ENOENT) {
-    lderr(m_image_ctx.cct) << "failed to release exclusive lock: "
+    lderr(m_image_ctx.cct) << this << " failed to release exclusive lock: "
 			   << cpp_strerror(r) << dendl;
     return r;
   }
@@ -362,6 +364,9 @@ int ImageWatcher::unlock()
     m_image_ctx.object_map.unlock();
   }
 
+  Mutex::Locker l(m_owner_client_id_lock);
+  set_owner_client_id(ClientId());
+
   FunctionContext *ctx = new FunctionContext(
     boost::bind(&ImageWatcher::notify_released_lock, this));
   m_task_finisher->queue(TASK_CODE_RELEASED_LOCK, ctx);
@@ -371,26 +376,27 @@ int ImageWatcher::unlock()
 bool ImageWatcher::release_lock()
 {
   assert(m_image_ctx.owner_lock.is_wlocked());
-  ldout(m_image_ctx.cct, 10) << "releasing exclusive lock by request" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " releasing exclusive lock by request"
+                             << dendl;
   if (!is_lock_owner()) {
     return false;
   }
   prepare_unlock();
-
   m_image_ctx.owner_lock.put_write();
   m_image_ctx.cancel_async_requests();
   m_image_ctx.flush_async_operations();
-  m_image_ctx.owner_lock.get_write();
-
-  if (!is_lock_owner()) {
-    return false;
-  }
 
   {
-    RWLock::WLocker l2(m_image_ctx.md_lock);
+    RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+    RWLock::WLocker md_locker(m_image_ctx.md_lock);
     librbd::_flush(&m_image_ctx);
   }
 
+  m_image_ctx.owner_lock.get_write();
+  if (!is_lock_owner()) {
+    return false;
+  }
+
   unlock();
   return true;
 }
@@ -410,7 +416,7 @@ void ImageWatcher::schedule_async_progress(const AsyncRequestId &request,
 
 int ImageWatcher::notify_async_progress(const AsyncRequestId &request,
 					uint64_t offset, uint64_t total) {
-  ldout(m_image_ctx.cct, 20) << "remote async request progress: "
+  ldout(m_image_ctx.cct, 20) << this << " remote async request progress: "
 			     << request << " @ " << offset
 			     << "/" << total << dendl;
 
@@ -430,7 +436,7 @@ void ImageWatcher::schedule_async_complete(const AsyncRequestId &request,
 
 int ImageWatcher::notify_async_complete(const AsyncRequestId &request,
 					int r) {
-  ldout(m_image_ctx.cct, 20) << "remote async request finished: "
+  ldout(m_image_ctx.cct, 20) << this << " remote async request finished: "
 			     << request << " = " << r << dendl;
 
   bufferlist bl;
@@ -443,7 +449,7 @@ int ImageWatcher::notify_async_complete(const AsyncRequestId &request,
   int ret = m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl,
 				       NOTIFY_TIMEOUT, NULL);
   if (ret < 0) {
-    lderr(m_image_ctx.cct) << "failed to notify async complete: "
+    lderr(m_image_ctx.cct) << this << " failed to notify async complete: "
 			   << cpp_strerror(ret) << dendl;
     if (ret == -ETIMEDOUT) {
       schedule_async_complete(request, r);
@@ -541,6 +547,7 @@ bool ImageWatcher::decode_lock_cookie(const std::string &tag,
 }
 
 void ImageWatcher::schedule_retry_aio_requests(bool use_timer) {
+  m_task_finisher->cancel(TASK_CODE_REQUEST_LOCK);
   Context *ctx = new FunctionContext(boost::bind(
     &ImageWatcher::retry_aio_requests, this));
   if (use_timer) {
@@ -559,11 +566,12 @@ void ImageWatcher::retry_aio_requests() {
     lock_request_restarts.swap(m_aio_requests);
   }
 
-  ldout(m_image_ctx.cct, 15) << "retrying pending aio requests" << dendl;
+  ldout(m_image_ctx.cct, 15) << this << " retrying pending aio requests"
+                             << dendl;
   for (std::vector<AioRequest>::iterator iter = lock_request_restarts.begin();
        iter != lock_request_restarts.end(); ++iter) {
-    ldout(m_image_ctx.cct, 20) << "retrying aio request: " << iter->second
-			       << dendl;
+    ldout(m_image_ctx.cct, 20) << this << " retrying aio request: "
+                               << iter->second << dendl;
     iter->first(iter->second);
     iter->second->put();
   }
@@ -585,6 +593,13 @@ void ImageWatcher::cancel_async_requests() {
   m_async_requests.clear();
 }
 
+void ImageWatcher::set_owner_client_id(const WatchNotify::ClientId& client_id) {
+  assert(m_owner_client_id_lock.is_locked());
+  m_owner_client_id = client_id;
+  ldout(m_image_ctx.cct, 10) << this << " current lock owner: "
+                             << m_owner_client_id << dendl;
+}
+
 ClientId ImageWatcher::get_client_id() {
   RWLock::RLocker l(m_watch_lock);
   return ClientId(m_image_ctx.md_ctx.get_instance_id(), m_watch_handle);
@@ -596,14 +611,14 @@ void ImageWatcher::notify_release_lock() {
 }
 
 void ImageWatcher::notify_released_lock() {
-  ldout(m_image_ctx.cct, 10) << "notify released lock" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " notify released lock" << dendl;
   bufferlist bl;
   ::encode(NotifyMessage(ReleasedLockPayload(get_client_id())), bl);
   m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
 }
 
 void ImageWatcher::notify_request_lock() {
-  ldout(m_image_ctx.cct, 10) << "notify request lock" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " notify request lock" << dendl;
   m_task_finisher->cancel(TASK_CODE_RETRY_AIO_REQUESTS);
 
   m_image_ctx.owner_lock.get_read();
@@ -620,12 +635,20 @@ void ImageWatcher::notify_request_lock() {
   m_image_ctx.owner_lock.put_read();
 
   if (r == -ETIMEDOUT) {
-    ldout(m_image_ctx.cct, 5) << "timed out requesting lock: retrying" << dendl;
+    ldout(m_image_ctx.cct, 5) << this << "timed out requesting lock: retrying"
+                              << dendl;
     retry_aio_requests();
   } else if (r < 0) {
-    lderr(m_image_ctx.cct) << "error requesting lock: " << cpp_strerror(r)
-			   << dendl;
+    lderr(m_image_ctx.cct) << this << " error requesting lock: "
+                           << cpp_strerror(r) << dendl;
     schedule_retry_aio_requests(true);
+  } else {
+    // lock owner acked -- but resend if we don't see them release the lock
+    int retry_timeout = m_image_ctx.cct->_conf->client_notify_timeout;
+    FunctionContext *ctx = new FunctionContext(
+      boost::bind(&ImageWatcher::notify_request_lock, this));
+    m_task_finisher->add_event_after(TASK_CODE_REQUEST_LOCK,
+                                     retry_timeout, ctx);
   }
 }
 
@@ -640,7 +663,7 @@ int ImageWatcher::notify_lock_owner(bufferlist &bl) {
 				     &response_bl);
   m_image_ctx.owner_lock.get_read();
   if (r < 0 && r != -ETIMEDOUT) {
-    lderr(m_image_ctx.cct) << "lock owner notification failed: "
+    lderr(m_image_ctx.cct) << this << " lock owner notification failed: "
 			   << cpp_strerror(r) << dendl;
     return r;
   }
@@ -652,7 +675,7 @@ int ImageWatcher::notify_lock_owner(bufferlist &bl) {
       bufferlist::iterator iter = response_bl.begin();
       ::decode(responses, iter);
     } catch (const buffer::error &err) {
-      lderr(m_image_ctx.cct) << "failed to decode response" << dendl;
+      lderr(m_image_ctx.cct) << this << " failed to decode response" << dendl;
       return -EINVAL;
     }
   }
@@ -662,7 +685,8 @@ int ImageWatcher::notify_lock_owner(bufferlist &bl) {
   for (responses_t::iterator i = responses.begin(); i != responses.end(); ++i) {
     if (i->second.length() > 0) {
       if (lock_owner_responded) {
-	lderr(m_image_ctx.cct) << "duplicate lock owners detected" << dendl;
+	lderr(m_image_ctx.cct) << this << " duplicate lock owners detected"
+                               << dendl;
 	return -EIO;
       }
       lock_owner_responded = true;
@@ -671,7 +695,7 @@ int ImageWatcher::notify_lock_owner(bufferlist &bl) {
   }
 
   if (!lock_owner_responded) {
-    lderr(m_image_ctx.cct) << "no lock owners detected" << dendl;
+    lderr(m_image_ctx.cct) << this << " no lock owners detected" << dendl;
     return -ETIMEDOUT;
   }
 
@@ -703,7 +727,7 @@ void ImageWatcher::async_request_timed_out(const AsyncRequestId &id) {
   std::map<AsyncRequestId, AsyncRequest>::iterator it =
     m_async_requests.find(id);
   if (it != m_async_requests.end()) {
-    ldout(m_image_ctx.cct, 10) << "request timed-out: " << id << dendl;
+    ldout(m_image_ctx.cct, 10) << this << " request timed-out: " << id << dendl;
     it->second.first->complete(-ERESTART);
   }
 }
@@ -713,7 +737,8 @@ int ImageWatcher::notify_async_request(const AsyncRequestId &async_request_id,
 				       ProgressContext& prog_ctx) {
   assert(m_image_ctx.owner_lock.is_locked());
 
-  ldout(m_image_ctx.cct, 10) << "async request: " << async_request_id << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " async request: " << async_request_id
+                             << dendl;
 
   C_SaferCond ctx;
 
@@ -767,7 +792,7 @@ void ImageWatcher::cleanup_async_request(const AsyncRequestId& async_request_id,
 
 void ImageWatcher::handle_payload(const HeaderUpdatePayload &payload,
 				  bufferlist *out) {
-  ldout(m_image_ctx.cct, 10) << "image header updated" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " image header updated" << dendl;
 
   Mutex::Locker lictx(m_image_ctx.refresh_lock);
   ++m_image_ctx.refresh_seq;
@@ -776,14 +801,15 @@ void ImageWatcher::handle_payload(const HeaderUpdatePayload &payload,
 
 void ImageWatcher::handle_payload(const AcquiredLockPayload &payload,
                                   bufferlist *out) {
-  ldout(m_image_ctx.cct, 10) << "image exclusively locked announcement" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " image exclusively locked announcement"
+                             << dendl;
   if (payload.client_id.is_valid()) {
     Mutex::Locker l(m_owner_client_id_lock);
     if (payload.client_id == m_owner_client_id) {
       // we already know that the remote client is the owner
       return;
     }
-    m_owner_client_id = payload.client_id;
+    set_owner_client_id(payload.client_id);
   }
 
   RWLock::RLocker l(m_image_ctx.owner_lock);
@@ -795,13 +821,16 @@ void ImageWatcher::handle_payload(const AcquiredLockPayload &payload,
 
 void ImageWatcher::handle_payload(const ReleasedLockPayload &payload,
                                   bufferlist *out) {
-  ldout(m_image_ctx.cct, 10) << "exclusive lock released" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " exclusive lock released" << dendl;
   if (payload.client_id.is_valid()) {
     Mutex::Locker l(m_owner_client_id_lock);
     if (payload.client_id != m_owner_client_id) {
+      ldout(m_image_ctx.cct, 10) << this << " unexpected owner: "
+                                 << payload.client_id << " != "
+                                 << m_owner_client_id << dendl;
       return;
     }
-    m_owner_client_id = ClientId();
+    set_owner_client_id(ClientId());
   }
 
   RWLock::RLocker l(m_image_ctx.owner_lock);
@@ -813,7 +842,7 @@ void ImageWatcher::handle_payload(const ReleasedLockPayload &payload,
 
 void ImageWatcher::handle_payload(const RequestLockPayload &payload,
                                   bufferlist *out) {
-  ldout(m_image_ctx.cct, 10) << "exclusive lock requested" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " exclusive lock requested" << dendl;
   if (payload.client_id == get_client_id()) {
     return;
   }
@@ -828,10 +857,10 @@ void ImageWatcher::handle_payload(const RequestLockPayload &payload,
       if (!m_owner_client_id.is_valid()) {
 	return;
       }
-      m_owner_client_id = ClientId();
     }
 
-    ldout(m_image_ctx.cct, 10) << "queuing release of exclusive lock" << dendl;
+    ldout(m_image_ctx.cct, 10) << this << " queuing release of exclusive lock"
+                               << dendl;
     FunctionContext *ctx = new FunctionContext(
       boost::bind(&ImageWatcher::notify_release_lock, this));
     m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx);
@@ -844,7 +873,7 @@ void ImageWatcher::handle_payload(const AsyncProgressPayload &payload,
   std::map<AsyncRequestId, AsyncRequest>::iterator req_it =
     m_async_requests.find(payload.async_request_id);
   if (req_it != m_async_requests.end()) {
-    ldout(m_image_ctx.cct, 20) << "request progress: "
+    ldout(m_image_ctx.cct, 20) << this << " request progress: "
 			       << payload.async_request_id << " @ "
 			       << payload.offset << "/" << payload.total
 			       << dendl;
@@ -859,7 +888,7 @@ void ImageWatcher::handle_payload(const AsyncCompletePayload &payload,
   std::map<AsyncRequestId, AsyncRequest>::iterator req_it =
     m_async_requests.find(payload.async_request_id);
   if (req_it != m_async_requests.end()) {
-    ldout(m_image_ctx.cct, 10) << "request finished: "
+    ldout(m_image_ctx.cct, 10) << this << " request finished: "
                                << payload.async_request_id << "="
 			       << payload.result << dendl;
     req_it->second.first->complete(payload.result);
@@ -877,11 +906,11 @@ void ImageWatcher::handle_payload(const FlattenPayload &payload,
     int r = prepare_async_request(payload.async_request_id, &new_request,
                                   &ctx, &prog_ctx);
     if (new_request) {
-      ldout(m_image_ctx.cct, 10) << "remote flatten request: "
+      ldout(m_image_ctx.cct, 10) << this << " remote flatten request: "
 				 << payload.async_request_id << dendl;
       r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
       if (r < 0) {
-	lderr(m_image_ctx.cct) << "remove flatten request failed: "
+	lderr(m_image_ctx.cct) << this << " remove flatten request failed: "
 			       << cpp_strerror(r) << dendl;
         cleanup_async_request(payload.async_request_id, ctx);
       }
@@ -901,12 +930,12 @@ void ImageWatcher::handle_payload(const ResizePayload &payload,
     int r = prepare_async_request(payload.async_request_id, &new_request,
                                   &ctx, &prog_ctx);
     if (new_request) {
-      ldout(m_image_ctx.cct, 10) << "remote resize request: "
+      ldout(m_image_ctx.cct, 10) << this << " remote resize request: "
 				 << payload.async_request_id << " "
 				 << payload.size << dendl;
       r = librbd::async_resize(&m_image_ctx, ctx, payload.size, *prog_ctx);
       if (r < 0) {
-	lderr(m_image_ctx.cct) << "remove resize request failed: "
+	lderr(m_image_ctx.cct) << this << " remove resize request failed: "
 			       << cpp_strerror(r) << dendl;
         cleanup_async_request(payload.async_request_id, ctx);
       }
@@ -920,7 +949,7 @@ void ImageWatcher::handle_payload(const SnapCreatePayload &payload,
 				  bufferlist *out) {
   RWLock::RLocker l(m_image_ctx.owner_lock);
   if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
-    ldout(m_image_ctx.cct, 10) << "remote snap_create request: "
+    ldout(m_image_ctx.cct, 10) << this << " remote snap_create request: "
 			       << payload.snap_name << dendl;
     int r = librbd::snap_create_helper(&m_image_ctx, NULL,
                                        payload.snap_name.c_str());
@@ -933,7 +962,7 @@ void ImageWatcher::handle_payload(const SnapRemovePayload &payload,
 				  bufferlist *out) {
   RWLock::RLocker l(m_image_ctx.owner_lock);
   if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
-    ldout(m_image_ctx.cct, 10) << "remote snap_remove request: "
+    ldout(m_image_ctx.cct, 10) << this << " remote snap_remove request: "
 			       << payload.snap_name << dendl;
     int r = librbd::snap_remove_helper(&m_image_ctx, NULL,
                                        payload.snap_name.c_str());
@@ -952,11 +981,13 @@ void ImageWatcher::handle_payload(const RebuildObjectMapPayload& payload,
     int r = prepare_async_request(payload.async_request_id, &new_request,
                                   &ctx, &prog_ctx);
     if (new_request) {
-      ldout(m_image_ctx.cct, 10) << "remote rebuild object map request: "
+      ldout(m_image_ctx.cct, 10) << this
+                                 << " remote rebuild object map request: "
                                  << payload.async_request_id << dendl;
       r = librbd::async_rebuild_object_map(&m_image_ctx, ctx, *prog_ctx);
       if (r < 0) {
-        lderr(m_image_ctx.cct) << "remove rebuild object map request failed: "
+        lderr(m_image_ctx.cct) << this
+                               << " remove rebuild object map request failed: "
                                << cpp_strerror(r) << dendl;
         cleanup_async_request(payload.async_request_id, ctx);
       }
@@ -985,23 +1016,23 @@ void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
       bufferlist::iterator iter = bl.begin();
       ::decode(notify_message, iter);
     } catch (const buffer::error &err) {
-      lderr(m_image_ctx.cct) << "error decoding image notification: "
+      lderr(m_image_ctx.cct) << this << " error decoding image notification: "
 			     << err.what() << dendl;
       return;
     }
   }
 
   apply_visitor(HandlePayloadVisitor(this, notify_id, handle),
-		notify_message.payload); 
+		notify_message.payload);
 }
 
 void ImageWatcher::handle_error(uint64_t handle, int err) {
-  lderr(m_image_ctx.cct) << "image watch failed: " << handle << ", "
+  lderr(m_image_ctx.cct) << this << " image watch failed: " << handle << ", "
                          << cpp_strerror(err) << dendl;
 
   {
     Mutex::Locker l(m_owner_client_id_lock);
-    m_owner_client_id = ClientId();
+    set_owner_client_id(ClientId());
   }
 
   RWLock::WLocker l(m_watch_lock);
@@ -1021,7 +1052,7 @@ void ImageWatcher::acknowledge_notify(uint64_t notify_id, uint64_t handle,
 }
 
 void ImageWatcher::reregister_watch() {
-  ldout(m_image_ctx.cct, 10) << "re-registering image watch" << dendl;
+  ldout(m_image_ctx.cct, 10) << this << " re-registering image watch" << dendl;
 
   {
     RWLock::WLocker l(m_image_ctx.owner_lock);
@@ -1041,7 +1072,7 @@ void ImageWatcher::reregister_watch() {
       r = m_image_ctx.md_ctx.watch2(m_image_ctx.header_oid,
                                     &m_watch_handle, &m_watch_ctx);
       if (r < 0) {
-        lderr(m_image_ctx.cct) << "failed to re-register image watch: "
+        lderr(m_image_ctx.cct) << this << " failed to re-register image watch: "
                                << cpp_strerror(r) << dendl;
 	if (r != -ESHUTDOWN) {
 	  FunctionContext *ctx = new FunctionContext(boost::bind(
@@ -1059,10 +1090,11 @@ void ImageWatcher::reregister_watch() {
     if (was_lock_owner) {
       r = try_lock();
       if (r == -EBUSY) {
-        ldout(m_image_ctx.cct, 5) << "lost image lock while re-registering "
-                                  << "image watch" << dendl;
+        ldout(m_image_ctx.cct, 5) << this << "lost image lock while "
+                                  << "re-registering image watch" << dendl;
       } else if (r < 0) {
-        lderr(m_image_ctx.cct) << "failed to lock image while re-registering "
+        lderr(m_image_ctx.cct) << this
+                               << "failed to lock image while re-registering "
                                << "image watch" << cpp_strerror(r) << dendl;
       }
     }
diff --git a/src/librbd/ImageWatcher.h b/src/librbd/ImageWatcher.h
index 73538148f29..6ebeb9dfcb8 100644
--- a/src/librbd/ImageWatcher.h
+++ b/src/librbd/ImageWatcher.h
@@ -222,6 +222,7 @@ namespace librbd {
     void schedule_cancel_async_requests();
     void cancel_async_requests();
 
+    void set_owner_client_id(const WatchNotify::ClientId &client_id);
     WatchNotify::ClientId get_client_id();
 
     void notify_release_lock();
diff --git a/src/librbd/WatchNotifyTypes.cc b/src/librbd/WatchNotifyTypes.cc
index 7532816ca38..6785864d842 100644
--- a/src/librbd/WatchNotifyTypes.cc
+++ b/src/librbd/WatchNotifyTypes.cc
@@ -423,6 +423,12 @@ std::ostream &operator<<(std::ostream &out,
   return out;
 }
 
+std::ostream &operator<<(std::ostream &out,
+                         const librbd::WatchNotify::ClientId &client_id) {
+  out << "[" << client_id.gid << "," << client_id.handle << "]";
+  return out;
+}
+
 std::ostream &operator<<(std::ostream &out,
                          const librbd::WatchNotify::AsyncRequestId &request) {
   out << "[" << request.client_id.gid << "," << request.client_id.handle << ","
diff --git a/src/librbd/WatchNotifyTypes.h b/src/librbd/WatchNotifyTypes.h
index 1ded0bacd37..94a2836736a 100644
--- a/src/librbd/WatchNotifyTypes.h
+++ b/src/librbd/WatchNotifyTypes.h
@@ -259,6 +259,8 @@ struct ResponseMessage {
 
 std::ostream &operator<<(std::ostream &out,
                          const librbd::WatchNotify::NotifyOp &op);
+std::ostream &operator<<(std::ostream &out,
+                         const librbd::WatchNotify::ClientId &client);
 std::ostream &operator<<(std::ostream &out,
                          const librbd::WatchNotify::AsyncRequestId &request);
 
diff --git a/src/test/librados_test_stub/TestIoCtxImpl.cc b/src/test/librados_test_stub/TestIoCtxImpl.cc
index 39cd8b4a85d..1611a60fa21 100644
--- a/src/test/librados_test_stub/TestIoCtxImpl.cc
+++ b/src/test/librados_test_stub/TestIoCtxImpl.cc
@@ -247,7 +247,8 @@ int TestIoCtxImpl::unwatch(uint64_t handle) {
 
 int TestIoCtxImpl::watch(const std::string& o, uint64_t *handle,
                          librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) {
-  return m_client->get_watch_notify().watch(o, handle, ctx, ctx2);
+  return m_client->get_watch_notify().watch(o, get_instance_id(), handle, ctx,
+                                            ctx2);
 }
 
 int TestIoCtxImpl::execute_aio_operations(const std::string& oid,
diff --git a/src/test/librados_test_stub/TestWatchNotify.cc b/src/test/librados_test_stub/TestWatchNotify.cc
index 094bcb5b405..14a43bc58cb 100644
--- a/src/test/librados_test_stub/TestWatchNotify.cc
+++ b/src/test/librados_test_stub/TestWatchNotify.cc
@@ -50,7 +50,7 @@ int TestWatchNotify::list_watchers(const std::string& o,
        it != watcher->watch_handles.end(); ++it) {
     obj_watch_t obj;
     strcpy(obj.addr, ":/0");
-    obj.watcher_id = static_cast<int64_t>(it->second.handle);
+    obj.watcher_id = static_cast<int64_t>(it->second.instance_id);
     obj.cookie = it->second.handle;
     obj.timeout_seconds = 30;
     out_watchers->push_back(obj);
@@ -113,12 +113,14 @@ void TestWatchNotify::notify_ack(const std::string& o, uint64_t notify_id,
   notify_handle->cond.Signal();
 }
 
-int TestWatchNotify::watch(const std::string& o, uint64_t *handle,
-                           librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) {
+int TestWatchNotify::watch(const std::string& o, uint64_t instance_id,
+                           uint64_t *handle, librados::WatchCtx *ctx,
+                           librados::WatchCtx2 *ctx2) {
   SharedWatcher watcher = get_watcher(o);
 
   RWLock::WLocker l(watcher->lock);
   WatchHandle watch_handle;
+  watch_handle.instance_id = instance_id;
   watch_handle.handle = ++m_handle;
   watch_handle.watch_ctx = ctx;
   watch_handle.watch_ctx2 = ctx2;
diff --git a/src/test/librados_test_stub/TestWatchNotify.h b/src/test/librados_test_stub/TestWatchNotify.h
index 4912ef27444..1761302bbf3 100644
--- a/src/test/librados_test_stub/TestWatchNotify.h
+++ b/src/test/librados_test_stub/TestWatchNotify.h
@@ -35,6 +35,7 @@ public:
   typedef std::map<uint64_t, SharedNotifyHandle> NotifyHandles;
 
   struct WatchHandle {
+    uint64_t instance_id;
     uint64_t handle;
     librados::WatchCtx* watch_ctx;
     librados::WatchCtx2* watch_ctx2;
@@ -60,7 +61,7 @@ public:
              uint64_t timeout_ms, bufferlist *pbl);
   void notify_ack(const std::string& o, uint64_t notify_id,
                   uint64_t handle, uint64_t gid, bufferlist& bl);
-  int watch(const std::string& o, uint64_t *handle,
+  int watch(const std::string& o, uint64_t instance_id, uint64_t *handle,
             librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2);
   int unwatch(uint64_t handle);
 
diff --git a/src/test/librbd/test_ImageWatcher.cc b/src/test/librbd/test_ImageWatcher.cc
index da84093f612..3046a3d07ec 100644
--- a/src/test/librbd/test_ImageWatcher.cc
+++ b/src/test/librbd/test_ImageWatcher.cc
@@ -192,7 +192,8 @@ public:
     Mutex::Locker l(m_callback_lock);
     int r = 0;
     while (!m_aio_completions.empty() &&
-	   m_aio_completion_restarts < m_expected_aio_restarts) {
+           (m_expected_aio_restarts == 0 ||
+	    m_aio_completion_restarts < m_expected_aio_restarts)) {
       r = m_callback_cond.WaitInterval(ictx.cct, m_callback_lock,
 				       utime_t(10, 0));
       if (r != 0) {
@@ -601,6 +602,7 @@ TEST_F(TestImageWatcher, RequestLockTimedOut) {
   m_notify_acks = boost::assign::list_of(
     std::make_pair(NOTIFY_OP_REQUEST_LOCK, bufferlist()));
 
+  m_expected_aio_restarts = 1;
   {
     RWLock::WLocker l(ictx->owner_lock);
     ictx->image_watcher->request_lock(
@@ -616,6 +618,45 @@ TEST_F(TestImageWatcher, RequestLockTimedOut) {
   ASSERT_TRUE(wait_for_aio_completions(*ictx));
 }
 
+TEST_F(TestImageWatcher, RequestLockIgnored) {
+  REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+  ASSERT_EQ(0, register_image_watch(*ictx));
+  ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
+			  "auto " + stringify(m_watch_ctx->get_handle())));
+
+  m_notify_acks = boost::assign::list_of(
+    std::make_pair(NOTIFY_OP_REQUEST_LOCK, create_response_message(0)));
+
+  int orig_notify_timeout = ictx->cct->_conf->client_notify_timeout;
+  ictx->cct->_conf->set_val("client_notify_timeout", "0");
+  BOOST_SCOPE_EXIT( (ictx)(orig_notify_timeout) ) {
+    ictx->cct->_conf->set_val("client_notify_timeout",
+                              stringify(orig_notify_timeout));
+  } BOOST_SCOPE_EXIT_END;
+
+  {
+    RWLock::WLocker l(ictx->owner_lock);
+    ictx->image_watcher->request_lock(
+      boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
+      create_aio_completion(*ictx));
+  }
+
+  ASSERT_TRUE(wait_for_notifies(*ictx));
+  NotifyOps expected_notify_ops;
+  expected_notify_ops += NOTIFY_OP_REQUEST_LOCK;
+  ASSERT_EQ(expected_notify_ops, m_notifies);
+
+  // after the request times out -- it will be resent
+  ASSERT_TRUE(wait_for_notifies(*ictx));
+  ASSERT_EQ(expected_notify_ops, m_notifies);
+
+  ASSERT_EQ(0, unlock_image());
+  ASSERT_TRUE(wait_for_aio_completions(*ictx));
+}
+
 TEST_F(TestImageWatcher, RequestLockTryLockRace) {
   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
 
@@ -628,6 +669,7 @@ TEST_F(TestImageWatcher, RequestLockTryLockRace) {
   m_notify_acks = boost::assign::list_of(
     std::make_pair(NOTIFY_OP_REQUEST_LOCK, create_response_message(0)));
 
+  m_expected_aio_restarts = 1;
   {
     RWLock::WLocker l(ictx->owner_lock);
     ictx->image_watcher->request_lock(
@@ -663,6 +705,7 @@ TEST_F(TestImageWatcher, RequestLockPreTryLockFailed) {
   ASSERT_EQ(0, open_image(m_image_name, &ictx));
   ASSERT_EQ(0, lock_image(*ictx, LOCK_SHARED, "manually 1234"));
 
+  m_expected_aio_restarts = 1;
   {
     RWLock::WLocker l(ictx->owner_lock);
     ictx->image_watcher->request_lock(
diff --git a/src/test/librbd/test_librbd.cc b/src/test/librbd/test_librbd.cc
index 4ccb04f37ef..a23fadc3c89 100644
--- a/src/test/librbd/test_librbd.cc
+++ b/src/test/librbd/test_librbd.cc
@@ -3049,3 +3049,58 @@ TEST_F(TestLibRBD, BlockingAIO)
   expected_bl.append(std::string(128, '\0'));
   ASSERT_TRUE(expected_bl.contents_equal(read_bl));
 }
+
+TEST_F(TestLibRBD, ExclusiveLockTransition)
+{
+  REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  librbd::RBD rbd;
+  std::string name = get_temp_image_name();
+
+  uint64_t size = 1 << 18;
+  int order = 12;
+  ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+
+  librbd::Image image1;
+  ASSERT_EQ(0, rbd.open(ioctx, image1, name.c_str(), NULL));
+
+  librbd::Image image2;
+  ASSERT_EQ(0, rbd.open(ioctx, image2, name.c_str(), NULL));
+
+  std::list<librbd::RBD::AioCompletion *> comps;
+  ceph::bufferlist bl;
+  bl.append(std::string(1 << order, '1'));
+  for (size_t object_no = 0; object_no < (size >> 12); ++object_no) {
+    librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL,
+                                                                      NULL);
+    comps.push_back(comp);
+    if (object_no % 2 == 0) {
+      ASSERT_EQ(0, image1.aio_write(object_no << order, bl.length(), bl, comp));
+    } else {
+      ASSERT_EQ(0, image2.aio_write(object_no << order, bl.length(), bl, comp));
+    }
+  }
+
+  while (!comps.empty()) {
+    librbd::RBD::AioCompletion *comp = comps.front();
+    comps.pop_front();
+    ASSERT_EQ(0, comp->wait_for_complete());
+    ASSERT_EQ(1, comp->is_complete());
+  }
+
+  librbd::Image image3;
+  ASSERT_EQ(0, rbd.open(ioctx, image3, name.c_str(), NULL));
+  for (size_t object_no = 0; object_no < (size >> 12); ++object_no) {
+    bufferlist read_bl;
+    ASSERT_EQ(bl.length(), image3.read(object_no << order, bl.length(),
+                                       read_bl));
+    ASSERT_TRUE(bl.contents_equal(read_bl));
+  }
+
+  ASSERT_PASSED(validate_object_map, image1);
+  ASSERT_PASSED(validate_object_map, image2);
+  ASSERT_PASSED(validate_object_map, image3);
+}