Recently I was tasked with creating a workflow manager, and after much testing and feature evaluation of products such as Luigi, TaskFlow, and Azkaban, I settled on building it with plain celery. I chose to use the canvas functionality offered by celery and was floored by what I found: it was easy to create workflows using celery's chain and group features, and the resulting system was very performant. The configuration that was finalized had RabbitMQ as the broker and MongoDB as the result backend. Since the use case called for completely asynchronous flows, there was hardly any need for a result backend except for celery's own use when handling subtask-level errors. Since what we were building was itself going to do part of what celery does, MongoDB seemed like a good choice for the result backend: it let us query and aggregate results with its powerful aggregation APIs.

However, there were two problems that needed addressing. They were not blockers, but they made me think about why celery might not be a good choice in certain backend scenarios. Its greatest strength, being very simple to use, was also its greatest weakness, since it allowed us to use it badly. I haven't had much time to spend in the internals of celery's source code, but it proved difficult both to extend and to tweak to whatever specification one might want when implementing backend asynchronous systems.

To understand that, let's look at the example provided by celery to see what I'm getting at. The following is the simplest getting-started code from celery's quickstart page.

from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

F = add.delay(10, 20) # Returns asynchronous future object to be used later
print(F.get()) # Block for the result, requires result backend to be configured
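
For context, this is roughly how the chain and group features I mentioned earlier compose tasks on top of the same add task; the particular workflow below is only an illustration, not something taken from my project.

from celery import chain, group

# chain: feed the result of add(2, 2) into add(?, 4), i.e. (2 + 2) + 4
workflow = chain(add.s(2, 2), add.s(4))
chain_result = workflow.delay()

# group: fan out ten independent additions and track them as one result
batch = group(add.s(i, i) for i in range(10))
group_result = batch.delay()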

As can be seen above, getting celery to make your tasks asynchronous is very easy. You can start a worker and call the last two lines from your code base, which will run the add method on the worker. This requires that the code base be shared between the calling location and the location where the worker runs. That can be accomplished simply by running the same code base in separate locations, as either the master (to publish tasks) or the worker (to consume and execute them). But what about the situation where the master is not part of the same code base? What if the call to add.delay() is being made from an entirely different system, one that does not have the implementation of add to use as a celery task? This can be overcome by defining a stub with the master, as shown below, and keeping the real implementation shown above with the worker alone.

@app.task
def add(x, y):
    pass

This works for a small project, but what if the calls are supposed to behave like RPC calls and stubs should not have to be present at every master or worker location? In that case, managing the tasks becomes tricky, and we must resort to something like git submodules to make everything available to everyone.

Another problem I encountered is that every method that needs to run as a celery task must be decorated, as shown in the example. We cannot make an arbitrary method a celery task, nor is it easy to use object orientation to create tasks without extending celery's Task class. If using celery is final and there is no turning back, that seems fine. But what about the case where you may have to switch to some other task executor in the system (concurrent.futures?)? The code base would have to change, since the separation of concerns and the decoration are fixed at the code level. What I would really like is to have POPOs (plain old Python objects) define the tasks, with some fixed method (duck typing) that makes each one a runnable task, and to send them to the executor without worrying about which class they extend or whether they are decorated in a certain way. It does not stop there. If I must use celery, then there must be a broker to go with the system. What if I don't need a broker? What if I need to execute the code in the same process asynchronously? How do I leverage the fact that from Python 3.5 onwards we have a great feature called asyncio? Can I choose or mix between local async execution and remote execution using a multiprocessing queue or a third-party messaging queue? How about ZeroMQ instead of RabbitMQ? What about the design patterns involved? Should I be able to create a command and send it to executors, which raise events that are handled by creating another set of commands run by the same executors, regardless of whether the executor is local or remote?
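
To make the POPO idea concrete, here is a minimal sketch using only the standard library; the class and function names are placeholders of my own, not an existing API. Any object that is callable can be handed to an executor, with no base class and no decorator.

from concurrent.futures import ThreadPoolExecutor

class AddCommand:
    """A plain object: no base class, no decorator, just callable."""
    def __init__(self, x, y):
        self.x, self.y = x, y

    def __call__(self):
        return self.x + self.y

def submit(command, executor):
    # The executor only cares that the command is callable (duck typing).
    return executor.submit(command)

with ThreadPoolExecutor() as pool:
    future = submit(AddCommand(10, 20), pool)
    print(future.result())  # 30

Swapping ThreadPoolExecutor for ProcessPoolExecutor, or for something remote, would not require touching the command itself.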

Like I mentioned earlier, I'm still learning celery, so it is quite possible that most of the questions raised in the previous paragraphs are already taken care of. But I would like to think of a different approach, one that does not depend on a library or framework that locks me in. That is why I believe the solution must leverage existing standard libraries like asyncio, multiprocessing, and collections, and must use design patterns such as command and observer, besides abstract factory and builder, to accomplish the goal. This will allow the solution to be extensible and yet abstract enough not to change. Although I do not have the answer yet, I have started an open source project to accomplish just that, which can be accessed here. Following is the class diagram for the way the system is envisioned. I will follow up with the development and update the blog as it proceeds.

As envisioned, the system must have an inline executor that runs commands in the same process, or a future-based executor that can in turn be of two types: multiprocessing based or external queue based (preferably ZeroMQ, since it does not need a broker). The command executor will raise events, with listeners based either on coroutines or on a separate process listening on a queue; either kind of listener will invoke the event handler, which creates a new command and sends it back to the command executor. The command registry is where all tasks are registered under a key, and executors look up the executable part of a command in the registry. Anything registered there must be callable with "(*args, **kwargs)". This way, using the duck typing afforded by Python, we can add any object to the registry, giving us the freedom to register command callables from anywhere in the system.
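
Here is a rough sketch of the registry and inline executor described above, using only asyncio; the names are my own placeholders rather than the project's final interfaces.

import asyncio

class CommandRegistry:
    """Maps a string key to any callable accepting (*args, **kwargs)."""
    def __init__(self):
        self._commands = {}

    def register(self, key, command):
        if not callable(command):
            raise TypeError("command for %r must be callable" % key)
        self._commands[key] = command

    def get(self, key):
        return self._commands[key]

class InlineExecutor:
    """Runs registered commands in the current process."""
    def __init__(self, registry):
        self.registry = registry

    async def execute(self, key, *args, **kwargs):
        command = self.registry.get(key)
        loop = asyncio.get_running_loop()
        # Push the (possibly blocking) callable onto the default thread pool.
        return await loop.run_in_executor(None, lambda: command(*args, **kwargs))

registry = CommandRegistry()
registry.register('add', lambda x, y: x + y)

async def main():
    executor = InlineExecutor(registry)
    print(await executor.execute('add', 10, 20))  # 30

asyncio.run(main())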

Of course, there is room for improvement in this design, but it will also be capable of running flows where commands are composite and run in parallel or sequentially, with each of those commands being either a simple command or another composite. The behaviour of such a system depends solely on the standard modules, without the need for any third-party APIs, and with the ability to use any callable in the system as a command, it avoids getting locked into any particular framework during the actual implementation.
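
A sketch of how such composition might look, again with placeholder names and only the standard library: a composite holds children that are themselves simple or composite commands and runs them sequentially or concurrently.

import asyncio

class SimpleCommand:
    """Wraps any callable so it can take part in a flow."""
    def __init__(self, func, *args):
        self.func, self.args = func, args

    async def __call__(self):
        return self.func(*self.args)

class CompositeCommand:
    """Runs child commands sequentially, or concurrently when parallel=True."""
    def __init__(self, children, parallel=False):
        self.children, self.parallel = children, parallel

    async def __call__(self):
        if self.parallel:
            return await asyncio.gather(*(child() for child in self.children))
        return [await child() for child in self.children]

# Two additions run in parallel, then a final sequential step.
flow = CompositeCommand([
    CompositeCommand([SimpleCommand(pow, 2, 8), SimpleCommand(pow, 3, 3)],
                     parallel=True),
    SimpleCommand(print, 'fan-in step'),
])
asyncio.run(flow())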

Celery is a cool piece of software and I have already fallen in love with it, so it makes sense that I should be able to leverage it too, if I need to. As can be seen, the proposed design is extensible, and celery itself can be added as a remote executor and used with this system. The catch is that this reduces celery to a simple executor and gives up many of its great features, such as canvas and signals, by making them redundant. As I mentioned earlier, this is a proposed design; implementation is underway, and it is prone to change if that makes sense. Perhaps I will end up making a celery clone after realizing that it was the best tool for this sort of work, which would be like going nowhere as far as this project is concerned. But then I will have learned something deeper about celery, and I don't think that would be a bad result.
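
For completeness, here is roughly what plugging celery in as one more remote executor might look like; CeleryExecutor is a hypothetical adapter of mine, while send_task is celery's own way of publishing a task by name, so the calling side does not need the task's implementation.

from celery import Celery

class CeleryExecutor:
    """Hypothetical adapter that treats a celery app as just another executor."""
    def __init__(self, broker_url):
        self.app = Celery(broker=broker_url)

    def execute(self, task_name, *args, **kwargs):
        # Publishes by name; only the worker needs the task's implementation.
        return self.app.send_task(task_name, args=args, kwargs=kwargs)

executor = CeleryExecutor('pyamqp://guest@localhost//')
async_result = executor.execute('tasks.add', 10, 20)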

Until next time.