ModelQueue API Reference

ModelQueue is an Apache2 licensed task queue based on Django models.

The examples below assume the following in appname/models.py:

import modelqueue
from django.db import models

class Task(models.Model):
    data = models.TextField()
    status = modelqueue.StatusField(
        db_index=True,
        # ^-- Index for faster queries.
        default=modelqueue.Status.waiting,
        # ^-- Waiting state is ready to run.
    )

Functions

modelqueue.run(queryset, field, action, retry=3, timeout=datetime.timedelta(seconds=3600), delay=datetime.timedelta(0))

Run action on results from queryset in queue defined by field.

For example in appname/management/commands/process_tasks.py:

import modelqueue, time
from django.core.management.base import BaseCommand
from .models import Task

class Command(BaseCommand):

    def handle(self, *args, **options):
        while True:
            task = modelqueue.run(
                Task.objects.all(),
                # ^-- Queryset of models to process.
                'status',
                # ^-- Field name for model queue.
                self.process,
                # ^-- Callable to process model.
            )
            if task is None:
                time.sleep(1)
                # ^-- Bring your own parallelism/concurrency.

    def process(self, report):
        pass  # Process task models.
Parameters
  • queryset – Django queryset

  • field (str) – field name

  • action (function) – applied to models from queryset

  • retry (int) – max retry count (limit 8)

  • timeout (timedelta) – max runtime for action

  • delay (timedelta) – delay time after retry

Returns

model from queryset or None if no waiting model

modelqueue.admin_list_filter(field)

Return Django admin list filter for field describing model queue.

For example in appname/admin.py:

class TaskAdmin(admin.Modeldmin):
    list_filter = [
        modelqueue.admin_list_filter('status'),
        # ^-- Filter tasks in admin by queue state.
    ]
Parameters

field (str) – field name

Returns

Django admin model queue list filter

modelqueue.now()

Return now datetime in UTC timezone.

Exceptions

class modelqueue.Retry(delay=None)

Retry processing the task.

When raised in an action callable from modelqueue.run(), the exception is used to signal that the task should be retried.

Retry does not increment the attempt count of the task.

Optional delay parameter can be used to override the delay given in modelqueue.run().

class modelqueue.Abort(delay=None)

Abort processing the task.

When raised in an action callable from modelqueue.run(), the exception is used to signal that the task was aborted.

Abort does increment the attempt count of the task. If the attempts limit is reached then the task will be canceled.

Optional delay parameter can be used to override the delay given in modelqueue.run().

class modelqueue.Cancel

Cancel processing the task.

When raised in an action callable from modelqueue.run(), the exception is used to signal that the task should be canceled.

Cancel both increments the attempt count and changes the task state to canceled.

State

class modelqueue.State(value, name)

Model Queue State

>>> state = State(1, 'created')
>>> assert state == 1
>>> assert state.name == 'created'
>>> print(state)
created
>>> repr(state)
"State(1, 'created')"
>>> State.created
State(1, 'created')
created = State(1, 'created')
waiting = State(2, 'waiting')
working = State(3, 'working')
finished = State(4, 'finished')
canceled = State(5, 'canceled')

Status

class modelqueue.Status

Model Queue Status

64-bit Signed Integer Field Format:

state        priority          attempt
  |   |----------------------|   |
  2   2018 03 27  14 43 25 759   0
       |   |   |   |  |  |  |
       |   |   | hour |  |  |
      year |   |  minute |  |
         month |     second |
              day       millisecond
>>> status = Status(3201801020304567896)
>>> assert status.state == State.working
>>> assert status.priority == 20180102030456789
>>> assert status.attempts == 6
>>> Status.canceled(123, 5)
Status(5000000000000001235)
states = [State(1, 'created'), State(2, 'waiting'), State(3, 'working'), State(4, 'finished'), State(5, 'canceled')]
property attempts

Return attempts of status.

>>> status = Status(3201801020304567896)
>>> status.attempts
6
classmethod canceled(priority=None, attempts=0)

Return new canceled status with given priority and attempts.

When priority is None (the default), uses now().

Parameters
  • priority – integer or datetime, lower is higher priority

  • attempts (int) – number of previous attempts (0 through 9)

Returns

status

classmethod combine(state, priority, attempts)

Combine state, priority, and attempts fields into status.

>>> Status.combine(State.waiting, 0, 1)
Status(2000000000000000001)
>>> Status.combine('waiting', 0, 2)
Status(2000000000000000002)
>>> priority = dt.datetime(2018, 1, 2, 3, 4, 56, 789123)
>>> Status.combine(State.waiting, priority, 3)
Status(2201801020304567893)
classmethod created(priority=None, attempts=0)

Return new created status with given priority and attempts.

When priority is None (the default), uses now().

Parameters
  • priority – integer or datetime, lower is higher priority

  • attempts (int) – number of previous attempts (0 through 9)

Returns

status

classmethod filter(field, state)

Return keyword arguments to filter field by state for querysets.

For example:

Task.objects.filter(**Status.filter('fieldname', State.working))
Parameters
  • field (str) – field name

  • state (State) – model queue state

Returns

keyword arguments

>>> kwargs = Status.filter('status', State.waiting)
>>> assert len(kwargs) == 2
>>> assert kwargs['status__gte'] == Status.minimum(State.waiting)
>>> assert kwargs['status__lte'] == Status.maximum(State.waiting)
>>> value = Status.filter('status', 'waiting')
>>> assert kwargs == value
classmethod finished(priority=None, attempts=0)

Return new finished status with given priority and attempts.

When priority is None (the default), uses now().

Parameters
  • priority – integer or datetime, lower is higher priority

  • attempts (int) – number of previous attempts (0 through 9)

Returns

status

classmethod maximum(state)

Calculate status maximum value given state.

>>> Status.maximum(State.working)
3999999999999999999
>>> Status.maximum('working')
3999999999999999999
Parameters

state – status state

Returns

maximum status value with given state

classmethod minimum(state)

Calculate status minimum value given state.

>>> Status.minimum(State.working)
3000000000000000000
>>> Status.minimum('working')
3000000000000000000
Parameters

state – status state

Returns

minimum status value with given state

parse(datetime=True)

Parse status into state, priority, and attempts fields.

If datetime is True (the default), then parse priority into datetime object. Priority has format:

       priority
|----------------------|
2018 03 27  14 43 25 759
 |   |   |   |  |  |  |
 |   |   | hour |  |  |
year |   |  minute |  |
   month |     second |
        day       millisecond
Parameters

datetime (bool) – parse priority as datetime (default True)

Returns

tuple of state, priority, and attempts

>>> status = Status(1201801020304567895)
>>> state, priority, attempts = status.parse()
>>> state
State(1, 'created')
>>> priority
datetime.datetime(2018, 1, 2, 3, 4, 56, 789000, tzinfo=<UTC>)
>>> attempts
5
>>> status = Status(3000000000000000007)
>>> status.parse(datetime=False)
(State(3, 'working'), 0, 7)
property priority

Return priority of status.

>>> status = Status(3201801020304567896)
>>> status.priority
20180102030456789
property state

Return state of status.

>>> status = Status(3201801020304567896)
>>> status.state
State(3, 'working')
classmethod tally(queryset, field)

Return mapping of state to count from field in queryset.

For example:

Status.tally(Task.objects.all(), 'status')
Parameters
  • queryset – Django queryset

  • field (str) – field name

Returns

mapping of state names to counts

classmethod waiting(priority=None, attempts=0)

Return new waiting status with given priority and attempts.

When priority is None (the default), uses now().

Parameters
  • priority – integer or datetime, lower is higher priority

  • attempts (int) – number of previous attempts (0 through 9)

Returns

status

classmethod working(priority=None, attempts=0)

Return new working status with given priority and attempts.

When priority is None (the default), uses now().

Parameters
  • priority – integer or datetime, lower is higher priority

  • attempts (int) – number of previous attempts (0 through 9)

Returns

status

StatusField

modelqueue.StatusField

alias of django.db.models.fields.BigIntegerField

Constants

modelqueue.ONE_HOUR
modelqueue.ZERO_SECS