Merge pull request #59847 from rishabh-d-dave/mgr-vol-spawn-threads

mgr/vol: reuse code to spawn threads

Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
This commit is contained in:
Rishabh Dave 2024-10-25 11:58:38 +05:30 committed by GitHub
commit eb6dbeec13
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -142,11 +142,31 @@ class AsyncJobs(threading.Thread):
self.wakeup_timeout = None
self.threads = []
for i in range(self.nr_concurrent_jobs):
self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i)))
self.threads[-1].start()
self.spawn_all_threads()
self.start()
def spawn_new_thread(self, suffix):
t_name = f'{self.name_pfx}.{time.time()}.{suffix}'
log.debug(f'spawning new thread with name {t_name}')
t = JobThread(self, self.vc, name=t_name)
t.start()
self.threads.append(t)
def spawn_all_threads(self):
log.debug(f'spawning {self.nr_concurrent_jobs} to execute more jobs '
'concurrently')
for i in range(self.nr_concurrent_jobs):
self.spawn_new_thread(i)
def spawn_more_threads(self):
c = len(self.threads)
diff = self.nr_concurrent_jobs - c
log.debug(f'spawning {diff} threads to execute more jobs concurrently')
for i in range(c, self.nr_concurrent_jobs):
self.spawn_new_thread(i)
def set_wakeup_timeout(self):
with self.lock:
# not made configurable on purpose
@ -169,10 +189,7 @@ class AsyncJobs(threading.Thread):
self.cv.notifyAll()
elif c < self.nr_concurrent_jobs:
# Increase concurrency: create more threads.
log.debug("creating new threads to job increase")
for i in range(c, self.nr_concurrent_jobs):
self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i)))
self.threads[-1].start()
self.spawn_more_threads()
self.cv.wait(timeout=self.wakeup_timeout)
def shutdown(self):