client: set osdmap epoch for setxattr.

Signed-off-by: Jianpeng Ma <jianpeng.ma@intel.com>
This commit is contained in:
Jianpeng Ma 2015-08-31 22:00:53 +08:00
parent 90dea96f70
commit 13668e6848
4 changed files with 87 additions and 25 deletions

View File

@ -24,6 +24,9 @@
#include <sys/utsname.h>
#include <sys/uio.h>
#include <boost/lexical_cast.hpp>
#include <boost/fusion/include/std_pair.hpp>
#if defined(__linux__)
#include <linux/falloc.h>
#endif
@ -1913,6 +1916,11 @@ void Client::send_request(MetaRequest *request, MetaSession *session,
r->releases.swap(request->cap_releases);
}
r->set_mdsmap_epoch(mdsmap->get_epoch());
if (r->head.op == CEPH_MDS_OP_SETXATTR) {
const OSDMap *osdmap = objecter->get_osdmap_read();
r->set_osdmap_epoch(osdmap->get_epoch());
objecter->put_osdmap_read();
}
if (request->mds == -1) {
request->sent_stamp = ceph_clock_now(cct);
@ -8985,9 +8993,67 @@ int Client::_setxattr(Inode *in, const char *name, const void *value,
return res;
}
int Client::check_data_pool_exist(string name, string value, const OSDMap *osdmap)
{
string tmp;
if (name == "layout") {
string::iterator begin = value.begin();
string::iterator end = value.end();
keys_and_values<string::iterator> p; // create instance of parser
std::map<string, string> m; // map to receive results
if (!qi::parse(begin, end, p, m)) { // returns true if successful
return -EINVAL;
}
if (begin != end)
return -EINVAL;
for (map<string,string>::iterator q = m.begin(); q != m.end(); ++q) {
if (q->first == "pool") {
tmp = q->second;
break;
}
}
} else if (name == "layout.pool") {
tmp = value;
}
if (tmp.length()) {
int64_t pool;
try {
pool = boost::lexical_cast<unsigned>(tmp);
if (!osdmap->have_pg_pool(pool))
return -ENOENT;
} catch (boost::bad_lexical_cast const&) {
pool = osdmap->lookup_pg_pool_name(tmp);
if (pool < 0) {
return -ENOENT;
}
}
}
return 0;
}
int Client::ll_setxattr(Inode *in, const char *name, const void *value,
size_t size, int flags, int uid, int gid)
{
// For setting pool of layout, MetaRequest need osdmap epoch.
// There is a race which create a new data pool but client and mds both don't have.
// Make client got the latest osdmap which make mds quickly judge whether get newer osdmap.
if (strcmp(name, "ceph.file.layout.pool") == 0 || strcmp(name, "ceph.dir.layout.pool") == 0 ||
strcmp(name, "ceph.file.layout") == 0 || strcmp(name, "ceph.dir.layout") == 0) {
string rest(strstr(name, "layout"));
string v((const char*)value);
const OSDMap *osdmap = objecter->get_osdmap_read();
int r = check_data_pool_exist(rest, v, osdmap);
objecter->put_osdmap_read();
if (r == -ENOENT) {
C_SaferCond ctx;
objecter->wait_for_latest_osdmap(&ctx);
ctx.wait();
}
}
Mutex::Locker lock(client_lock);
vinodeno_t vino = _get_vino(in);

View File

@ -705,6 +705,8 @@ private:
int check_permissions(Inode *in, int flags, int uid, int gid);
int check_data_pool_exist(string name, string value, const OSDMap *osdmap);
vinodeno_t _get_vino(Inode *in);
inodeno_t _get_inodeno(Inode *in);

View File

@ -16,7 +16,6 @@
#include "include/assert.h" // lexical_cast includes system assert.h
#include <boost/config/warning_disable.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/fusion/include/std_pair.hpp>
#include "MDSRank.h"
@ -3850,31 +3849,8 @@ void Server::handle_client_setdirlayout(MDRequestRef& mdr)
journal_and_reply(mdr, cur, 0, le, new C_MDS_inode_update_finish(mds, mdr, cur));
}
// XATTRS
// parse a map of keys/values.
namespace qi = boost::spirit::qi;
template <typename Iterator>
struct keys_and_values
: qi::grammar<Iterator, std::map<string, string>()>
{
keys_and_values()
: keys_and_values::base_type(query)
{
query = pair >> *(qi::lit(' ') >> pair);
pair = key >> '=' >> value;
key = qi::char_("a-zA-Z_") >> *qi::char_("a-zA-Z_0-9");
value = +qi::char_("a-zA-Z_0-9");
}
qi::rule<Iterator, std::map<string, string>()> query;
qi::rule<Iterator, std::pair<string, string>()> pair;
qi::rule<Iterator, string()> key, value;
};
int Server::parse_layout_vxattr(string name, string value, const OSDMap *osdmap,
ceph_file_layout *layout, bool validate)
{

View File

@ -23,11 +23,11 @@
#include "inode_backtrace.h"
#include <boost/spirit/include/qi.hpp>
#include <boost/pool/pool.hpp>
#include "include/assert.h"
#include <boost/serialization/strong_typedef.hpp>
#define CEPH_FS_ONDISK_MAGIC "ceph fs volume v011"
@ -1604,6 +1604,24 @@ public:
void dump(Formatter *f) const;
};
// parse a map of keys/values.
namespace qi = boost::spirit::qi;
template <typename Iterator>
struct keys_and_values
: qi::grammar<Iterator, std::map<string, string>()>
{
keys_and_values()
: keys_and_values::base_type(query)
{
query = pair >> *(qi::lit(' ') >> pair);
pair = key >> '=' >> value;
key = qi::char_("a-zA-Z_") >> *qi::char_("a-zA-Z_0-9");
value = +qi::char_("a-zA-Z_0-9");
}
qi::rule<Iterator, std::map<string, string>()> query;
qi::rule<Iterator, std::pair<string, string>()> pair;
qi::rule<Iterator, string()> key, value;
};
#endif