medicine-mts/appa/management/commands/async_task.py

39 lines
937 B
Python

import importlib
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
    """Run a task addressed by dotted path, either inline or via ``apply_async``.

    Usage::

        manage.py async_task <package.module.task_name> [params ...] [--async]

    All ``params`` reach the task as strings, exactly as parsed from the
    command line; no type conversion is performed here.
    """

    help = 'Import a task by dotted path and run it, optionally via apply_async.'

    def add_arguments(self, parser):
        """Register the ``task`` path, its positional ``params`` and ``--async``."""
        parser.add_argument(
            'task',
            nargs='?',
            help='Dotted path to the task, e.g. "package.module.task_name".',
        )
        parser.add_argument(
            'params',
            nargs='*',
            help='Positional arguments forwarded to the task (as strings).',
        )
        parser.add_argument(
            '--async',
            action='store_true',
            dest='async',
            help='Dispatch through task.apply_async() instead of calling it inline.',
        )

    def handle(self, *args, **options):
        """Resolve the task named by ``options['task']`` and execute it.

        Raises:
            CommandError: if the task path is missing or malformed, the module
                cannot be imported, the attribute does not exist, or ``--async``
                is requested for an object without ``apply_async``.
        """
        task_path = options['task']
        # `task` is declared with nargs='?', so it is None when omitted.
        if not task_path:
            raise CommandError('Task path is required')

        # Split "package.module.task_name" into module path and attribute name.
        # rpartition never raises, so a path without a dot is reported cleanly
        # instead of failing on tuple unpacking.
        module_path, _, attr_name = task_path.rpartition('.')
        if not module_path or not attr_name or module_path.startswith('.'):
            # Empty or relative module names make import_module raise
            # ValueError/TypeError rather than ModuleNotFoundError.
            raise CommandError(
                f'Invalid task path {task_path!r}; expected "package.module.task_name"'
            )

        try:
            module = importlib.import_module(module_path)
        except ModuleNotFoundError as exc:
            raise CommandError(exc) from exc

        try:
            task = getattr(module, attr_name)
        except AttributeError:
            raise CommandError(f'Task {attr_name} not found') from None

        params = options['params']
        if not options['async']:
            task(*params)
            return

        apply_async = getattr(task, 'apply_async', None)
        if apply_async is None:
            raise CommandError(f'Task {attr_name} does not support --async')
        # The task's positional arguments go in as ONE sequence via `args=`.
        # Unpacking them (`apply_async(*params)`) would feed the first param
        # into `args` and the second into `kwargs`.
        # NOTE(review): assumes a Celery-style apply_async(args=None,
        # kwargs=None, ...) signature — confirm against the task library used.
        apply_async(args=params)