mirror of
https://github.com/ceph/ceph
synced 2024-12-26 13:33:57 +00:00
Merge pull request #247 from ceph/requests-sessions
Use a requests.Session object for retries instead of safe_while
This commit is contained in:
commit
f74eea7b9e
@ -9,7 +9,6 @@ from datetime import datetime
|
||||
|
||||
import teuthology
|
||||
from .config import config
|
||||
from .contextutil import safe_while
|
||||
|
||||
|
||||
def init_logging():
|
||||
@ -174,7 +173,7 @@ class ResultsReporter(object):
|
||||
last_run_file = 'last_successful_run'
|
||||
|
||||
def __init__(self, archive_base=None, base_uri=None, save=False,
|
||||
refresh=False, timeout=20, log=None):
|
||||
refresh=False, log=None):
|
||||
self.log = log or init_logging()
|
||||
self.archive_base = archive_base or config.archive_base
|
||||
self.base_uri = base_uri or config.results_server
|
||||
@ -183,7 +182,13 @@ class ResultsReporter(object):
|
||||
self.serializer = ResultsSerializer(archive_base, log=self.log)
|
||||
self.save_last_run = save
|
||||
self.refresh = refresh
|
||||
self.timeout = timeout
|
||||
self.session = self._make_session()
|
||||
|
||||
def _make_session(self, max_retries=10):
|
||||
session = requests.Session()
|
||||
adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
|
||||
session.mount('http://', adapter)
|
||||
return session
|
||||
|
||||
def report_all_runs(self):
|
||||
"""
|
||||
@ -230,7 +235,7 @@ class ResultsReporter(object):
|
||||
))
|
||||
if jobs:
|
||||
if not self.refresh:
|
||||
response = requests.head("{base}/runs/{name}/".format(
|
||||
response = self.session.head("{base}/runs/{name}/".format(
|
||||
base=self.base_uri, name=run_name))
|
||||
if response.status_code == 200:
|
||||
self.log.info(" already present; skipped")
|
||||
@ -269,7 +274,7 @@ class ResultsReporter(object):
|
||||
job_info['status'] = 'dead'
|
||||
job_json = json.dumps(job_info)
|
||||
headers = {'content-type': 'application/json'}
|
||||
response = requests.post(run_uri, data=job_json, headers=headers)
|
||||
response = self.session.post(run_uri, data=job_json, headers=headers)
|
||||
|
||||
if response.status_code == 200:
|
||||
return job_id
|
||||
@ -288,7 +293,8 @@ class ResultsReporter(object):
|
||||
|
||||
if msg and msg.endswith('already exists'):
|
||||
job_uri = os.path.join(run_uri, job_id, '')
|
||||
response = requests.put(job_uri, data=job_json, headers=headers)
|
||||
response = self.session.put(job_uri, data=job_json,
|
||||
headers=headers)
|
||||
elif msg:
|
||||
self.log.error(
|
||||
"POST to {uri} failed with status {status}: {msg}".format(
|
||||
@ -338,7 +344,7 @@ class ResultsReporter(object):
|
||||
if not 'job_id' in fields:
|
||||
fields.append('job_id')
|
||||
uri += "?fields=" + ','.join(fields)
|
||||
response = requests.get(uri)
|
||||
response = self.session.get(uri)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
@ -351,7 +357,7 @@ class ResultsReporter(object):
|
||||
"""
|
||||
uri = "{base}/runs/{name}/jobs/{job_id}/".format(
|
||||
base=self.base_uri, name=run_name, job_id=job_id)
|
||||
response = requests.delete(uri)
|
||||
response = self.session.delete(uri)
|
||||
response.raise_for_status()
|
||||
|
||||
def delete_jobs(self, run_name, job_ids):
|
||||
@ -372,7 +378,7 @@ class ResultsReporter(object):
|
||||
"""
|
||||
uri = "{base}/runs/{name}/".format(
|
||||
base=self.base_uri, name=run_name)
|
||||
response = requests.delete(uri)
|
||||
response = self.session.delete(uri)
|
||||
response.raise_for_status()
|
||||
|
||||
|
||||
@ -420,15 +426,13 @@ def try_push_job_info(job_config, extra_info=None):
|
||||
else:
|
||||
job_info = job_config
|
||||
|
||||
with safe_while(_raise=False) as proceed:
|
||||
while proceed():
|
||||
try:
|
||||
log.debug("Pushing job info to %s", config.results_server)
|
||||
push_job_info(run_name, job_id, job_info)
|
||||
return
|
||||
except (requests.exceptions.RequestException, socket.error):
|
||||
log.exception("Could not report results to %s",
|
||||
config.results_server)
|
||||
try:
|
||||
log.debug("Pushing job info to %s", config.results_server)
|
||||
push_job_info(run_name, job_id, job_info)
|
||||
return
|
||||
except (requests.exceptions.RequestException, socket.error):
|
||||
log.exception("Could not report results to %s",
|
||||
config.results_server)
|
||||
|
||||
|
||||
def try_delete_jobs(run_name, job_ids, delete_empty_run=True):
|
||||
@ -461,22 +465,18 @@ def try_delete_jobs(run_name, job_ids, delete_empty_run=True):
|
||||
got_jobs = reporter.get_jobs(run_name, fields=['job_id'])
|
||||
got_job_ids = [j['job_id'] for j in got_jobs]
|
||||
if sorted(got_job_ids) == sorted(job_ids):
|
||||
with safe_while(_raise=False) as proceed:
|
||||
while proceed():
|
||||
try:
|
||||
reporter.delete_run(run_name)
|
||||
return
|
||||
except (requests.exceptions.RequestException, socket.error): # noqa
|
||||
log.exception("Run deletion failed")
|
||||
try:
|
||||
reporter.delete_run(run_name)
|
||||
return
|
||||
except (requests.exceptions.RequestException, socket.error):
|
||||
log.exception("Run deletion failed")
|
||||
|
||||
def try_delete_job(job_id):
|
||||
with safe_while(_raise=False) as proceed:
|
||||
while proceed():
|
||||
try:
|
||||
reporter.delete_job(run_name, job_id)
|
||||
return
|
||||
except (requests.exceptions.RequestException, socket.error):
|
||||
log.exception("Job deletion failed")
|
||||
try:
|
||||
reporter.delete_job(run_name, job_id)
|
||||
return
|
||||
except (requests.exceptions.RequestException, socket.error):
|
||||
log.exception("Job deletion failed")
|
||||
|
||||
for job_id in job_ids:
|
||||
try_delete_job(job_id)
|
||||
|
Loading…
Reference in New Issue
Block a user