Merge pull request #16834 from mdw-at-linuxbox/policy

radosgw: usage: fix bytes_sent bug.
Matt Benjamin 2017-08-09 14:24:01 -04:00 committed by GitHub
commit 0956b3aafd
8 changed files with 513 additions and 138 deletions
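
The core of the fix is in BufferingFilter<T>::complete_request() (diffed below): when a request carries no Content-Length up front, the filter emits the deferred headers itself, and those header bytes used to be returned as if they were payload, inflating the usage log's bytes_sent. A minimal Python model of the idea, an illustration only with invented names, not radosgw code:

class ToyAccounting(object):
    # outermost filter: whatever the inner layer reports as "sent"
    # is charged to the request's bytes_sent
    def __init__(self, inner):
        self.inner = inner
        self.total_sent = 0
    def complete_request(self):
        self.total_sent += self.inner.complete_request()
        return self.total_sent

class ToyBuffering(object):
    # inner filter: with no Content-Length known up front, the body is
    # buffered and the headers are only emitted during complete_request()
    def __init__(self, body):
        self.body = body
    def complete_request(self):
        sent = 0
        sent += len('Content-Length: %d\r\n\r\n' % len(self.body))
        sent = 0  # the fix: those were header bytes, not payload
        return sent + len(self.body)  # report only the flushed payload

assert ToyAccounting(ToyBuffering('hello')).complete_request() == 5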


@ -6,18 +6,26 @@ Rgw admin testing against a running instance
#
# grep '^ *# TESTCASE' | sed 's/^ *# TESTCASE //'
#
# to run this standalone:
# python qa/tasks/radosgw_admin.py [USER] HOSTNAME
#
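# for example (hypothetical user and host):
#   python qa/tasks/radosgw_admin.py ubuntu gateway1.example.com
#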
import copy
import json
import logging
import time
import datetime
import Queue
import bunch
import sys
from cStringIO import StringIO
import boto.exception
import boto.s3.connection
import boto.s3.acl
from boto.utils import RequestHook
import httplib2
@ -27,6 +35,197 @@ from util.rgw import rgwadmin, get_user_summary, get_user_successful_ops
log = logging.getLogger(__name__)
def usage_acc_findentry2(entries, user, add=True):
for e in entries:
if e['user'] == user:
return e
if not add:
return None
e = {'user': user, 'buckets': []}
entries.append(e)
return e
def usage_acc_findsum2(summaries, user, add=True):
for e in summaries:
if e['user'] == user:
return e
if not add:
return None
e = {'user': user, 'categories': [],
'total': {'bytes_received': 0,
'bytes_sent': 0, 'ops': 0, 'successful_ops': 0 }}
summaries.append(e)
return e
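# note: the 'out'/'b_in' arguments below are named from the client's side;
# the server-side bytes_sent is what the client received (b_in), and
# bytes_received is what the client sent (out)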
def usage_acc_update2(x, out, b_in, err):
x['bytes_sent'] += b_in
x['bytes_received'] += out
x['ops'] += 1
if not err:
x['successful_ops'] += 1
def usage_acc_validate_fields(r, x, x2, what):
q=[]
for field in ['bytes_sent', 'bytes_received', 'ops', 'successful_ops']:
try:
if x2[field] < x[field]:
q.append("field %s: %d < %d" % (field, x2[field], x[field]))
except Exception as ex:
r.append( "missing/bad field " + field + " in " + what + " " + str(ex))
return
if len(q) > 0:
r.append("incomplete counts in " + what + ": " + ", ".join(q))
class usage_acc:
def __init__(self):
self.results = {'entries': [], 'summary': []}
def findentry(self, user):
return usage_acc_findentry2(self.results['entries'], user)
def findsum(self, user):
return usage_acc_findsum2(self.results['summary'], user)
def e2b(self, e, bucket, add=True):
for b in e['buckets']:
if b['bucket'] == bucket:
return b
if not add:
return None
b = {'bucket': bucket, 'categories': []}
e['buckets'].append(b)
return b
def c2x(self, c, cat, add=True):
for x in c:
if x['category'] == cat:
return x
if not add:
return None
x = {'bytes_received': 0, 'category': cat,
'bytes_sent': 0, 'ops': 0, 'successful_ops': 0 }
c.append(x)
return x
def update(self, c, cat, user, out, b_in, err):
x = self.c2x(c, cat)
usage_acc_update2(x, out, b_in, err)
if not err and cat == 'create_bucket' and not x.has_key('owner'):
x['owner'] = user
def make_entry(self, cat, bucket, user, out, b_in, err):
if cat == 'create_bucket' and err:
return
e = self.findentry(user)
b = self.e2b(e, bucket)
self.update(b['categories'], cat, user, out, b_in, err)
s = self.findsum(user)
x = self.c2x(s['categories'], cat)
usage_acc_update2(x, out, b_in, err)
x = s['total']
usage_acc_update2(x, out, b_in, err)
def generate_make_entry(self):
return lambda cat,bucket,user,out,b_in,err: self.make_entry(cat, bucket, user, out, b_in, err)
def get_usage(self):
return self.results
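# self.results mirrors the shape of `radosgw-admin usage show` output,
# roughly (hypothetical values):
#   {'entries': [{'user': u, 'buckets': [{'bucket': b, 'categories':
#       [{'category': c, 'bytes_sent': 0, 'bytes_received': 3,
#         'ops': 1, 'successful_ops': 1}]}]}],
#    'summary': [{'user': u, 'categories': [...], 'total': {...}}]}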
def compare_results(self, results):
if not results.has_key('entries') or not results.has_key('summary'):
return ['Missing entries or summary']
r = []
for e in self.results['entries']:
try:
e2 = usage_acc_findentry2(results['entries'], e['user'], False)
except Exception as ex:
r.append("malformed entry looking for user "
+ e['user'] + " " + str(ex))
break
if e2 is None:
r.append("missing entry for user " + e['user'])
continue
for b in e['buckets']:
c = b['categories']
if b['bucket'] == 'nosuchbucket':
print "got here"
try:
b2 = self.e2b(e2, b['bucket'], False)
if b2 is not None:
c2 = b2['categories']
except Exception as ex:
r.append("malformed entry looking for bucket "
+ b['bucket'] + " in user " + e['user'] + " " + str(ex))
break
if b2 is None:
r.append("can't find bucket " + b['bucket']
+ " in user " + e['user'])
continue
for x in c:
try:
x2 = self.c2x(c2, x['category'], False)
except Exception as ex:
r.append("malformed entry looking for "
+ x['category'] + " in bucket " + b['bucket']
+ " user " + e['user'] + " " + str(ex))
break
usage_acc_validate_fields(r, x, x2, "entry: category "
+ x['category'] + " bucket " + b['bucket']
+ " in user " + e['user'])
for s in self.results['summary']:
c = s['categories']
try:
s2 = usage_acc_findsum2(results['summary'], s['user'], False)
except Exception as ex:
r.append("malformed summary looking for user " + e['user']
+ " " + str(ex))
break
if s2 is None:
r.append("missing summary for user " + s['user'])
continue
try:
c2 = s2['categories']
except Exception as ex:
r.append("malformed summary missing categories for user "
+ e['user'] + " " + str(ex))
break
for x in c:
try:
x2 = self.c2x(c2, x['category'], False)
except Exception as ex:
r.append("malformed summary looking for "
+ x['category'] + " user " + e['user'] + " " + str(ex))
break
usage_acc_validate_fields(r, x, x2, "summary: category "
+ x['category'] + " in user " + e['user'])
x = s['total']
try:
x2 = s2['total']
except Exception as ex:
r.append("malformed summary looking for totals for user "
+ e['user'] + " " + str(ex))
break
usage_acc_validate_fields(r, x, x2, "summary: totals for user" + e['user'])
return r
def ignore_this_entry(cat, bucket, user, out, b_in, err):
pass
class requestlog_queue():
def __init__(self, add):
self.q = Queue.Queue(1000)
self.adder = add
def handle_request_data(self, request, response, error=False):
now = datetime.datetime.now()
if error:
pass
elif response.status < 200 or response.status >= 400:
error = True
self.q.put(bunch.Bunch({'t': now, 'o': request, 'i': response, 'e': error}))
def clear(self):
with self.q.mutex:
self.q.queue.clear()
def log_and_clear(self, cat, bucket, user, add_entry = None):
while not self.q.empty():
j = self.q.get()
bytes_out = 0
if 'Content-Length' in j.o.headers:
bytes_out = int(j.o.headers['Content-Length'])
bytes_in = 0
if 'content-length' in j.i.msg.dict:
bytes_in = int(j.i.msg.dict['content-length'])
log.info('RL: %s %s %s bytes_out=%d bytes_in=%d failed=%r'
% (cat, bucket, user, bytes_out, bytes_in, j.e))
if add_entry is None:
add_entry = self.adder
add_entry(cat, bucket, user, bytes_out, bytes_in, j.e)
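# How these pieces fit together (a sketch of the wiring done in task() below):
#
#   acc = usage_acc()
#   rl = requestlog_queue(acc.generate_make_entry())
#   connection.set_request_hook(rl)      # boto feeds every exchange to rl
#   connection.create_bucket(bucket_name)
#   rl.log_and_clear("create_bucket", bucket_name, user1)
#   ...
#   r = acc.compare_results(out)         # out = parsed `usage show` JSON
#   assert len(r) == 0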
def create_presigned_url(conn, method, bucket_name, key_name, expiration):
return conn.generate_url(expires_in=expiration,
method=method,
@ -119,8 +318,17 @@ def task(ctx, config):
calling_format=boto.s3.connection.OrdinaryCallingFormat(),
)
acc = usage_acc()
rl = requestlog_queue(acc.generate_make_entry())
connection.set_request_hook(rl)
connection2.set_request_hook(rl)
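# boto now delivers each request/response pair to rl.handle_request_data();
# every S3 call below is then attributed via rl.log_and_clear(...)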
# legend (test cases can be easily grep-ed out)
# TESTCASE 'testname','object','method','operation','assertion'
# TESTCASE 'usage-show0' 'usage' 'show' 'all usage' 'succeeds'
(err, summary0) = rgwadmin(ctx, client, ['usage', 'show'], check_status=True)
# TESTCASE 'info-nosuch','user','info','non-existent user','fails'
(err, out) = rgwadmin(ctx, client, ['user', 'info', '--uid', user1])
assert err
@ -266,11 +474,19 @@ def task(ctx, config):
# create a first bucket
bucket = connection.create_bucket(bucket_name)
rl.log_and_clear("create_bucket", bucket_name, user1)
# TESTCASE 'bucket-list','bucket','list','one bucket','succeeds, expected list'
(err, out) = rgwadmin(ctx, client, ['bucket', 'list', '--uid', user1], check_status=True)
assert len(out) == 1
assert out[0] == bucket_name
bucket_list = connection.get_all_buckets()
assert len(bucket_list) == 1
assert bucket_list[0].name == bucket_name
rl.log_and_clear("list_buckets", '', user1)
# TESTCASE 'bucket-list-all','bucket','list','all buckets','succeeds, expected list'
(err, out) = rgwadmin(ctx, client, ['bucket', 'list'], check_status=True)
assert len(out) >= 1
@ -278,8 +494,11 @@ def task(ctx, config):
# TESTCASE 'max-bucket-limit','bucket','create','4 buckets','5th bucket fails due to max buckets == 4'
bucket2 = connection.create_bucket(bucket_name + '2')
rl.log_and_clear("create_bucket", bucket_name + '2', user1)
bucket3 = connection.create_bucket(bucket_name + '3')
rl.log_and_clear("create_bucket", bucket_name + '3', user1)
bucket4 = connection.create_bucket(bucket_name + '4')
rl.log_and_clear("create_bucket", bucket_name + '4', user1)
# the 5th should fail.
failed = False
try:
@ -287,11 +506,15 @@ def task(ctx, config):
except Exception:
failed = True
assert failed
rl.log_and_clear("create_bucket", bucket_name + '5', user1)
# delete the buckets
bucket2.delete()
rl.log_and_clear("delete_bucket", bucket_name + '2', user1)
bucket3.delete()
rl.log_and_clear("delete_bucket", bucket_name + '3', user1)
bucket4.delete()
rl.log_and_clear("delete_bucket", bucket_name + '4', user1)
# TESTCASE 'bucket-stats3','bucket','stats','new empty bucket','succeeds, empty list'
(err, out) = rgwadmin(ctx, client, [
@ -307,6 +530,7 @@ def task(ctx, config):
# use some space
key = boto.s3.key.Key(bucket)
key.set_contents_from_string('one')
rl.log_and_clear("put_obj", bucket_name, user1)
# TESTCASE 'bucket-stats5','bucket','stats','after creating key','succeeds, lists one non-empty object'
(err, out) = rgwadmin(ctx, client, [
@ -317,6 +541,7 @@ def task(ctx, config):
# reclaim it
key.delete()
rl.log_and_clear("delete_obj", bucket_name, user1)
# TESTCASE 'bucket unlink', 'bucket', 'unlink', 'unlink bucket from user', 'fails', 'access denied error'
(err, out) = rgwadmin(ctx, client,
@ -344,9 +569,11 @@ def task(ctx, config):
denied = True
assert not denied
rl.log_and_clear("put_obj", bucket_name, user1)
# delete the object
key.delete()
rl.log_and_clear("delete_obj", bucket_name, user1)
# link the bucket to another user
(err, out) = rgwadmin(ctx, client, ['metadata', 'get', 'bucket:{n}'.format(n=bucket_name)],
@ -383,6 +610,17 @@ def task(ctx, config):
object_name = 'four'
key = boto.s3.key.Key(bucket, object_name)
key.set_contents_from_string(object_name)
rl.log_and_clear("put_obj", bucket_name, user1)
# fetch it too (for usage stats presently)
s = key.get_contents_as_string()
rl.log_and_clear("get_obj", bucket_name, user1)
assert s == object_name
# list bucket too (for usage stats presently)
keys = list(bucket.list())
rl.log_and_clear("list_bucket", bucket_name, user1)
assert len(keys) == 1
assert keys[0].name == object_name
# now delete it
(err, out) = rgwadmin(ctx, client,
@ -428,10 +666,155 @@ def task(ctx, config):
# TODO: show log by bucket+date
# TESTCASE 'user-suspend2','user','suspend','existing user','succeeds'
(err, out) = rgwadmin(ctx, client, ['user', 'suspend', '--uid', user1],
check_status=True)
# TESTCASE 'user-suspend3','user','suspend','suspended user','cannot write objects'
denied = False
try:
key = boto.s3.key.Key(bucket)
key.set_contents_from_string('five')
except boto.exception.S3ResponseError as e:
denied = True
assert e.status == 403
assert denied
rl.log_and_clear("put_obj", bucket_name, user1)
# TESTCASE 'user-renable2','user','enable','suspended user','succeeds'
(err, out) = rgwadmin(ctx, client, ['user', 'enable', '--uid', user1],
check_status=True)
# TESTCASE 'user-renable3','user','enable','reenabled user','can write objects'
key = boto.s3.key.Key(bucket)
key.set_contents_from_string('six')
rl.log_and_clear("put_obj", bucket_name, user1)
# TESTCASE 'gc-list', 'gc', 'list', 'get list of objects ready for garbage collection'
# create an object large enough to be split into multiple parts
test_string = 'foo'*10000000
big_key = boto.s3.key.Key(bucket)
big_key.set_contents_from_string(test_string)
rl.log_and_clear("put_obj", bucket_name, user1)
# now delete the head
big_key.delete()
rl.log_and_clear("delete_obj", bucket_name, user1)
# wait a bit to give the garbage collector time to cycle
time.sleep(15)
(err, out) = rgwadmin(ctx, client, ['gc', 'list'])
assert len(out) > 0
# TESTCASE 'gc-process', 'gc', 'process', 'manually collect garbage'
(err, out) = rgwadmin(ctx, client, ['gc', 'process'], check_status=True)
#confirm
(err, out) = rgwadmin(ctx, client, ['gc', 'list'])
assert len(out) == 0
# TESTCASE 'rm-user-buckets','user','rm','existing user','fails, still has buckets'
(err, out) = rgwadmin(ctx, client, ['user', 'rm', '--uid', user1])
assert err
# delete should fail because ``key`` still exists
try:
bucket.delete()
except boto.exception.S3ResponseError as e:
assert e.status == 409
rl.log_and_clear("delete_bucket", bucket_name, user1)
key.delete()
rl.log_and_clear("delete_obj", bucket_name, user1)
bucket.delete()
rl.log_and_clear("delete_bucket", bucket_name, user1)
# TESTCASE 'policy', 'bucket', 'policy', 'get bucket policy', 'returns S3 policy'
bucket = connection.create_bucket(bucket_name)
rl.log_and_clear("create_bucket", bucket_name, user1)
# create an object
key = boto.s3.key.Key(bucket)
key.set_contents_from_string('seven')
rl.log_and_clear("put_obj", bucket_name, user1)
# should be private already but guarantee it
key.set_acl('private')
rl.log_and_clear("put_acls", bucket_name, user1)
(err, out) = rgwadmin(ctx, client,
['policy', '--bucket', bucket.name, '--object', key.key],
check_status=True, format='xml')
acl = get_acl(key)
rl.log_and_clear("get_acls", bucket_name, user1)
assert acl == out.strip('\n')
# add another grantee by making the object public read
key.set_acl('public-read')
rl.log_and_clear("put_acls", bucket_name, user1)
(err, out) = rgwadmin(ctx, client,
['policy', '--bucket', bucket.name, '--object', key.key],
check_status=True, format='xml')
acl = get_acl(key)
rl.log_and_clear("get_acls", bucket_name, user1)
assert acl == out.strip('\n')
# TESTCASE 'rm-bucket', 'bucket', 'rm', 'bucket with objects', 'succeeds'
bucket = connection.create_bucket(bucket_name)
rl.log_and_clear("create_bucket", bucket_name, user1)
key_name = ['eight', 'nine', 'ten', 'eleven']
for i in range(4):
key = boto.s3.key.Key(bucket)
key.set_contents_from_string(key_name[i])
rl.log_and_clear("put_obj", bucket_name, user1)
(err, out) = rgwadmin(ctx, client,
['bucket', 'rm', '--bucket', bucket_name, '--purge-objects'],
check_status=True)
# TESTCASE 'caps-add', 'caps', 'add', 'add user cap', 'succeeds'
caps='user=read'
(err, out) = rgwadmin(ctx, client, ['caps', 'add', '--uid', user1, '--caps', caps])
assert out['caps'][0]['perm'] == 'read'
# TESTCASE 'caps-rm', 'caps', 'rm', 'remove existing cap from user', 'succeeds'
(err, out) = rgwadmin(ctx, client, ['caps', 'rm', '--uid', user1, '--caps', caps])
assert not out['caps']
# TESTCASE 'rm-user','user','rm','existing user','fails, still has buckets'
bucket = connection.create_bucket(bucket_name)
rl.log_and_clear("create_bucket", bucket_name, user1)
key = boto.s3.key.Key(bucket)
(err, out) = rgwadmin(ctx, client, ['user', 'rm', '--uid', user1])
assert err
# TESTCASE 'rm-user2', 'user', 'rm', 'user with data', 'succeeds'
bucket = connection.create_bucket(bucket_name)
rl.log_and_clear("create_bucket", bucket_name, user1)
key = boto.s3.key.Key(bucket)
key.set_contents_from_string('twelve')
rl.log_and_clear("put_obj", bucket_name, user1)
time.sleep(35)
# need to wait for all usage data to get flushed, should take up to 30 seconds
timestamp = time.time()
while time.time() - timestamp <= (20 * 60): # wait up to 20 minutes
(err, out) = rgwadmin(ctx, client, ['usage', 'show', '--categories', 'delete_obj']) # last operation we did is delete obj, wait for it to flush
while time.time() - timestamp <= (2 * 60): # wait up to 2 minutes
(err, out) = rgwadmin(ctx, client, ['usage', 'show', '--categories', 'delete_obj']) # one of the operations we did is delete_obj, should be present.
if get_user_successful_ops(out, user1) > 0:
break
time.sleep(1)
@ -443,6 +826,11 @@ def task(ctx, config):
assert len(out['entries']) > 0
assert len(out['summary']) > 0
r = acc.compare_results(out)
if len(r) != 0:
sys.stderr.write(("\n".join(r))+"\n")
assert len(r) == 0
user_summary = get_user_summary(out, user1)
total = user_summary['total']
@ -471,6 +859,12 @@ def task(ctx, config):
assert entry['category'] == cat
assert entry['successful_ops'] > 0
# should be all through with connection. (anything using connection
# should be BEFORE the usage stuff above.)
rl.log_and_clear("(before-close)", '-', '-', ignore_this_entry)
connection.close()
connection = None
# the usage flush interval is 30 seconds, wait that much and then some
# to make sure everything has been flushed
time.sleep(35)
@ -483,127 +877,6 @@ def task(ctx, config):
assert len(out['entries']) == 0
assert len(out['summary']) == 0
# TESTCASE 'user-suspend2','user','suspend','existing user','succeeds'
(err, out) = rgwadmin(ctx, client, ['user', 'suspend', '--uid', user1],
check_status=True)
# TESTCASE 'user-suspend3','user','suspend','suspended user','cannot write objects'
try:
key = boto.s3.key.Key(bucket)
key.set_contents_from_string('five')
except boto.exception.S3ResponseError as e:
assert e.status == 403
# TESTCASE 'user-renable2','user','enable','suspended user','succeeds'
(err, out) = rgwadmin(ctx, client, ['user', 'enable', '--uid', user1],
check_status=True)
# TESTCASE 'user-renable3','user','enable','reenabled user','can write objects'
key = boto.s3.key.Key(bucket)
key.set_contents_from_string('six')
# TESTCASE 'gc-list', 'gc', 'list', 'get list of objects ready for garbage collection'
# create an object large enough to be split into multiple parts
test_string = 'foo'*10000000
big_key = boto.s3.key.Key(bucket)
big_key.set_contents_from_string(test_string)
# now delete the head
big_key.delete()
# wait a bit to give the garbage collector time to cycle
time.sleep(15)
(err, out) = rgwadmin(ctx, client, ['gc', 'list'])
assert len(out) > 0
# TESTCASE 'gc-process', 'gc', 'process', 'manually collect garbage'
(err, out) = rgwadmin(ctx, client, ['gc', 'process'], check_status=True)
#confirm
(err, out) = rgwadmin(ctx, client, ['gc', 'list'])
assert len(out) == 0
# TESTCASE 'rm-user-buckets','user','rm','existing user','fails, still has buckets'
(err, out) = rgwadmin(ctx, client, ['user', 'rm', '--uid', user1])
assert err
# delete should fail because ``key`` still exists
try:
bucket.delete()
except boto.exception.S3ResponseError as e:
assert e.status == 409
key.delete()
bucket.delete()
# TESTCASE 'policy', 'bucket', 'policy', 'get bucket policy', 'returns S3 policy'
bucket = connection.create_bucket(bucket_name)
# create an object
key = boto.s3.key.Key(bucket)
key.set_contents_from_string('seven')
# should be private already but guarantee it
key.set_acl('private')
(err, out) = rgwadmin(ctx, client,
['policy', '--bucket', bucket.name, '--object', key.key],
check_status=True, format='xml')
acl = get_acl(key)
assert acl == out.strip('\n')
# add another grantee by making the object public read
key.set_acl('public-read')
(err, out) = rgwadmin(ctx, client,
['policy', '--bucket', bucket.name, '--object', key.key],
check_status=True, format='xml')
acl = get_acl(key)
assert acl == out.strip('\n')
# TESTCASE 'rm-bucket', 'bucket', 'rm', 'bucket with objects', 'succeeds'
bucket = connection.create_bucket(bucket_name)
key_name = ['eight', 'nine', 'ten', 'eleven']
for i in range(4):
key = boto.s3.key.Key(bucket)
key.set_contents_from_string(key_name[i])
(err, out) = rgwadmin(ctx, client,
['bucket', 'rm', '--bucket', bucket_name, '--purge-objects'],
check_status=True)
# TESTCASE 'caps-add', 'caps', 'add', 'add user cap', 'succeeds'
caps='user=read'
(err, out) = rgwadmin(ctx, client, ['caps', 'add', '--uid', user1, '--caps', caps])
assert out['caps'][0]['perm'] == 'read'
# TESTCASE 'caps-rm', 'caps', 'rm', 'remove existing cap from user', 'succeeds'
(err, out) = rgwadmin(ctx, client, ['caps', 'rm', '--uid', user1, '--caps', caps])
assert not out['caps']
# TESTCASE 'rm-user','user','rm','existing user','fails, still has buckets'
bucket = connection.create_bucket(bucket_name)
key = boto.s3.key.Key(bucket)
(err, out) = rgwadmin(ctx, client, ['user', 'rm', '--uid', user1])
assert err
# TESTCASE 'rm-user2', 'user', 'rm', 'user with data', 'succeeds'
bucket = connection.create_bucket(bucket_name)
key = boto.s3.key.Key(bucket)
key.set_contents_from_string('twelve')
(err, out) = rgwadmin(ctx, client,
['user', 'rm', '--uid', user1, '--purge-data' ],
check_status=True)
@ -638,3 +911,45 @@ def task(ctx, config):
(err, out) = rgwadmin(ctx, client, ['zone', 'get','--rgw-zone','default'])
assert len(out) > 0
assert len(out['placement_pools']) == orig_placement_pools + 1
zonecmd = ['zone', 'placement', 'rm',
'--rgw-zone', 'default',
'--placement-id', 'new-placement']
(err, out) = rgwadmin(ctx, client, zonecmd, check_status=True)
import sys
from tasks.radosgw_admin import task
from teuthology.config import config
from teuthology.orchestra import cluster, remote
import argparse
def main():
if len(sys.argv) == 3:
user = sys.argv[1] + "@"
host = sys.argv[2]
elif len(sys.argv) == 2:
user = ""
host = sys.argv[1]
else:
sys.stderr.write("usage: radosgw_admin.py [user] host\n")
exit(1)
client0 = remote.Remote(user + host)
ctx = config
ctx.cluster = cluster.Cluster(remotes=[(client0,
[ 'ceph.client.rgw.%s' % (host), ]),])
ctx.rgw = argparse.Namespace()
endpoints = {}
endpoints['ceph.client.rgw.%s' % host] = (host, 80)
ctx.rgw.role_endpoints = endpoints
ctx.rgw.realm = None
ctx.rgw.regions = {'region0': { 'api name': 'api1',
'is master': True, 'master zone': 'r0z0',
'zones': ['r0z0', 'r0z1'] }}
ctx.rgw.config = {'ceph.client.rgw.%s' % host: {'system user': {'name': '%s-system-user' % host}}}
task(ctx, None)
exit()
if __name__ == '__main__':
main()


@ -111,11 +111,11 @@ static void handle_connection(RGWProcessEnv& env, tcp::socket socket,
rgw::asio::ClientIO real_client{socket, parser, buffer};
auto real_client_io = rgw::io::add_reordering(
rgw::io::add_buffering(
rgw::io::add_buffering(cct,
rgw::io::add_chunking(
rgw::io::add_conlen_controlling(
&real_client))));
RGWRestfulIO client(&real_client_io);
RGWRestfulIO client(cct, &real_client_io);
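// (cct is threaded through the filter stack so each layer can log
// its byte accounting; see the lsubdout(cct, rgw, 30) calls below)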
process_request(env.store, env.rest, &req, env.uri_prefix,
*env.auth_registry, &client, env.olog);


@ -24,11 +24,11 @@ int RGWCivetWebFrontend::process(struct mg_connection* const conn)
RGWCivetWeb cw_client(conn);
auto real_client_io = rgw::io::add_reordering(
rgw::io::add_buffering(
rgw::io::add_buffering(dout_context,
rgw::io::add_chunking(
rgw::io::add_conlen_controlling(
&cw_client))));
RGWRestfulIO client_io(&real_client_io);
RGWRestfulIO client_io(dout_context, &real_client_io);
RGWRequest req(env.store->get_new_req_id());
int ret = process_request(env.store, env.rest, &req, env.uri_prefix,


@ -339,8 +339,8 @@ class RGWRestfulIO : public rgw::io::AccountingFilter<rgw::io::RestfulClient*> {
public:
~RGWRestfulIO() override = default;
RGWRestfulIO(rgw::io::RestfulClient* engine)
: AccountingFilter<rgw::io::RestfulClient*>(std::move(engine)) {
RGWRestfulIO(CephContext *_cx, rgw::io::RestfulClient* engine)
: AccountingFilter<rgw::io::RestfulClient*>(_cx, std::move(engine)) {
}
void add_filter(std::shared_ptr<DecoratedRestfulClient> new_filter) {


@ -20,20 +20,24 @@ class AccountingFilter : public DecoratedRestfulClient<T>,
bool enabled;
uint64_t total_sent;
uint64_t total_received;
CephContext *cct;
public:
template <typename U>
AccountingFilter(U&& decoratee)
AccountingFilter(CephContext *cct, U&& decoratee)
: DecoratedRestfulClient<T>(std::forward<U>(decoratee)),
enabled(false),
total_sent(0),
total_received(0) {
total_received(0), cct(cct) {
}
size_t send_status(const int status,
const char* const status_name) override {
const auto sent = DecoratedRestfulClient<T>::send_status(status,
status_name);
lsubdout(cct, rgw, 30) << "AccountingFilter::send_status: e="
<< (enabled ? "1" : "0") << ", sent=" << sent << ", total="
<< total_sent << dendl;
if (enabled) {
total_sent += sent;
}
@ -42,6 +46,9 @@ public:
size_t send_100_continue() override {
const auto sent = DecoratedRestfulClient<T>::send_100_continue();
lsubdout(cct, rgw, 30) << "AccountingFilter::send_100_continue: e="
<< (enabled ? "1" : "0") << ", sent=" << sent << ", total="
<< total_sent << dendl;
if (enabled) {
total_sent += sent;
}
@ -51,6 +58,9 @@ public:
size_t send_header(const boost::string_ref& name,
const boost::string_ref& value) override {
const auto sent = DecoratedRestfulClient<T>::send_header(name, value);
lsubdout(cct, rgw, 30) << "AccountingFilter::send_header: e="
<< (enabled ? "1" : "0") << ", sent=" << sent << ", total="
<< total_sent << dendl;
if (enabled) {
total_sent += sent;
}
@ -59,6 +69,9 @@ public:
size_t send_content_length(const uint64_t len) override {
const auto sent = DecoratedRestfulClient<T>::send_content_length(len);
lsubdout(cct, rgw, 30) << "AccountingFilter::send_content_length: e="
<< (enabled ? "1" : "0") << ", sent=" << sent << ", total="
<< total_sent << dendl;
if (enabled) {
total_sent += sent;
}
@ -67,6 +80,9 @@ public:
size_t send_chunked_transfer_encoding() override {
const auto sent = DecoratedRestfulClient<T>::send_chunked_transfer_encoding();
lsubdout(cct, rgw, 30) << "AccountingFilter::send_chunked_transfer_encoding: e="
<< (enabled ? "1" : "0") << ", sent=" << sent << ", total="
<< total_sent << dendl;
if (enabled) {
total_sent += sent;
}
@ -75,6 +91,9 @@ public:
size_t complete_header() override {
const auto sent = DecoratedRestfulClient<T>::complete_header();
lsubdout(cct, rgw, 30) << "AccountingFilter::complete_header: e="
<< (enabled ? "1" : "0") << ", sent=" << sent << ", total="
<< total_sent << dendl;
if (enabled) {
total_sent += sent;
}
@ -83,6 +102,8 @@ public:
size_t recv_body(char* buf, size_t max) override {
const auto received = DecoratedRestfulClient<T>::recv_body(buf, max);
lsubdout(cct, rgw, 30) << "AccountingFilter::recv_body: e="
<< (enabled ? "1" : "0") << ", received=" << received << dendl;
if (enabled) {
total_received += received;
}
@ -92,6 +113,20 @@ public:
size_t send_body(const char* const buf,
const size_t len) override {
const auto sent = DecoratedRestfulClient<T>::send_body(buf, len);
lsubdout(cct, rgw, 30) << "AccountingFilter::send_body: e="
<< (enabled ? "1" : "0") << ", sent=" << sent << ", total="
<< total_sent << dendl;
if (enabled) {
total_sent += sent;
}
return sent;
}
size_t complete_request() override {
const auto sent = DecoratedRestfulClient<T>::complete_request();
lsubdout(cct, rgw, 30) << "AccountingFilter::complete_request: e="
<< (enabled ? "1" : "0") << ", sent=" << sent << ", total="
<< total_sent << dendl;
if (enabled) {
total_sent += sent;
}
@ -108,6 +143,8 @@ public:
void set_account(bool enabled) override {
this->enabled = enabled;
lsubdout(cct, rgw, 30) << "AccountingFilter::set_account: e="
<< (enabled ? "1" : "0") << dendl;
}
};
@ -122,13 +159,14 @@ protected:
bool has_content_length;
bool buffer_data;
CephContext *cct;
public:
template <typename U>
BufferingFilter(U&& decoratee)
BufferingFilter(CephContext *cct, U&& decoratee)
: DecoratedRestfulClient<T>(std::forward<U>(decoratee)),
has_content_length(false),
buffer_data(false) {
buffer_data(false), cct(cct) {
}
size_t send_content_length(const uint64_t len) override;
@ -144,6 +182,9 @@ size_t BufferingFilter<T>::send_body(const char* const buf,
{
if (buffer_data) {
data.append(buf, len);
lsubdout(cct, rgw, 30) << "BufferingFilter<T>::send_body: defer count = "
<< len << dendl;
return 0;
}
@ -170,6 +211,8 @@ size_t BufferingFilter<T>::complete_header()
if (! has_content_length) {
/* We will dump everything in complete_request(). */
buffer_data = true;
lsubdout(cct, rgw, 30) << "BufferingFilter<T>::complete_header: has_content_length="
<< (has_content_length ? "1" : "0") << dendl;
return 0;
}
@ -182,8 +225,16 @@ size_t BufferingFilter<T>::complete_request()
size_t sent = 0;
if (! has_content_length) {
/* It is not correct to count these bytes here,
* because they can only be part of the header.
* Therefore force count to 0.
*/
sent += DecoratedRestfulClient<T>::send_content_length(data.length());
sent += DecoratedRestfulClient<T>::complete_header();
lsubdout(cct, rgw, 30) <<
"BufferingFilter::complete_request: !has_content_length: IGNORE: sent="
<< sent << dendl;
sent = 0;
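/* (zeroing 'sent' here is the bytes_sent fix: if these header bytes
 * were returned, the AccountingFilter above would charge them to the
 * request's payload count) */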
}
if (buffer_data) {
@ -195,14 +246,18 @@ size_t BufferingFilter<T>::complete_request()
}
data.clear();
buffer_data = false;
lsubdout(cct, rgw, 30) << "BufferingFilter::complete_request: buffer_data: sent="
<< sent << dendl;
}
return sent + DecoratedRestfulClient<T>::complete_request();
}
template <typename T> static inline
BufferingFilter<T> add_buffering(T&& t) {
return BufferingFilter<T>(std::forward<T>(t));
BufferingFilter<T> add_buffering(
CephContext *cct,
T&& t) {
return BufferingFilter<T>(cct, std::forward<T>(t));
}


@ -118,10 +118,10 @@ void RGWFCGXProcess::handle_request(RGWRequest* r)
RGWFCGX fcgxfe(req->fcgx);
auto real_client_io = rgw::io::add_reordering(
rgw::io::add_buffering(
rgw::io::add_buffering(cct,
rgw::io::add_chunking(
&fcgxfe)));
RGWRestfulIO client_io(&real_client_io);
RGWRestfulIO client_io(cct, &real_client_io);
int ret = process_request(store, rest, req, uri_prefix,


@ -131,7 +131,7 @@ void RGWLoadGenProcess::handle_request(RGWRequest* r)
env.sign(access_key);
RGWLoadGenIO real_client_io(&env);
RGWRestfulIO client_io(&real_client_io);
RGWRestfulIO client_io(cct, &real_client_io);
int ret = process_request(store, rest, req, uri_prefix,
*auth_registry, &client_io, olog);


@ -221,6 +221,11 @@ static void log_usage(struct req_state *s, const string& op_name)
if (!s->is_err())
data.successful_ops = 1;
ldout(s->cct, 30) << "log_usage: bucket_name=" << bucket_name
<< " tenant=" << s->bucket_tenant
<< ", bytes_sent=" << bytes_sent << ", bytes_received="
<< bytes_received << ", success=" << data.successful_ops << dendl;
entry.add(op_name, data);
utime_t ts = ceph_clock_now();