diff --git a/src/ceph-disk b/src/ceph-disk index 0389b5ce55b..13d9f8203ce 100755 --- a/src/ceph-disk +++ b/src/ceph-disk @@ -761,14 +761,6 @@ def prepare_journal_dev( # wait for udev event queue to clear 'udevadm', 'settle', - '--timeout=10', - ], - ) - subprocess.check_call( - args=[ - # also make sure the kernel refreshes the new table - 'partprobe', - journal, ], ) @@ -963,14 +955,6 @@ def prepare_dev( # wait for udev event queue to clear 'udevadm', 'settle', - '--timeout=10', - ], - ) - subprocess.check_call( - args=[ - # also make sure the kernel refreshes the new table - 'partprobe', - data, ], ) except subprocess.CalledProcessError as e: @@ -1037,21 +1021,6 @@ def prepare_dev( data, ], ) - subprocess.call( - args=[ - # wait for udev event queue to clear - 'udevadm', - 'settle', - '--timeout=10', - ], - ) - subprocess.check_call( - args=[ - # also make sure the kernel refreshes the new table - 'partprobe', - data, - ], - ) except subprocess.CalledProcessError as e: raise Error(e) @@ -1192,6 +1161,19 @@ def main_prepare(args): raise Error('not a dir or block device', args.data) prepare_lock.release() + if stat.S_ISBLK(dmode): + # try to make sure the kernel refreshes the table. note + # that if this gets ebusy, we are probably racing with + # udev because it already updated it.. ignore failure here. + LOG.debug('Calling partprobe on prepared device %s', args.data) + subprocess.call( + args=[ + 'partprobe', + args.data, + ], + ) + + except Error as e: if journal_dm_keypath: os.unlink(journal_dm_keypath) @@ -1624,6 +1606,64 @@ def main_activate(args): activate_lock.release() +########################### + +def get_journal_osd_uuid(path): + if not os.path.exists(path): + raise Error('%s does not exist', path) + + mode = os.stat(path).st_mode + if not stat.S_ISBLK(mode): + raise Error('%s is not a block device', path) + + try: + out = _check_output( + args=[ + 'ceph-osd', + '-i', '0', # this is ignored + '--get-journal-uuid', + '--osd-journal', + path, + ], + close_fds=True, + ) + except subprocess.CalledProcessError as e: + raise Error( + 'failed to get osd uuid/fsid from journal', + e, + ) + value = str(out).split('\n', 1)[0] + LOG.debug('Journal %s has OSD UUID %s', path, value) + return value + +def main_activate_journal(args): + if not os.path.exists(args.dev): + raise Error('%s does not exist', args.dev) + + cluster = None + osd_id = None + osd_uuid = None + activate_lock.acquire() + try: + osd_uuid = get_journal_osd_uuid(args.dev) + path = os.path.join('/dev/disk/by-partuuid/', osd_uuid.lower()) + + (cluster, osd_id) = mount_activate( + dev=path, + activate_key_template=args.activate_key_template, + init=args.mark_init, + ) + + start_daemon( + cluster=cluster, + osd_id=osd_id, + ) + + activate_lock.release() + + except: + activate_lock.release() + raise ########################### @@ -1869,6 +1909,9 @@ def main_suppress(args): def main_unsuppress(args): unset_suppress(args.path) +def main_zap(args): + for dev in args.dev: + zap(dev) ########################### @@ -2001,6 +2044,30 @@ def parse_args(): func=main_activate, ) + activate_journal_parser = subparsers.add_parser('activate-journal', help='Activate an OSD via its journal device') + activate_journal_parser.add_argument( + 'dev', + metavar='DEV', + help='path to journal block device', + ) + activate_journal_parser.add_argument( + '--activate-key', + metavar='PATH', + help='bootstrap-osd keyring path template (%(default)s)', + dest='activate_key_template', + ) + activate_journal_parser.add_argument( + '--mark-init', + metavar='INITSYSTEM', + help='init system to manage this dir', + default='auto', + choices=INIT_SYSTEMS, + ) + activate_journal_parser.set_defaults( + activate_key_template='/var/lib/ceph/bootstrap-osd/{cluster}.keyring', + func=main_activate_journal, + ) + list_parser = subparsers.add_parser('list', help='List disks, partitions, and Ceph OSDs') list_parser.set_defaults( func=main_list, @@ -2028,6 +2095,17 @@ def parse_args(): func=main_unsuppress, ) + zap_parser = subparsers.add_parser('zap', help='Zap/erase/destroy a device\'s partition table (and contents)') + zap_parser.add_argument( + 'dev', + metavar='DEV', + nargs='*', + help='path to block device', + ) + zap_parser.set_defaults( + func=main_zap, + ) + args = parser.parse_args() return args diff --git a/src/ceph.in b/src/ceph.in index 8ec01c13cb6..baaed80cd6c 100755 --- a/src/ceph.in +++ b/src/ceph.in @@ -1,4 +1,4 @@ -# +#& # Processed in Makefile to add python #! line and version variable # # @@ -839,7 +839,8 @@ def parse_cmdargs(args=None, target=''): # format our own help parser = AP(description='Ceph administration tool', add_help=False) - parser.add_argument('--completion', action='store_true') + parser.add_argument('--completion', action='store_true', + help=argparse.SUPPRESS) parser.add_argument('-h', '--help', help='request mon help', action='store_true') @@ -878,9 +879,10 @@ def parse_cmdargs(args=None, target=''): parser.add_argument('--watch-error', action='store_true', help='watch error events') - parser.add_argument('-v', action="store_true") - parser.add_argument('--verbose', action="store_true") - parser.add_argument('--concise', dest='verbose', action="store_false") + parser.add_argument('-v', action="store_true", help="display version") + parser.add_argument('--verbose', action="store_true", help="make verbose") + parser.add_argument('--concise', dest='verbose', action="store_false", + help="make less verbose") parser.add_argument('-f', '--format', choices=['json', 'json-pretty', 'xml', 'xml-pretty', 'plain'], dest='output_format') @@ -1455,6 +1457,14 @@ def main(): if '--' in childargs: childargs.remove('--') + # special deprecation warning for 'ceph tell' + # someday 'mds' will be here too + if len(childargs) >= 2 and \ + childargs[0] in ['mon', 'osd'] and \ + childargs[1] == 'tell': + print >> sys.stderr, '"{0} tell" is deprecated; try "tell {0}." instead (id can be "*") '.format(childargs[0]) + return 1 + try: cluster_handle.connect() except KeyboardInterrupt: diff --git a/src/ceph_fuse.cc b/src/ceph_fuse.cc index 77c70f29df0..54616f60f99 100644 --- a/src/ceph_fuse.cc +++ b/src/ceph_fuse.cc @@ -108,7 +108,7 @@ int main(int argc, const char **argv, const char *envp[]) { g_ceph_context->_log->start(); // get monmap - Messenger *messenger; + Messenger *messenger = NULL; Client *client; CephFuse *cfuse; diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index f5dccaffc09..f68125fb8c0 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -85,6 +85,7 @@ librados::RadosClient::RadosClient(CephContext *cct_) int64_t librados::RadosClient::lookup_pool(const char *name) { Mutex::Locker l(lock); + wait_for_osdmap(); int64_t ret = osdmap.lookup_pg_pool_name(name); if (ret < 0) return -ENOENT; @@ -100,6 +101,7 @@ const char *librados::RadosClient::get_pool_name(int64_t pool_id) int librados::RadosClient::pool_get_auid(uint64_t pool_id, unsigned long long *auid) { Mutex::Locker l(lock); + wait_for_osdmap(); const pg_pool_t *pg = osdmap.get_pg_pool(pool_id); if (!pg) return -ENOENT; @@ -110,6 +112,7 @@ int librados::RadosClient::pool_get_auid(uint64_t pool_id, unsigned long long *a int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s) { Mutex::Locker l(lock); + wait_for_osdmap(); const char *str = osdmap.get_pool_name(pool_id); if (!str) return -ENOENT; @@ -123,7 +126,7 @@ int librados::RadosClient::get_fsid(std::string *s) return -EINVAL; Mutex::Locker l(lock); ostringstream oss; - oss << osdmap.get_fsid(); + oss << monclient.get_fsid(); *s = oss.str(); return 0; } @@ -354,9 +357,21 @@ bool librados::RadosClient::_dispatch(Message *m) return true; } +void librados::RadosClient::wait_for_osdmap() +{ + assert(lock.is_locked()); + if (osdmap.get_epoch() == 0) { + ldout(cct, 10) << __func__ << " waiting" << dendl; + while (osdmap.get_epoch() == 0) + cond.Wait(lock); + ldout(cct, 10) << __func__ << " done waiting" << dendl; + } +} + int librados::RadosClient::pool_list(std::list& v) { Mutex::Locker l(lock); + wait_for_osdmap(); for (map::const_iterator p = osdmap.get_pools().begin(); p != osdmap.get_pools().end(); ++p) @@ -453,6 +468,7 @@ int librados::RadosClient::pool_create_async(string& name, PoolAsyncCompletionIm int librados::RadosClient::pool_delete(const char *name) { lock.Lock(); + wait_for_osdmap(); int tmp_pool_id = osdmap.lookup_pg_pool_name(name); if (tmp_pool_id < 0) { lock.Unlock(); @@ -481,6 +497,7 @@ int librados::RadosClient::pool_delete(const char *name) int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c) { Mutex::Locker l(lock); + wait_for_osdmap(); int tmp_pool_id = osdmap.lookup_pg_pool_name(name); if (tmp_pool_id < 0) return -ENOENT; diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h index 337beff5750..4f616d45331 100644 --- a/src/librados/RadosClient.h +++ b/src/librados/RadosClient.h @@ -70,6 +70,8 @@ private: void *log_cb_arg; string log_watch; + void wait_for_osdmap(); + public: Finisher finisher; diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 43c2584c390..0d4277d8fab 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -12,6 +12,8 @@ * */ +#include + #include "common/config.h" #include "common/errno.h" #include "common/ceph_argparse.h" diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 0ec92e2641d..4d4cdf919ff 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -111,6 +111,10 @@ enum { l_osdc_statfs_send, l_osdc_statfs_resend, + l_osdc_command_active, + l_osdc_command_send, + l_osdc_command_resend, + l_osdc_map_epoch, l_osdc_map_full, l_osdc_map_inc, @@ -190,6 +194,10 @@ void Objecter::init_unlocked() pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send"); pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend"); + pcb.add_u64(l_osdc_command_active, "command_active"); + pcb.add_u64_counter(l_osdc_command_send, "command_send"); + pcb.add_u64_counter(l_osdc_command_resend, "command_resend"); + pcb.add_u64(l_osdc_map_epoch, "map_epoch"); pcb.add_u64_counter(l_osdc_map_full, "map_full"); pcb.add_u64_counter(l_osdc_map_inc, "map_inc"); @@ -1002,6 +1010,17 @@ void Objecter::kick_requests(OSDSession *session) send_linger(lresend.begin()->second); lresend.erase(lresend.begin()); } + + // resend commands + map cresend; // resend in order + for (xlist::iterator k = session->command_ops.begin(); !k.end(); ++k) { + logger->inc(l_osdc_command_resend); + cresend[(*k)->tid] = *k; + } + while (!cresend.empty()) { + _send_command(cresend.begin()->second); + cresend.erase(cresend.begin()); + } } void Objecter::schedule_tick() @@ -1049,6 +1068,17 @@ void Objecter::tick() ldout(cct, 10) << " lingering tid " << p->first << " does not have session" << dendl; } } + for (map::iterator p = command_ops.begin(); + p != command_ops.end(); + ++p) { + CommandOp *op = p->second; + if (op->session) { + ldout(cct, 10) << " pinging osd that serves command tid " << p->first << " (osd." << op->session->osd << ")" << dendl; + toping.insert(op->session); + } else { + ldout(cct, 10) << " command tid " << p->first << " does not have session" << dendl; + } + } logger->set(l_osdc_op_laggy, laggy_ops); logger->set(l_osdc_osd_laggy, toping.size()); @@ -2153,6 +2183,7 @@ void Objecter::dump_requests(Formatter& fmt) const dump_pool_ops(fmt); dump_pool_stat_ops(fmt); dump_statfs_ops(fmt); + dump_command_ops(fmt); fmt.close_section(); // requests object } @@ -2209,6 +2240,29 @@ void Objecter::dump_linger_ops(Formatter& fmt) const fmt.close_section(); // linger_ops array } +void Objecter::dump_command_ops(Formatter& fmt) const +{ + fmt.open_array_section("command_ops"); + for (map::const_iterator p = command_ops.begin(); + p != command_ops.end(); + ++p) { + CommandOp *op = p->second; + fmt.open_object_section("command_op"); + fmt.dump_unsigned("command_id", op->tid); + fmt.dump_int("osd", op->session ? op->session->osd : -1); + fmt.open_array_section("command"); + for (vector::const_iterator q = op->cmd.begin(); q != op->cmd.end(); ++q) + fmt.dump_string("word", *q); + fmt.close_section(); + if (op->target_osd >= 0) + fmt.dump_int("target_osd", op->target_osd); + else + fmt.dump_stream("target_pg") << op->target_pg; + fmt.close_section(); // command_op object + } + fmt.close_section(); // command_ops array +} + void Objecter::dump_pool_ops(Formatter& fmt) const { fmt.open_array_section("pool_ops"); @@ -2347,6 +2401,8 @@ int Objecter::_submit_command(CommandOp *c, tid_t *ptid) if (c->map_check_error) _send_command_map_check(c); *ptid = tid; + + logger->set(l_osdc_command_active, command_ops.size()); return 0; } @@ -2403,6 +2459,7 @@ void Objecter::_send_command(CommandOp *c) m->set_data(c->inbl); m->set_tid(c->tid); messenger->send_message(m, c->session->con); + logger->inc(l_osdc_command_send); } void Objecter::_finish_command(CommandOp *c, int r, string rs) @@ -2415,4 +2472,6 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs) c->onfinish->complete(r); command_ops.erase(c->tid); c->put(); + + logger->set(l_osdc_command_active, command_ops.size()); } diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 850d974472e..bc4b0ae21c1 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1226,6 +1226,7 @@ private: void dump_requests(Formatter& fmt) const; void dump_ops(Formatter& fmt) const; void dump_linger_ops(Formatter& fmt) const; + void dump_command_ops(Formatter& fmt) const; void dump_pool_ops(Formatter& fmt) const; void dump_pool_stat_ops(Formatter& fmt) const; void dump_statfs_ops(Formatter& fmt) const; diff --git a/udev/95-ceph-osd.rules b/udev/95-ceph-osd.rules index 77e6ef37c5d..9798e648483 100644 --- a/udev/95-ceph-osd.rules +++ b/udev/95-ceph-osd.rules @@ -4,6 +4,12 @@ ACTION=="add", SUBSYSTEM=="block", \ ENV{ID_PART_ENTRY_TYPE}=="4fbd7e29-9d25-41b8-afd0-062c0ceff05d", \ RUN+="/usr/sbin/ceph-disk-activate --mount /dev/$name" +# activate ceph-tagged partitions +ACTION=="add", SUBSYSTEM=="block", \ + ENV{DEVTYPE}=="partition", \ + ENV{ID_PART_ENTRY_TYPE}=="45b0969e-9b03-4f30-b4c6-b4b80ceff106", \ + RUN+="/usr/sbin/ceph-disk activate-journal /dev/$name" + # Map journal if using dm-crypt ACTION=="add" SUBSYSTEM=="block", \ ENV{DEVTYPE}=="partition", \