Saturday, 14 September 2013

Multiprocessing and Pyramid: How do I make sure subprocesses actually end?

I'm trying to have a pool of worker processes in my Pyramid application
that can be used to carry out CPU-intensive (or long-running) background
tasks that I don't want to bog down the views with. What I have right now
works, but there is one problem: if waitress exits because it was
terminated (as happens with --reload), the workers keep lingering, and I
don't know how to signal them to stop.
import multiprocessing as mp
from queue import Empty as QueueEmptyError


class MyPool(object):

    def __init__(self, processes=10, queue=None):
        self.jobqueue = queue if queue is not None else mp.Queue()
        self.procs = []
        self.running = True

        for i in range(processes):
            worker = mp.Process(target=self._worker, daemon=True)
            self.procs.append(worker)
            worker.start()

    def __del__(self):
        self.stopall()

    def stopall(self):
        # NOTE: every worker process received its own copy of this object
        # when it started, so this assignment never reaches the children.
        self.running = False
        for worker in self.procs:
            worker.join()

    def _worker(self):
        # Poll with a 1-second timeout so the loop can re-check self.running.
        while self.running:
            try:
                self._dojob(self.jobqueue.get(True, 1))
            except QueueEmptyError:
                pass

    def _dojob(self, taskdata):
        print(str(taskdata) + ' is happening')


class PoolMaster(object):

    def __init__(self):
        self.pools = []
        self.aqueue = mp.Queue()
        self.apool = MyPool(6, self.aqueue)
        self.pools.append(self.apool)

    def __del__(self):
        for pool in self.pools:
            pool.stopall()

    def do_something(self):
        self.aqueue.put_nowait('Something')
PoolMaster is instantiated once in my project's main() function and
exposed to all views by adding it to all events.
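Roughly, the wiring looks like this (a minimal sketch; the NewRequest
subscriber and the attribute names are illustrative reconstructions, not
exact code from my project):

from pyramid.config import Configurator
from pyramid.events import NewRequest


def main(global_config, **settings):
    config = Configurator(settings=settings)

    # One PoolMaster for the whole application, created at startup.
    pool_master = PoolMaster()

    # Attach it to every request so views can reach it.
    def add_pool_master(event):
        event.request.pool_master = pool_master

    config.add_subscriber(add_pool_master, NewRequest)
    config.scan()
    return config.make_wsgi_app()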
What I tried before was putting a "poison pill" on the queue when __del__
runs, but as it turns out, __del__ doesn't seem to get called at all. I
don't want to use multiprocessing's own Pool, because it seems to be made
for running through a fixed workload once, not for working on a queue
indefinitely. So, how do I stop the workers from running after the actual
application has exited?
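
To make the poison-pill idea concrete, the shutdown I have in mind looks
something like this (a minimal sketch building on MyPool above; the string
sentinel and the atexit hook are assumptions, and atexit only fires on a
normal interpreter exit, which may be exactly the step --reload skips):

import atexit

STOP = 'STOP'  # sentinel; strings compare equal across processes, unlike object()


class PillPool(MyPool):

    def stopall(self):
        # One pill per worker, so every process eventually consumes one.
        for _ in self.procs:
            self.jobqueue.put(STOP)
        for worker in self.procs:
            worker.join()

    def _worker(self):
        while True:
            try:
                task = self.jobqueue.get(True, 1)
            except QueueEmptyError:
                continue
            if task == STOP:
                break  # leave the loop; the process then exits on its own
            self._dojob(task)


# Hypothetical wiring: run the shutdown at interpreter exit instead of __del__.
# atexit.register(lambda: [pool.stopall() for pool in pool_master.pools])

Even so, that still hinges on the reloader letting the parent process exit
cleanly, which brings me back to the original question.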
