Distributed Worker & Scheduler¶
MorpFW integrates with Celery to provide support for running asynchronous
& scheduled jobs. The morpfw
command line tool provides subcomands which
makes it easy to start celery worker and scheduler for your project.
In asynchronous tasks triggered through web development API, MorpFW encodes the WSGI request object and pass it to the worker so that you will get a very similar behavior to web development when working with asynchronous tasks.
In scheduled task, a minimal request object is created and passed to the scheduled task function.
Creating Async Task¶
Asynchronous task is implemented as signals which you can implement a subscriber to the signal.
To create an async task subscribing to a signal, you can use the
async_subscribe
decorator on your App
object. . The task can then be
triggered using request.async_dispatch
.
Warning
Because request object is passed to the worker, avoid using this in pages with uploads as it involves transfering the upload to the worker.
Following is a simple example implementation
import time
import morpfw
import pytest
# lets setup a skeleton app
class App(morpfw.BaseApp):
pass
class Root(object):
pass
@App.path(model=Root, path="")
def get_root(request):
return Root()
# this view will dispatch the event
@App.json(model=Root)
def index(context, request):
subs = request.async_dispatch("test_signal", obj={"data": 10})
res = []
for s in subs:
res.append(s.get())
return res
# these are the async handlers
@App.async_subscribe("test_signal")
def handler1(request_options, obj):
# request_options contain parameters for instantiating a request
with morpfw.request_factory(**request_options) as request:
obj["handler"] = "handler1"
obj["data"] += 1
return obj
@App.async_subscribe("test_signal")
def handler2(request_options, obj):
with morpfw.request_factory(**request_options) as request:
obj["handler"] = "handler2"
obj["data"] += 5
return obj
Creating Scheduled Job¶
Scheduled job can be implemented with a similar API style. MorpFW exposes both the cron scheduler and periodic scheduler of Celery in an easy to use API.
Following is a simple example implementation
import time
import morpfw
import pytest
# lets setup a skeleton app
class App(morpfw.BaseApp):
pass
class Root(object):
pass
@App.path(model=Root, path='')
def get_root(request):
return Root()
# lets hook up some scheduled job
# run this code every 5 seconds
@App.periodic(name='myproject.every-5-seconds', seconds=5)
def run_5_secs(request_options):
print('periodic tick!')
# run this code every 1 minute, using cron style scheduling
@App.cron(name='myproject.minutely', minute='*')
def run_every_1_minute(request_options):
print('cron tick!')
Starting Celery Worker & Celery Beat Scheduler¶
Worker and beat scheduler can be easily started up using:
$ # start worker
$ morpfw -s settings.yml worker
$ # start scheduler
$ morpfw -s settings.yml scheduler