rgw: push intent log processing into RGWRados layer

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
This commit is contained in:
Sage Weil 2011-10-07 20:48:56 -07:00 committed by Sage Weil
parent d7f7a21354
commit 930d57f864
4 changed files with 183 additions and 152 deletions

View File

@ -267,6 +267,12 @@ public:
/* The bucket here can either be the bucket name identifier, or the ID
* in period format: ".123" */
virtual int get_bucket_info(string& bucket, RGWBucketInfo& info) = 0;
virtual int remove_temp_objects(string date, string time) {
return 0;
}
};
class RGWStoreManager {

View File

@ -407,111 +407,6 @@ static void remove_old_indexes(RGWUserInfo& old_info, RGWUserInfo new_info)
cerr << "ERROR: this should be fixed manually!" << std::endl;
}
class IntentLogNameFilter : public RGWAccessListFilter
{
string prefix;
bool filter_exact_date;
public:
IntentLogNameFilter(const char *date, struct tm *tm) {
prefix = date;
filter_exact_date = !(tm->tm_hour || tm->tm_min || tm->tm_sec); /* if time was specified and is not 00:00:00
we should look at objects from that date */
}
bool filter(string& name, string& key) {
if (filter_exact_date)
return name.compare(prefix) < 0;
else
return name.compare(0, prefix.size(), prefix) <= 0;
}
};
enum IntentFlags { // bitmask
I_DEL_OBJ = 1,
I_DEL_POOL = 2,
};
int process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge)
{
cout << "processing intent log " << oid << std::endl;
uint64_t size;
rgw_obj obj(bucket, oid);
int r = rgwstore->obj_stat(NULL, obj, &size, NULL);
if (r < 0) {
cerr << "error while doing stat on " << bucket << ":" << oid
<< " " << cpp_strerror(-r) << std::endl;
return -r;
}
bufferlist bl;
r = rgwstore->read(NULL, obj, 0, size, bl);
if (r < 0) {
cerr << "error while reading from " << bucket << ":" << oid
<< " " << cpp_strerror(-r) << std::endl;
return -r;
}
bufferlist::iterator iter = bl.begin();
string id;
bool complete = true;
try {
while (!iter.end()) {
struct rgw_intent_log_entry entry;
try {
::decode(entry, iter);
} catch (buffer::error& err) {
dout(0) << "ERROR: " << __func__ << "(): caught buffer::error" << dendl;
return -EIO;
}
if (entry.op_time.sec() > epoch) {
cerr << "skipping entry for obj=" << obj << " entry.op_time=" << entry.op_time.sec() << " requested epoch=" << epoch << std::endl;
cerr << "skipping intent log" << std::endl; // no use to continue
complete = false;
break;
}
switch (entry.intent) {
case DEL_OBJ:
if (!flags & I_DEL_OBJ) {
complete = false;
break;
}
r = rgwstore->delete_obj(NULL, id, entry.obj);
if (r < 0 && r != -ENOENT) {
cerr << "failed to remove obj: " << entry.obj << std::endl;
complete = false;
}
break;
case DEL_POOL:
if (!flags & I_DEL_POOL) {
complete = false;
break;
}
r = rgwstore->delete_bucket(id, entry.obj.bucket, true);
if (r < 0 && r != -ENOENT) {
cerr << "failed to remove pool: " << entry.obj.bucket.pool << std::endl;
complete = false;
}
break;
default:
complete = false;
}
}
} catch (buffer::error& err) {
cerr << "failed to decode intent log entry in " << bucket << ":" << oid << std::endl;
complete = false;
}
if (complete) {
rgw_obj obj(bucket, oid);
cout << "completed intent log: " << obj << (purge ? ", purging it" : "") << std::endl;
if (purge) {
r = rgwstore->delete_obj(NULL, id, obj);
if (r < 0)
cerr << "failed to remove obj: " << obj << std::endl;
}
}
return 0;
}
int bucket_stats(rgw_bucket& bucket, Formatter *formatter)
{
RGWBucketInfo bucket_info;
@ -1040,54 +935,11 @@ int main(int argc, char **argv)
cerr << "date wasn't specified" << std::endl;
return usage();
}
struct tm tm;
string format = "%Y-%m-%d";
string datetime = date;
if (datetime.size() != 10) {
cerr << "bad date format" << std::endl;
return -EINVAL;
int r = store->remove_temp_objects(date, time);
if (r < 0) {
cerr << "failure removing temp objects: " << cpp_strerror(r) << std::endl;
return 1;
}
if (!time.empty()) {
if (time.size() != 5 && time.size() != 8) {
cerr << "bad time format" << std::endl;
return -EINVAL;
}
format.append(" %H:%M:%S");
datetime.append(time.c_str());
}
const char *s = strptime(datetime.c_str(), format.c_str(), &tm);
if (s && *s) {
cerr << "failed to parse date/time" << std::endl;
return -EINVAL;
}
time_t epoch = mktime(&tm);
rgw_bucket bucket(RGW_INTENT_LOG_POOL_NAME);
string prefix, delim, marker;
vector<RGWObjEnt> objs;
map<string, bool> common_prefixes;
string ns;
string id;
int max = 1000;
bool is_truncated;
IntentLogNameFilter filter(date.c_str(), &tm);
do {
int r = store->list_objects(id, bucket, max, prefix, delim, marker,
objs, common_prefixes, false, ns,
&is_truncated, &filter);
if (r == -ENOENT)
break;
if (r < 0) {
cerr << "failed to list objects" << std::endl;
}
vector<RGWObjEnt>::iterator iter;
for (iter = objs.begin(); iter != objs.end(); ++iter) {
process_intent_log(bucket, (*iter).name, epoch, I_DEL_OBJ | I_DEL_POOL, true);
}
} while (is_truncated);
}
if (opt_cmd == OPT_LOG_LIST) {

View File

@ -2,6 +2,8 @@
#include <stdlib.h>
#include <sys/types.h>
#include "common/errno.h"
#include "rgw_access.h"
#include "rgw_rados.h"
#include "rgw_acl.h"
@ -22,6 +24,8 @@ using namespace librados;
#include <map>
#include "auth/Crypto.h" // get_random_bytes()
#include "rgw_log.h"
#define DOUT_SUBSYS rgw
using namespace std;
@ -2118,3 +2122,164 @@ int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header&
return 0;
}
class IntentLogNameFilter : public RGWAccessListFilter
{
string prefix;
bool filter_exact_date;
public:
IntentLogNameFilter(const char *date, struct tm *tm) {
prefix = date;
filter_exact_date = !(tm->tm_hour || tm->tm_min || tm->tm_sec); /* if time was specified and is not 00:00:00
we should look at objects from that date */
}
bool filter(string& name, string& key) {
if (filter_exact_date)
return name.compare(prefix) < 0;
else
return name.compare(0, prefix.size(), prefix) <= 0;
}
};
enum IntentFlags { // bitmask
I_DEL_OBJ = 1,
I_DEL_POOL = 2,
};
int RGWRados::remove_temp_objects(string date, string time)
{
struct tm tm;
string format = "%Y-%m-%d";
string datetime = date;
if (datetime.size() != 10) {
cerr << "bad date format" << std::endl;
return -EINVAL;
}
if (!time.empty()) {
if (time.size() != 5 && time.size() != 8) {
cerr << "bad time format" << std::endl;
return -EINVAL;
}
format.append(" %H:%M:%S");
datetime.append(time.c_str());
}
const char *s = strptime(datetime.c_str(), format.c_str(), &tm);
if (s && *s) {
cerr << "failed to parse date/time" << std::endl;
return -EINVAL;
}
time_t epoch = mktime(&tm);
rgw_bucket bucket(RGW_INTENT_LOG_POOL_NAME);
string prefix, delim, marker;
vector<RGWObjEnt> objs;
map<string, bool> common_prefixes;
string ns;
string id;
int max = 1000;
bool is_truncated;
IntentLogNameFilter filter(date.c_str(), &tm);
do {
int r = store->list_objects(id, bucket, max, prefix, delim, marker,
objs, common_prefixes, false, ns,
&is_truncated, &filter);
if (r == -ENOENT)
break;
if (r < 0) {
cerr << "failed to list objects" << std::endl;
}
vector<RGWObjEnt>::iterator iter;
for (iter = objs.begin(); iter != objs.end(); ++iter) {
process_intent_log(bucket, (*iter).name, epoch, I_DEL_OBJ | I_DEL_POOL, true);
}
} while (is_truncated);
return 0;
}
int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge)
{
cout << "processing intent log " << oid << std::endl;
uint64_t size;
rgw_obj obj(bucket, oid);
int r = obj_stat(NULL, obj, &size, NULL);
if (r < 0) {
cerr << "error while doing stat on " << bucket << ":" << oid
<< " " << cpp_strerror(-r) << std::endl;
return -r;
}
bufferlist bl;
r = read(NULL, obj, 0, size, bl);
if (r < 0) {
cerr << "error while reading from " << bucket << ":" << oid
<< " " << cpp_strerror(-r) << std::endl;
return -r;
}
bufferlist::iterator iter = bl.begin();
string id;
bool complete = true;
try {
while (!iter.end()) {
struct rgw_intent_log_entry entry;
try {
::decode(entry, iter);
} catch (buffer::error& err) {
dout(0) << "ERROR: " << __func__ << "(): caught buffer::error" << dendl;
return -EIO;
}
if (entry.op_time.sec() > epoch) {
cerr << "skipping entry for obj=" << obj << " entry.op_time=" << entry.op_time.sec() << " requested epoch=" << epoch << std::endl;
cerr << "skipping intent log" << std::endl; // no use to continue
complete = false;
break;
}
switch (entry.intent) {
case DEL_OBJ:
if (!flags & I_DEL_OBJ) {
complete = false;
break;
}
r = rgwstore->delete_obj(NULL, id, entry.obj);
if (r < 0 && r != -ENOENT) {
cerr << "failed to remove obj: " << entry.obj << std::endl;
complete = false;
}
break;
case DEL_POOL:
if (!flags & I_DEL_POOL) {
complete = false;
break;
}
r = delete_bucket(id, entry.obj.bucket, true);
if (r < 0 && r != -ENOENT) {
cerr << "failed to remove pool: " << entry.obj.bucket.pool << std::endl;
complete = false;
}
break;
default:
complete = false;
}
}
} catch (buffer::error& err) {
cerr << "failed to decode intent log entry in " << bucket << ":" << oid << std::endl;
complete = false;
}
if (complete) {
rgw_obj obj(bucket, oid);
cout << "completed intent log: " << obj << (purge ? ", purging it" : "") << std::endl;
if (purge) {
r = delete_obj(NULL, id, obj, true);
if (r < 0)
cerr << "failed to remove obj: " << obj << std::endl;
}
}
return 0;
}

View File

@ -305,6 +305,14 @@ public:
int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch) {
return cls_obj_complete_del(bucket, tag, epoch, oid);
}
/// clean up/process any temporary objects older than given date[/time]
int remove_temp_objects(string date, string time);
private:
int process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge);
};
#endif