ceph/qa/workunits/rados/test_large_omap_detection.py

135 lines
4.4 KiB
Python
Executable File

#!/usr/bin/python3
# -*- mode:python -*-
# vim: ts=4 sw=4 smarttab expandtab
#
# Copyright (C) 2017 Red Hat <contact@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Library Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library Public License for more details.
#
import json
import rados
import shlex
import subprocess
import time
def cleanup(cluster):
cluster.delete_pool('large-omap-test-pool')
cluster.shutdown()
def init():
# For local testing
#cluster = rados.Rados(conffile='./ceph.conf')
cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
cluster.connect()
print("\nCluster ID: " + cluster.get_fsid())
cluster.create_pool('large-omap-test-pool')
ioctx = cluster.open_ioctx('large-omap-test-pool')
ioctx.write_full('large-omap-test-object1', b"Lorem ipsum")
op = ioctx.create_write_op()
keys = []
values = []
for x in range(20001):
keys.append(str(x))
values.append(b"X")
ioctx.set_omap(op, tuple(keys), tuple(values))
ioctx.operate_write_op(op, 'large-omap-test-object1', 0)
ioctx.release_write_op(op)
ioctx.write_full('large-omap-test-object2', b"Lorem ipsum dolor")
op = ioctx.create_write_op()
buffer = ("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do "
"eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut "
"enim ad minim veniam, quis nostrud exercitation ullamco laboris "
"nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in "
"reprehenderit in voluptate velit esse cillum dolore eu fugiat "
"nulla pariatur. Excepteur sint occaecat cupidatat non proident, "
"sunt in culpa qui officia deserunt mollit anim id est laborum.")
keys = []
values = []
for x in range(20000):
keys.append(str(x))
values.append(buffer.encode())
ioctx.set_omap(op, tuple(keys), tuple(values))
ioctx.operate_write_op(op, 'large-omap-test-object2', 0)
ioctx.release_write_op(op)
ioctx.close()
return cluster
def get_deep_scrub_timestamp(pgid):
cmd = ['ceph', 'pg', 'dump', '--format=json-pretty']
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
out = proc.communicate()[0]
try:
pgstats = json.loads(out)['pg_map']['pg_stats']
except KeyError:
pgstats = json.loads(out)['pg_stats']
for stat in pgstats:
if stat['pgid'] == pgid:
return stat['last_deep_scrub_stamp']
def wait_for_scrub():
osds = set();
pgs = dict();
cmd = ['ceph', 'osd', 'map', 'large-omap-test-pool',
'large-omap-test-object1', '--format=json-pretty']
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
out = proc.communicate()[0]
osds.add(json.loads(out)['acting_primary'])
pgs[json.loads(out)['pgid']] = get_deep_scrub_timestamp(json.loads(out)['pgid'])
cmd = ['ceph', 'osd', 'map', 'large-omap-test-pool',
'large-omap-test-object2', '--format=json-pretty']
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
out = proc.communicate()[0]
osds.add(json.loads(out)['acting_primary'])
pgs[json.loads(out)['pgid']] = get_deep_scrub_timestamp(json.loads(out)['pgid'])
for pg in pgs:
command = "ceph pg deep-scrub " + str(pg)
subprocess.check_call(shlex.split(command))
for pg in pgs:
RETRIES = 0
while RETRIES < 60 and pgs[pg] == get_deep_scrub_timestamp(pg):
time.sleep(10)
RETRIES += 1
def check_health_output():
RETRIES = 0
result = 0
while RETRIES < 6 and result != 2:
result = 0
RETRIES += 1
output = subprocess.check_output(["ceph", "health", "detail"])
for line in output.splitlines():
result += int(line.find(b'2 large omap objects') != -1)
time.sleep(10)
if result != 2:
print("Error, got invalid output:")
print(output)
raise Exception
def main():
cluster = init()
wait_for_scrub()
check_health_output()
cleanup(cluster)
if __name__ == '__main__':
main()