ModelQueue API Reference

ModelQueue is an Apache2 licensed task queue based on Django models.

The examples below assume the following in appname/models.py:

import modelqueue
from django.db import models

class Task(models.Model):
    data = models.TextField()
    status = modelqueue.StatusField(
        db_index=True,
        # ^-- Index for faster queries.
        default=modelqueue.Status.waiting,
        # ^-- Waiting state is ready to run.
    )

Functions

modelqueue.run(queryset, field, action, retry=3, timeout=datetime.timedelta(0, 3600), delay=datetime.timedelta(0))

Run action on results from queryset in queue defined by field.

For example in appname/management/commands/process_tasks.py:

import modelqueue, time
from django.core.management.base import BaseCommand
from .models import Task

class Command(BaseCommand):

    def handle(self, *args, **options):
        while True:
            task = modelqueue.run(
                Task.objects.all(),
                # ^-- Queryset of models to process.
                'status',
                # ^-- Field name for model queue.
                self.process,
                # ^-- Callable to process model.
            )
            if task is None:
                time.sleep(0.1)
                # ^-- Bring your own parallelism/concurrency.

    def process(self, report):
        pass  # Process task models.
Parameters:
  • queryset – Django queryset
  • field (str) – field name
  • action (function) – applied to models from queryset
  • retry (int) – max retry count (limit 8)
  • timeout (timedelta) – max runtime for action
  • delay (timedelta) – delay time after retry
Returns:

model from queryset or None if no waiting model

modelqueue.admin_list_filter(field)

Return Django admin list filter for field describing model queue.

For example in appname/admin.py:

class TaskAdmin(admin.TaskAdmin):
    list_filter = [
        modelqueue.admin_list_filter('status'),
        # ^-- Filter tasks in admin by queue state.
    ]
Parameters:field (str) – field name
Returns:Django admin model queue list filter
modelqueue.now()

Return now datetime in UTC timezone.

State

class modelqueue.State

Model Queue State

>>> state = State(1, 'created')
>>> assert state == 1
>>> assert state.name == 'created'
>>> print(state)
created
>>> repr(state)
"State(1, 'created')"
>>> State.created
State(1, 'created')
created = State(1, 'created')
waiting = State(2, 'waiting')
working = State(3, 'working')
finished = State(4, 'finished')
canceled = State(5, 'canceled')

Status

class modelqueue.Status

Model Queue Status

64-bit Signed Integer Field Format:

state        priority          attempt
  |   |----------------------|   |
  2   2018 03 27  14 43 25 759   0
       |   |   |   |  |  |  |
       |   |   | hour |  |  |
      year |   |  minute |  |
         month |     second |
              day       millisecond
>>> status = Status(3201801020304567896)
>>> assert status.state == State.working
>>> assert status.priority == 20180102030456789
>>> assert status.attempts == 6
>>> Status.canceled(123, 5)
Status(5000000000000001235)
states = [State(1, 'created'), State(2, 'waiting'), State(3, 'working'), State(4, 'finished'), State(5, 'canceled')]
attempts

Return attempts of status.

>>> status = Status(3201801020304567896)
>>> status.attempts
6
classmethod canceled(priority=None, attempts=0)

Return new canceled status with given priority and attempts.

When priority is None (the default), uses now().

Parameters:
  • priority – integer or datetime, lower is higher priority
  • attempts (int) – number of previous attempts (0 through 9)
Returns:

status

classmethod combine(state, priority, attempts)

Combine state, priority, and attempts fields into status.

>>> Status.combine(State.waiting, 0, 1)
Status(2000000000000000001)
>>> Status.combine('waiting', 0, 2)
Status(2000000000000000002)
>>> priority = dt.datetime(2018, 1, 2, 3, 4, 56, 789123)
>>> Status.combine(State.waiting, priority, 3)
Status(2201801020304567893)
classmethod created(priority=None, attempts=0)

Return new created status with given priority and attempts.

When priority is None (the default), uses now().

Parameters:
  • priority – integer or datetime, lower is higher priority
  • attempts (int) – number of previous attempts (0 through 9)
Returns:

status

classmethod filter(field, state)

Return keyword arguments to filter field by state for querysets.

For example:

Task.objects.filter(**Status.filter('fieldname', State.working))
Parameters:
  • field (str) – field name
  • state (State) – model queue state
Returns:

keyword arguments

>>> kwargs = Status.filter('status', State.waiting)
>>> assert len(kwargs) == 2
>>> assert kwargs['status__gte'] == Status.minimum(State.waiting)
>>> assert kwargs['status__lte'] == Status.maximum(State.waiting)
>>> value = Status.filter('status', 'waiting')
>>> assert kwargs == value
classmethod finished(priority=None, attempts=0)

Return new finished status with given priority and attempts.

When priority is None (the default), uses now().

Parameters:
  • priority – integer or datetime, lower is higher priority
  • attempts (int) – number of previous attempts (0 through 9)
Returns:

status

classmethod maximum(state)

Calculate status maximum value given state.

>>> Status.maximum(State.working)
3999999999999999999
>>> Status.maximum('working')
3999999999999999999
Parameters:state – status state
Returns:maximum status value with given state
classmethod minimum(state)

Calculate status minimum value given state.

>>> Status.minimum(State.working)
3000000000000000000
>>> Status.minimum('working')
3000000000000000000
Parameters:state – status state
Returns:minimum status value with given state
parse(datetime=True)

Parse status into state, priority, and attempts fields.

If datetime is True (the default), then parse priority into datetime object. Priority has format:

       priority
|----------------------|
2018 03 27  14 43 25 759
 |   |   |   |  |  |  |
 |   |   | hour |  |  |
year |   |  minute |  |
   month |     second |
        day       millisecond
Parameters:datetime (bool) – parse priority as datetime (default True)
Returns:tuple of state, priority, and attempts
>>> status = Status(1201801020304567895)
>>> state, priority, attempts = status.parse()
>>> state
State(1, 'created')
>>> priority
datetime.datetime(2018, 1, 2, 3, 4, 56, 789000, tzinfo=<UTC>)
>>> attempts
5
>>> status = Status(3000000000000000007)
>>> status.parse(datetime=False)
(State(3, 'working'), 0, 7)
priority

Return priority of status.

>>> status = Status(3201801020304567896)
>>> status.priority
20180102030456789
state

Return state of status.

>>> status = Status(3201801020304567896)
>>> status.state
State(3, 'working')
classmethod tally(queryset, field)

Return mapping of state to count from field in queryset.

For example:

Status.tally(Task.objects.all(), 'status')
Parameters:
  • queryset – Django queryset
  • field (str) – field name
Returns:

mapping of state names to counts

classmethod waiting(priority=None, attempts=0)

Return new waiting status with given priority and attempts.

When priority is None (the default), uses now().

Parameters:
  • priority – integer or datetime, lower is higher priority
  • attempts (int) – number of previous attempts (0 through 9)
Returns:

status

classmethod working(priority=None, attempts=0)

Return new working status with given priority and attempts.

When priority is None (the default), uses now().

Parameters:
  • priority – integer or datetime, lower is higher priority
  • attempts (int) – number of previous attempts (0 through 9)
Returns:

status

StatusField

modelqueue.StatusField

alias of django.db.models.fields.BigIntegerField

Constants

modelqueue.ONE_HOUR
modelqueue.ZERO_SECS