Distributed Worker & Scheduler

MorpFW integrates with Celery to support running asynchronous and scheduled jobs. The morpfw command line tool provides subcommands that make it easy to start a Celery worker and scheduler for your project.

When an asynchronous task is triggered from the web application, MorpFW encodes the WSGI request object and passes it to the worker, so that code in the task behaves much like ordinary web request handling code.
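To make the idea of passing a request to a worker concrete, here is a minimal sketch using only the standard library. It is not MorpFW's actual encoding; the function names encode_request and decode_request and the JSON/base64 format are assumptions chosen for illustration. It shows that the request body travels with the message, so the message grows with the body size.

```python
import base64
import io
import json


def encode_request(environ):
    """Serialize a WSGI environ into a JSON string (hypothetical format)."""
    body = environ["wsgi.input"].read()
    # Keep only plain string entries; streams and callables are not JSON-safe.
    strings = {k: v for k, v in environ.items() if isinstance(v, str)}
    return json.dumps(
        {"environ": strings, "body": base64.b64encode(body).decode("ascii")}
    )


def decode_request(payload):
    """Rebuild a usable environ on the worker side."""
    data = json.loads(payload)
    environ = dict(data["environ"])
    environ["wsgi.input"] = io.BytesIO(base64.b64decode(data["body"]))
    return environ


body = b'{"data": 10}'
environ = {
    "REQUEST_METHOD": "POST",
    "PATH_INFO": "/",
    "CONTENT_LENGTH": str(len(body)),
    "wsgi.input": io.BytesIO(body),
}

payload = encode_request(environ)
restored = decode_request(payload)
print(restored["REQUEST_METHOD"], restored["wsgi.input"].read())
```

Because the body is embedded in the payload, a large upload would be copied in full into the message sent to the worker.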

In a scheduled task, a minimal request object is created and passed to the scheduled task function.

Creating Async Task

Asynchronous tasks are implemented as signals: you write a subscriber for a signal, and the subscriber runs on the worker when that signal is dispatched.

To create an async task that subscribes to a signal, use the async_subscribe decorator on your App object. The task can then be triggered using request.async_dispatch.

Warning

Because the request object is passed to the worker, avoid using this in pages that handle uploads, as it involves transferring the upload to the worker.

The following is a simple example implementation:

import morpfw

# let's set up a skeleton app


class App(morpfw.BaseApp):
    pass


class Root(object):
    pass


@App.path(model=Root, path="")
def get_root(request):
    return Root()


# this view will dispatch the event


@App.json(model=Root)
def index(context, request):
    subs = request.async_dispatch("test_signal", obj={"data": 10})
    res = []
    # wait for each subscriber to finish and collect its return value
    for s in subs:
        res.append(s.get())
    return res


# these are the async handlers


@App.async_subscribe("test_signal")
def handler1(request_options, obj):
    # request_options contain parameters for instantiating a request
    with morpfw.request_factory(**request_options) as request:
        obj["handler"] = "handler1"
        obj["data"] += 1
        return obj


@App.async_subscribe("test_signal")
def handler2(request_options, obj):
    with morpfw.request_factory(**request_options) as request:
        obj["handler"] = "handler2"
        obj["data"] += 5
        return obj

Creating Scheduled Job

Scheduled jobs can be implemented with a similar API style. MorpFW exposes both the cron scheduler and the periodic scheduler of Celery through an easy-to-use API.

The following is a simple example implementation:

import morpfw

# let's set up a skeleton app

class App(morpfw.BaseApp):
    pass


class Root(object):
    pass


@App.path(model=Root, path='')
def get_root(request):
    return Root()


# let's hook up some scheduled jobs

# run this code every 5 seconds
@App.periodic(name='myproject.every-5-seconds', seconds=5)
def run_5_secs(request_options):
    print('periodic tick!')


# run this code every 1 minute, using cron style scheduling
@App.cron(name='myproject.minutely', minute='*')
def run_every_1_minute(request_options):
    print('cron tick!')
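The difference between the two scheduling styles can be illustrated with a small standard-library sketch. It does not use Celery or MorpFW; the helper names periodic_runs and cron_minutely_runs are hypothetical, and the timing model is a simplification: a periodic job fires at a fixed interval counted from when the scheduler starts, while a cron job with minute='*' fires on wall-clock minute boundaries.

```python
from datetime import datetime, timedelta


def periodic_runs(start, seconds, count):
    """Interval style: fire every `seconds` seconds counted from `start`."""
    return [start + timedelta(seconds=seconds * i) for i in range(1, count + 1)]


def cron_minutely_runs(start, count):
    """Cron style with minute='*': fire at the top of each minute after `start`."""
    first = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    return [first + timedelta(minutes=i) for i in range(count)]


start = datetime(2024, 1, 1, 12, 0, 42)  # scheduler started at 12:00:42

for run in periodic_runs(start, 5, 3):
    print("periodic:", run.time())  # 12:00:47, 12:00:52, 12:00:57

for run in cron_minutely_runs(start, 2):
    print("cron:    ", run.time())  # 12:01:00, 12:02:00
```

In this model the periodic schedule depends on the start time, whereas the cron schedule is aligned to the clock regardless of when the scheduler was started.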

Starting Celery Worker & Celery Beat Scheduler

The worker and the beat scheduler can be started with:

$ # start worker
$ morpfw -s settings.yml worker

$ # start scheduler
$ morpfw -s settings.yml scheduler