Today I Learnt FM
How to use process pools in Django management commands

Sometimes you have to run things concurrently, and a few of those times you have to control the exact concurrency level. In a non-distributed system, controlling concurrency is easy.

Python has the multiprocessing library and the executors in concurrent.futures, both of which can do just that, but process-based concurrency comes with a few caveats:

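  • each Django process has a lot of global state (settings, the app registry) that needs initializing before any model is used
  • message passing between processes can only happen if everything passed, including the task function, its arguments, and its return value, can be pickled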
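
The pickling caveat is easy to check in isolation. Here is a minimal sketch using only the standard library; `is_picklable` is a hypothetical helper written for this illustration, not part of Django or `concurrent.futures`. It suggests why passing a plain id to a worker is safer than passing a richer object.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Plain data such as ids and dicts of primitives can cross the process boundary.
print(is_picklable(42))
print(is_picklable({"id": 1, "name": "x"}))

# Lambdas and objects wrapping OS resources (locks, sockets) cannot.
print(is_picklable(lambda x: x))
print(is_picklable(threading.Lock()))
```

The same check applies to whatever a task returns, since results travel back to the parent process the same way.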

I found that this snippet of code works well:


import concurrent.futures

import django
from django.core.management.base import BaseCommand

from main.models import Thing

class Command(BaseCommand):
    @staticmethod
    def run_task(thing_id):
        thing = Thing.objects.get(id=thing_id)
        print(f"Running thing {thing}")

        # do your computation on thing here

        print(f"{thing} done")

    def handle(self, *args, **options):
        concurrency_level = 2
        things = Thing.objects.all()

        print("starting")
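        with concurrent.futures.ProcessPoolExecutor(
            max_workers=concurrency_level, initializer=django.setup
        ) as executor:
            futures = [executor.submit(self.run_task, thing.id) for thing in things]

            # wait() does not raise on timeout; it returns the finished and
            # unfinished futures as two sets. It has to run inside the with
            # block, because leaving the block waits for every task.
            done, not_done = concurrent.futures.wait(futures, timeout=60 * 60)

            if not_done:
                print("Timeout")
                # cancel() only stops tasks that have not started yet;
                # tasks already running are still awaited on exit.
                for future in not_done:
                    future.cancel()

        for future in done:
            try:
                future.result()
            except Exception:
                print("An error occurred in a task")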
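
One detail worth isolating: `concurrent.futures.wait` reports a timeout through its return value, a pair of sets (finished and unfinished futures), and does not raise an exception. The sketch below shows that behaviour under some assumptions: it uses a `ThreadPoolExecutor` only so that it runs anywhere without Django or pickling concerns, and `slow_task` and `run_with_deadline` are hypothetical names made up for the illustration.

```python
import concurrent.futures
import time


def slow_task(seconds):
    """Stand-in for real work: sleep, then return the duration."""
    time.sleep(seconds)
    return seconds


def run_with_deadline(durations, timeout):
    """Run one task per duration and report how many finished in time."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(slow_task, d) for d in durations]

        # Returns after `timeout` seconds at the latest, without raising.
        done, not_done = concurrent.futures.wait(futures, timeout=timeout)

        # cancel() only prevents tasks that have not started; a task that
        # is already running keeps going, and the with block waits for it.
        for future in not_done:
            future.cancel()

        return len(done), len(not_done)


print(run_with_deadline([0.01, 0.01, 0.5], timeout=0.2))
```

With two workers, the two short tasks finish well before the deadline while the long one is still running, so it ends up in the unfinished set.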