Constrained Thread Environment for Job Orchestration
Overview
Have you ever struggled to limit the number of threads your application uses?
Recently I was asked to integrate a component into an existing application that has to be careful about its resource utilization (Figure 1). The component, however, tends to spawn many threads to carry out its various functions. It therefore has to be built so that it can guarantee a limit on its thread usage, or put simply, on the total number of threads it uses.
Setup of the Component
As shown in Figure 1, the component consists of two major layers. One handles all communication with the layer beneath; the other consists of various jobs that use system resources to do their own work. However, no one can predict how many such jobs there will be in the future, and therefore no one can predict how many threads the component will use.
Consider the following communication model between a Monitoring Job and the Main System (Figure 2). The Monitoring Job continuously checks certain metrics within a system. The Main System can get the latest information from the Monitoring Job in the following two ways.
- On-demand: whenever the Main System needs the latest information from the Monitoring Job, it calls the corresponding interface(s) and gets the information synchronously.
- Asynchronous updates: the Monitoring Job itself updates the Main System at a predefined frequency.
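The two update styles can be sketched roughly as follows. This is a minimal illustration, not the actual component: the `MonitoringJob` class, its `read_metric` and `on_update` parameters, and the constant metric value are all hypothetical names chosen for the example.

```python
import threading
import time


class MonitoringJob:
    """Hypothetical monitoring job that supports both update styles."""

    def __init__(self, read_metric, interval, on_update):
        self._read_metric = read_metric  # callable returning the current value
        self._interval = interval        # seconds between asynchronous updates
        self._on_update = on_update      # callback owned by the Main System
        self._stopped = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def get_latest(self):
        # On-demand: the caller blocks until the value has been read.
        return self._read_metric()

    def start(self):
        self._thread.start()

    def stop(self):
        self._stopped.set()
        self._thread.join()

    def _run(self):
        # Asynchronous updates: push a reading, then wait for the interval
        # (wait() returns early if stop() is called in the meantime).
        while not self._stopped.is_set():
            self._on_update(self._read_metric())
            self._stopped.wait(self._interval)


received = []  # stands in for the Main System's state
job = MonitoringJob(read_metric=lambda: 42, interval=0.01,
                    on_update=received.append)
on_demand_value = job.get_latest()  # synchronous call
job.start()                         # asynchronous pushes begin
time.sleep(0.05)
job.stop()
```

Note that in this naive form every job owns a thread of its own, which is exactly what makes the total thread count unpredictable as jobs are added.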
Solution
What if the upper layer, which manages the communication, could constrain the number of threads used by the jobs?
The rest of this article discusses how I achieved this.
Main design goals
- The number of threads used by jobs should stay within a fixed limit at all times.
- Which jobs run, how often each one submits information to the upper layer, and how long each one lasts should be able to vary over time. Simply put, we should be able to change the job orchestration plan whenever we need to.
- A failure in one job must not hinder the other jobs.
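One way to read the second goal is that a running job must be stoppable from outside, so that it can be retired when the plan changes. The sketch below is an assumption of mine about how that could look, not something the proof of concept in this article implements; `run_periodically` and `stop_event` are hypothetical names.

```python
import threading
import time


def run_periodically(job, interval, duration, stop_event):
    """Call job() every `interval` seconds for at most `duration` seconds
    (forever if duration is None), or until stop_event is set.
    Returns the number of times job() ran."""
    runs = 0
    deadline = None if duration is None else time.monotonic() + duration
    while not stop_event.is_set():
        if deadline is not None and time.monotonic() >= deadline:
            break
        job()
        runs += 1
        # Unlike time.sleep(), wait() returns early when the event is set.
        stop_event.wait(interval)
    return runs


stop = threading.Event()
calls = []
worker = threading.Thread(
    target=run_periodically,
    args=(lambda: calls.append(time.monotonic()), 0.01, None, stop))
worker.start()
time.sleep(0.05)
stop.set()  # change of plan: retire the job
worker.join(timeout=1)
```

Waiting on an event instead of sleeping means a job with a long interval can still be stopped promptly.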
Proposed Approach
Pub-sub Queue to manage job orchestration
A template that specifies which jobs should run, together with their frequency and duration, is published as tasks to a queue data structure (Figure 3).
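As a rough sketch of this publishing step, each template entry can become one task on a standard `queue.Queue`. The job names `CPU_CHECK` and `DISK_CHECK`, their values, and the `publish` helper are placeholders invented for the example.

```python
import queue

# Hypothetical template: job names and values are placeholders.
template = {
    'CPU_CHECK': {'frequency': 2, 'duration': 10},
    'DISK_CHECK': {'frequency': 5, 'duration': None},
}


def publish(template, task_queue):
    """Put one task per template entry onto the queue and
    return the number of tasks published."""
    count = 0
    for key, spec in template.items():
        task_queue.put({'key': key, 'template': spec})
        count += 1
    return count


task_queue = queue.Queue()
published = publish(template, task_queue)
first_task = task_queue.get()  # tasks come out in the order they went in
```

Because `queue.Queue` is thread-safe, the publisher and the consumer can run on different threads without extra locking.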
Thread Pool to constrain the number of threads being executed
To limit the number of threads being spawned, a thread pool can be used. Thread pool implementations are easy to find for most languages (e.g. Python's concurrent.futures.ThreadPoolExecutor). The thread pool consumes the queue and assigns threads to jobs as they become available.
The following diagram shows the complete solution to the problem under discussion.
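To make the bounding effect concrete, here is a small sketch that submits more jobs than there are workers and records how many run at the same time. The `stats` dictionary and the `job` function are illustrative only; the short sleep stands in for real work.

```python
import concurrent.futures
import threading
import time

MAX_WORKERS = 3
lock = threading.Lock()
stats = {'running': 0, 'peak': 0}


def job(n):
    # Track how many jobs are executing concurrently.
    with lock:
        stats['running'] += 1
        stats['peak'] = max(stats['peak'], stats['running'])
    time.sleep(0.02)  # stand-in for real work
    with lock:
        stats['running'] -= 1
    return n


# Eight jobs are submitted, but only MAX_WORKERS threads ever exist.
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    results = list(executor.map(job, range(8)))
```

However many jobs are submitted, the peak concurrency recorded in `stats['peak']` cannot exceed `MAX_WORKERS`; the remaining jobs simply wait for a free worker.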
Proof of Concept Using Python Futures
import concurrent.futures
import time
import queue
q = queue.Queue()
def job_A(arg):
    print(arg)

def job_B(arg):
    print(arg)

def job_C(arg):
    print(arg)

def job_D(arg):
    # Always fails, to show that one job's error does not affect the others.
    raise Exception('Job D has ERROR')
dispatch = {
    'ONE_TIME_JOB': job_A,
    'TEMPORARY_JOB': job_B,
    'INFINITE_JOB': job_C,
    'ERRONEOUS_JOB': job_D,
}
# 'frequency' is the pause in seconds between runs; 'duration' is the total
# running time in seconds. None means "not applicable".
schedule_template = {
    'ONE_TIME_JOB': {
        'duration': None,
        'frequency': None,
        'arguments': 'One Time Job'
    },
    'TEMPORARY_JOB': {
        'duration': 15,
        'frequency': 3,
        'arguments': 'Temporary Job'
    },
    'INFINITE_JOB': {
        'duration': None,
        'frequency': 5,
        'arguments': 'Infinite Job'
    },
    'ERRONEOUS_JOB': {
        'duration': None,
        'frequency': 4,
        'arguments': 'Erroneous Job'
    }
}
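def feed_the_workers(spacing):
    # Publish one task per template entry, `spacing` seconds apart.
    # dict.items() replaces the Python 2-only iteritems().
    for key, template in schedule_template.items():
        time.sleep(spacing)
        q.put({
            'key': key,
            'template': template
        })
    return "DONE TEMPLATE FEEDING"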
def doctor_func(command, arg):
    return dispatch[command](arg)
def do_work(element):
    # Unpack the task published by feed_the_workers.
    template = element.get('template')
    frequency = template.get('frequency')
    duration = template.get('duration')
    job_key = element.get('key')
    arguments = template.get('arguments')
    if frequency is not None and duration is not None:
        # Temporary job: repeat every `frequency` seconds for `duration` seconds.
        start_time = time.time()
        while (time.time() - start_time) <= duration:
            doctor_func(job_key, arguments)
            time.sleep(frequency)
    elif frequency is not None:
        # Infinite job: repeat every `frequency` seconds and never return.
        while True:
            doctor_func(job_key, arguments)
            time.sleep(frequency)
    else:
        # One-time job: run once.
        doctor_func(job_key, arguments)
# At most 5 worker threads exist, regardless of how many jobs are queued.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # The feeder itself runs on a pool thread and publishes the template.
    future_to_result = {
        executor.submit(feed_the_workers, 0.25): 'SCHEDULE TEMPLATE FED'}
    # INFINITE_JOB never returns, so this loop runs until interrupted.
    while future_to_result:
        done, not_done = concurrent.futures.wait(
            future_to_result, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)
        # Hand every queued task to the pool.
        while not q.empty():
            item = q.get()
            future_to_result[executor.submit(do_work, item)] = item
        # Collect finished futures; a failed job only affects its own future.
        for future in done:
            item = future_to_result[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (item, exc))
            else:
                if item == 'SCHEDULE TEMPLATE FED':
                    print(data)
                else:
                    print('No data')
            del future_to_result[future]
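The failure isolation in the listing above can also be seen in a smaller, self-contained sketch. The `healthy_job` and `failing_job` functions are hypothetical stand-ins; the point is only that an exception raised inside a job is stored on that job's future and surfaces when `result()` is called, while the other jobs complete normally.

```python
import concurrent.futures


def healthy_job(name):
    return '%s finished' % name


def failing_job(name):
    raise RuntimeError('%s failed' % name)


outcomes = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {
        executor.submit(healthy_job, 'A'): 'A',
        executor.submit(failing_job, 'B'): 'B',
        executor.submit(healthy_job, 'C'): 'C',
    }
    for future in concurrent.futures.as_completed(futures):
        name = futures[future]
        try:
            outcomes[name] = future.result()
        except Exception as exc:
            # The job's exception is re-raised here, in the caller,
            # so the pool and the other jobs are unaffected.
            outcomes[name] = 'error: %s' % exc
```

One caveat worth keeping in mind with this design: a job that loops forever, like INFINITE_JOB above, holds on to its worker thread for as long as it runs, so the pool size also caps how many long-lived jobs can be active at once.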