Code can be found at the end of this post.


A colleague recently introduced me to Luigi, an open-source project that Spotify released in 2012. I had been trying out pipeline management frameworks such as kedro, mlflow, pipelinx, and a few other popular ones, and none had what I needed. Luigi was the only one I found that offers extreme flexibility while still enforcing a fairly strict design pattern that is maintainable, reusable, and intuitive.

Luigi’s simple premise is:
If a task declares an output (Task.output() returns at least one target) and that output already exists, Luigi does not run the task. If the output does not exist, Luigi runs the task to generate it.

Early on I found myself wanting to force a task to run, either every time or only sometimes, and I wanted that to be a user-defined action rather than something dictated solely by the default behavior of the luigi.Task class.


A quick example: suppose we have this structure of 5 tasks, where:

  • Task C depends on Task A2 which depends on Task A1
  • Task C depends on Task B2 which depends on Task B1
[Task A1]    [Task B1]
    |            |
    |            |
[Task A2]    [Task B2] 
     \          /
      \        /
       \      /           # Wrapper task just means that it produces no output
   [Wrapper Task C]       # meaning Task C always runs

With the generic Luigi task, if I run Task C it will first run A1 and B1, then A2 and B2, and finally C. If we run Task C a second time, the outputs of all the upstream tasks already exist, so none of them run again. Only the wrapper Task C, which has no output, runs.
Running the pipeline twice looks like this:

Start:                          1st Run:                         2nd Run:
[Task A1]    [Task B1]          [Task A1]*   [Task B1]*          [Task A1]    [Task B1]
    |            |                  |            |                  |            |
    |            |                  |            |                  |            |
[Task A2]    [Task B2]          [Task A2]*   [Task B2]*          [Task A2]@   [Task B2]@
     \          /                   \          /                     \          /
      \        /                     \        /                       \        /
       \      /                       \      /                         \      /
   [Wrapper Task C]               [Wrapper Task C]*                [Wrapper Task C]*

* denotes task ran successfully
@ denotes a task that was skipped in the 2nd run because its output already exists

So if we run Wrapper Task C again, Luigi first asks for Task A2 and Task B2, but since their outputs already exist it does not run them and uses the existing outputs instead. In this scenario Luigi won’t even check whether the outputs of Task A1 or Task B1 exist, because it stops traversing the tree the moment it has everything it needs to run Task C.
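The skipping behavior described above can be sketched with a small toy model. This is not Luigi's scheduler: ToyTask and build() are hypothetical names made up for this illustration, and the toy walks the tree depth-first, so the order within the first run differs slightly from the diagram.

```python
# Simplified model of Luigi's "skip if the output exists" rule.
# NOT Luigi's real scheduler; ToyTask and build() are hypothetical.

class ToyTask:
    def __init__(self, name, requires=(), has_output=True):
        self.name = name
        self.requires = list(requires)
        self.has_output = has_output   # wrapper tasks produce no output
        self.output_exists = False
        self.checked = 0               # number of times complete() was called

    def complete(self):
        self.checked += 1
        # A task with no output is never complete, so it always runs.
        return self.has_output and self.output_exists


def build(task, ran):
    """Run `task` after its dependencies, skipping anything already complete."""
    if task.complete():
        return  # stop here: upstream tasks are not even inspected
    for dep in task.requires:
        build(dep, ran)
    if task.has_output:
        task.output_exists = True
    ran.append(task.name)


a1, b1 = ToyTask("A1"), ToyTask("B1")
a2, b2 = ToyTask("A2", [a1]), ToyTask("B2", [b1])
c = ToyTask("C", [a2, b2], has_output=False)

first_run, second_run = [], []
build(c, first_run)
build(c, second_run)

print(first_run)   # ['A1', 'A2', 'B1', 'B2', 'C']
print(second_run)  # ['C']
print(a1.checked)  # 1, A1 was only looked at during the first run
```

In the second run only the wrapper task appears, and A1 and B1 are never asked whether they are complete.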


Often we want to rerun a task, and re-generate the output even if the output exists. A few reasons would be:

  • The data has been updated
  • The processing code has been changed
  • Output targets have been corrupted by other processes (shouldn’t happen if we treat outputs as immutable)

There are probably many other reasons you may find yourself needing the ability to force a rerun.


One method of doing this is to have your task override the complete() function. If complete() returns False, the task will be run even if the output exists. If we don’t need manual control over forcing a run, and there is a specific condition under which we want the task to rerun and regenerate its output, we can check for that condition programmatically in our override of complete().

import luigi
from luigi.task import flatten

class RunWhenTask(luigi.Task):
    """A luigi task which will sometimes rerun even if output already exists"""
    def complete(self):
        # our addition: `api` stands in for whatever external check you use
        if api.data_has_been_updated():
            return False
        # otherwise do the default luigi behavior for complete()
        outputs = flatten(self.output())
        if len(outputs) == 0:
            return False

        return all(map(lambda output: output.exists(), outputs))

Though this approach made sense at first, a few problems quickly emerged:

  • The task is no longer isolated, because an external check now modifies its behavior. This could easily lead to a mess.
  • It has to be implemented individually for each task that needs to be able to rerun.
  • It does not handle the case where an upstream dependency needs to be updated.
    • For example, in the earlier diagrams, what if Task A1 has an external data source that has been updated? Downstream tasks will have a difficult time knowing whether upstream tasks need to be re-run. It can be implemented, but to me it seems like a pretty messy and potentially problematic solution.

The better option, or at least the one that made the most sense to me, was to control forced re-runs manually through Luigi’s built-in configuration files.

I created an extension of luigi.Task called ForcibleTask, which accepts 3 parameters:

  • force: bool (default False) - when True, forces the task to run
  • force_upstream: bool (default False) - when True, forces the task and all upstream ForcibleTasks to run
  • lock: bool (default False) - when True, prevents the task from being forced by a downstream task

The goal was that if none of these parameters are defined, the task behaves as a regular Luigi task, but if they are defined, they give us the ability to manually force run behavior.
For this I can use the configuration file system that is built into Luigi. It is nice because we don’t need to pass these parameters when we run a task. Instead we can define them in a config file, which is just an .ini file. Below is a small example of how that can look:

[TaskName]
bool_parameter = True
string_parameter = a string
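
To show that this is ordinary INI syntax, here is a minimal sketch that reads the same section with Python's standard configparser module. Luigi does its own loading and maps each section onto the task class with that name, so this is only an illustration of the file format, not of how Luigi reads it. CONFIG_TEXT is a hypothetical in-memory copy of the file.

```python
import configparser

# Hypothetical in-memory copy of the config file shown above.
CONFIG_TEXT = """
[TaskName]
bool_parameter = True
string_parameter = a string
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

section = parser["TaskName"]
# Values are stored as strings; getboolean() converts "True" to a bool.
bool_value = section.getboolean("bool_parameter")
string_value = section.get("string_parameter")

print(bool_value, string_value)
```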

ForcibleTask behaviors


[TaskA1]*    [TaskB1]*
    |            |
    |            |
[TaskA2]*    [TaskB2]* 
     \          /
      \        /
       \      /           # Wrapper task just means that it produces no output
   [Wrapper Task C]       # meaning Task C always runs
   
* denotes task ran successfully

Going back to our original example, suppose TaskA1, TaskA2, TaskB1, and TaskB2 all extend this new ForcibleTask class.

Example 1

### example.cfg file ###
[TaskA1]
; we don't need to define any of the 3 parameters
; we don't even need to define this config section since it is empty
; in this example A1, B1, and B2 will behave as a normal luigi task

[TaskA2]
force = True

[TaskB1]

[TaskB2]

In this example we’ve already run our pipeline, meaning our outputs from A1, A2, B1, and B2 all exist and luigi’s default behavior would be to not run them if we run the pipeline again.

Since these tasks now extend ForcibleTask, running our pipeline again with the above config behaves differently.
TaskA2 will be forced to run and regenerate its outputs.
TaskA1, TaskB1, and TaskB2 will not run because they were not configured to be forced and their outputs already exist. If their outputs didn’t exist, they would still run as they normally would.

Example 2

### example.cfg file ###
[TaskA2]
force_upstream = True

Using this config we not only force TaskA2 to run, but we also force all upstream tasks to run. In this scenario both TaskA1 and TaskA2 will be re-run.

Example 3

### example.cfg file ###
[TaskA2]
force_upstream = True

[TaskA1]
lock = True

Sometimes we have tasks that do important things and we want to be really careful about not accidentally forcing them to re-run. An example from my work is tasks that create, modify, or remove database content, where I want to be really careful about when they are run.
Unlike the previous example, in this case only TaskA2 will run. TaskA1 is locked and will not be forced by force_upstream.

Example 4

### example.cfg file ###
[TaskA1]
force = True

What happens in this scenario?
In this scenario TaskA2 is not forced and its output exists, so it is not run, and Luigi never even checks whether TaskA1 is complete. Because of this TaskA1 won’t be run, nor will any of the other A or B tasks.
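
The four examples above can be reproduced with a small toy model of the rules. It does not use Luigi at all: ToyForcible and run_pipeline are hypothetical names, the "config" is a plain dict rather than a .cfg file, and every output is assumed to already exist, as in the examples.

```python
# Toy model of the ForcibleTask rules described above (no Luigi involved).
# ToyForcible and run_pipeline are hypothetical names for this sketch.

class ToyForcible:
    def __init__(self, name, requires=(), has_output=True,
                 force=False, force_upstream=False, lock=False):
        self.name = name
        self.requires = list(requires)
        self.has_output = has_output
        self.output_exists = has_output  # pretend the pipeline already ran once
        self.force = force
        self.lock = lock
        if force_upstream:
            # Forcing upstream marks this task and everything it depends on.
            self.force = True
            for task in self.upstream():
                task.force = True

    def upstream(self):
        """Return every task reachable through requires."""
        found, queue = [], list(self.requires)
        while queue:
            task = queue.pop(0)
            found.append(task)
            queue.extend(task.requires)
        return found

    def complete(self):
        # A locked task ignores force and falls back to the normal rule.
        if self.force and not self.lock:
            return False
        return self.has_output and self.output_exists

    def run(self):
        self.force = False  # forcing is a one-shot request
        self.output_exists = self.has_output


def run_pipeline(config):
    """Build the five-task example with `config`; return the tasks that ran."""
    def opts(name):
        return config.get(name, {})

    a1 = ToyForcible("TaskA1", **opts("TaskA1"))
    b1 = ToyForcible("TaskB1", **opts("TaskB1"))
    a2 = ToyForcible("TaskA2", [a1], **opts("TaskA2"))
    b2 = ToyForcible("TaskB2", [b1], **opts("TaskB2"))
    c = ToyForcible("TaskC", [a2, b2], has_output=False)

    ran = []

    def build(task):
        if task.complete():
            return  # do not look any further upstream
        for dep in task.requires:
            build(dep)
        task.run()
        ran.append(task.name)

    build(c)
    return ran


example_1 = run_pipeline({"TaskA2": {"force": True}})
example_2 = run_pipeline({"TaskA2": {"force_upstream": True}})
example_3 = run_pipeline({"TaskA2": {"force_upstream": True},
                          "TaskA1": {"lock": True}})
example_4 = run_pipeline({"TaskA1": {"force": True}})

print(example_1)  # ['TaskA2', 'TaskC']
print(example_2)  # ['TaskA1', 'TaskA2', 'TaskC']
print(example_3)  # ['TaskA2', 'TaskC']
print(example_4)  # ['TaskC']
```

Example 4 is the one worth remembering: forcing a task only has an effect if something downstream actually asks for it.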


ForcibleTask implementation


import luigi
from luigi.task import flatten

# To use, define your own class extending ForcibleTask:
# class MyClass(ForcibleTask):
#   def requires(self): ...
#   def output(self): ...
#   def cleanup(self): ...
#   def run(self): ...

def toggle_force_to_false(func):
    # Wrap the task's run() so that a normal call to run()
    # goes through this wrapper instead, which
    # calls cleanup() first and then calls run()
    def wrapper(self, *args, **kwargs):
        if self.lock and self.complete():
            raise Exception("Hit run() on a locked task.?!")
        if not self.lock:
            self.cleanup()
        self.force = False
        return func(self, *args, **kwargs)
    return wrapper

class ForcibleTask(luigi.Task):
    """A luigi task which can be forcibly rerun"""
    force = luigi.BoolParameter(significant=False, default=False)
    force_upstream = luigi.BoolParameter(significant=False, default=False)
    lock = luigi.BoolParameter(significant=False, default=False)

    def cleanup(self):
        # ForcibleTasks must define a cleanup function
        raise NotImplementedError('Must define cleanup %s' % self.task_id)

    # override the complete function to 
    # return False when we have forced the task to run
    # if we have not forced the task to run return as normal
    def complete(self):
        if self.force and not self.lock:
            return False
        outputs = flatten(self.output())
        if len(outputs) == 0:
            return False
        return all(map(lambda output: output.exists(), outputs))

    # walk through the tree of upstream dependencies and return
    # every task found (including this one); this is called
    # when a task has force_upstream set to True
    def get_upstream_tasks(self):
        tasks = [self]
        checked = []
        while tasks:
            tasks += luigi.task.flatten(tasks[0].requires())
            checked.append(tasks.pop(0))
        return checked

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.force_upstream:
            # Force upstream children to be rerun
            self.force = True
            children = list(reversed(self.get_upstream_tasks()))
            for child in children:
                child.force = True

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # override task run with our wrapper run function that also cleans up
        cls.run = toggle_force_to_false(cls.run)
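
The `__init_subclass__` trick at the end is plain Python and can be tried without Luigi. The sketch below is a minimal stand-alone version of the same idea. Base, Report, and cleanup_before_run are hypothetical names, and the lock check is left out to keep it short.

```python
import functools


def cleanup_before_run(func):
    """Wrap run() so that cleanup() is called first and force is reset."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        self.cleanup()
        self.force = False
        return func(self, *args, **kwargs)
    return wrapper


class Base:
    force = True

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Called once for every subclass, right after it is created.
        cls.run = cleanup_before_run(cls.run)

    def cleanup(self):
        raise NotImplementedError("subclasses must define cleanup()")

    def run(self):
        pass


class Report(Base):
    def __init__(self):
        self.calls = []

    def cleanup(self):
        self.calls.append("cleanup")

    def run(self):
        self.calls.append("run")
        return "done"


report = Report()
result = report.run()

print(report.calls)  # ['cleanup', 'run']
print(result)        # done
print(report.force)  # False
```

One thing to watch for with this pattern: a subclass of Report that does not define its own run() inherits the already wrapped method and wraps it again, so cleanup() would be called twice.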