spotify/luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.

18,703 stars · Python · 9 components

Orchestrates batch job pipelines with dependency resolution and workflow management

Pipeline execution begins with parameter parsing from CLI/config that instantiates root Task objects. The scheduler builds a dependency graph by recursively calling requires() on tasks, then assigns work to workers. Workers execute task.run() methods and write to Target outputs, reporting status back to scheduler. The cycle continues until all dependencies are satisfied or failures occur.

Under the hood, the system uses 3 feedback loops, 4 data pools, and 4 control points to manage its runtime behavior.

A 9-component repository. 277 files analyzed. Data flows through 7 distinct pipeline stages.

How Data Flows Through the System

  1. Parse parameters and instantiate root task — ParameterParser in cmdline.py parses CLI arguments and config files to create Task instances with specific parameter values using Parameter descriptors [Parameter → Task]
  2. Build dependency graph via recursive requires() — CentralPlannerScheduler calls requires() method on each task to discover dependencies, building a complete task graph with dependency relationships [Task → Task]
  3. Check target existence for completed tasks — For each task, scheduler calls output().exists() to determine if task is already complete by checking if its Target outputs exist [Target]
  4. Assign ready tasks to workers — Scheduler identifies tasks with satisfied dependencies and sends WorkerSchedulerProtocol messages to available Worker instances for execution [Task → WorkerSchedulerProtocol]
  5. Execute task run() method — Worker calls task.run() method which performs the actual work and writes results to Target outputs using atomic file operations [Task → Target]
  6. Report task completion status — Worker sends status updates via WorkerSchedulerProtocol back to scheduler, including success, failure, and execution timing information [Target → WorkerSchedulerProtocol]
  7. Update task history and generate execution summary — ExecutionSummary component collects task execution statistics and TaskHistory records are persisted to database for analysis and reporting [WorkerSchedulerProtocol → TaskHistory]
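The cycle the seven stages describe (walk requires() recursively, skip tasks whose outputs already exist, run the rest in dependency order) can be sketched as a toy scheduler. This is an illustrative re-implementation, not luigi's actual code; complete() stands in for output().exists().

```python
class Task:
    def requires(self):                 # dependencies, discovered recursively
        return []
    def complete(self):                 # stands in for output().exists()
        return getattr(self, "_done", False)
    def run(self):                      # the actual work
        self._done = True

def build(root):
    """Depth-first walk: run each task only after its dependencies."""
    order = []
    def visit(task):
        if task.complete():             # stage 3: skip already-complete tasks
            return
        for dep in task.requires():     # stage 2: recursive dependency walk
            visit(dep)
        task.run()                      # stage 5: execute run()
        order.append(type(task).__name__)
    visit(root)
    return order

class Fetch(Task):
    pass

class Report(Task):
    def requires(self):
        return [Fetch()]

print(build(Report()))   # → ['Fetch', 'Report']
```

The real scheduler adds worker assignment, status RPC, and failure handling around this core loop, but the ordering logic is the same shape.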

Data Models

The data structures that flow between stages — the contracts that hold the system together.

Task luigi/task.py
Class with task_id: str, param_args: tuple, param_kwargs: dict, priority: int, status: str, plus user-defined parameters via Parameter descriptors
Created by parameter parsing, queued by scheduler, executed by worker, marked complete when output exists
Target luigi/target.py
Abstract interface with exists(): bool and open() methods, implemented as LocalTarget(path: str), S3Target(path: str, client), HdfsTarget(path: str), etc.
Created in task.output(), checked for existence in dependency resolution, written to during task execution
WorkerSchedulerProtocol luigi/rpc.py
RPC messages with task_id: str, status: str, worker_id: str, family: str, params: dict, scheduling_time: float, batch_id: int
Generated by worker during task lifecycle events, sent to scheduler via HTTP/RPC, processed to update task graph state
Parameter luigi/parameter.py
Descriptor classes like IntParameter(default: int, is_global: bool, is_list: bool), DateParameter(default: date), with _default, significant: bool, description: str
Declared as class attributes, parsed from CLI args or config, used to instantiate Task objects with specific parameter values
TaskHistory luigi/task_history.py
Records with task_id: str, family: str, params: dict, start_time: datetime, end_time: datetime, status: str, exception: str, stored in SQLite database
Created when a task starts, updated with status/exception during execution, persisted to database for historical analysis
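As a rough illustration of the Parameter/Task contract described above, here is a pure-Python sketch of descriptor-style parameters being collected into param_kwargs and a task_id. It is loosely modeled on the attribute shapes listed here (_default, significant, the _no_value sentinel), not luigi's actual implementation; IntParameter and MyTask are hypothetical.

```python
_no_value = object()   # sentinel, in the spirit of luigi.parameter._no_value

class Parameter:
    def __init__(self, default=_no_value, significant=True, description=""):
        self._default = default
        self.significant = significant
        self.description = description

    def parse(self, raw):               # CLI strings -> Python values
        return raw

class IntParameter(Parameter):
    def parse(self, raw):
        return int(raw)

class Task:
    def __init__(self, **kwargs):
        # Collect the Parameter descriptors declared on the class
        params = {k: v for k, v in type(self).__dict__.items()
                  if isinstance(v, Parameter)}
        self.param_kwargs = {}
        for name, param in params.items():
            if name in kwargs:
                value = kwargs[name]
            elif param._default is not _no_value:
                value = param._default
            else:
                raise ValueError(f"missing parameter: {name}")
            setattr(self, name, value)
            self.param_kwargs[name] = value
        # task_id derives from the family name plus significant parameters
        sig = {k: v for k, v in self.param_kwargs.items()
               if params[k].significant}
        self.task_id = f"{type(self).__name__}({sig})"

class MyTask(Task):
    n = IntParameter(default=3)

t = MyTask(n=5)
print(t.task_id)   # → MyTask({'n': 5})
```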

Hidden Assumptions

Things this code relies on but never validates; these cause silent failures when the system changes.

critical Environment unguarded

Assumes kubectl context is correctly configured and pointing to an accessible Kubernetes cluster, with valid authentication credentials available in ~/.kube/config

If this fails: Task silently fails or hangs indefinitely when cluster is unreachable, authentication expires, or config is malformed

examples/kubernetes.py:run method
critical Resource unguarded

Assumes Spark cluster has sufficient memory (driver_memory='2g', executor_memory='3g') and that spark-submit binary exists at configured path

If this fails: Out-of-memory errors or 'command not found' failures that don't provide clear diagnostic information about resource constraints

examples/pyspark_wc.py:PySparkTask
critical Scale unguarded

Assumes /tmp directory has sufficient space and write permissions for temporary files, with hardcoded path '/tmp/Config_%d.txt'

If this fails: Disk full or permission denied errors cause silent task failures without indicating the root cause is filesystem constraints

examples/dynamic_requirements.py:Configuration.output
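One way to close this gap is to validate the temporary directory before the task writes to it, so the failure is an explicit error at startup instead of a silent mid-task one. A minimal sketch; the 10 MB threshold and the function name are illustrative, not from luigi.

```python
import os
import shutil
import tempfile

def check_tmp_writable(path=None, min_free_bytes=10 * 1024 * 1024):
    """Raise early if the temp directory is full or not writable."""
    path = path or tempfile.gettempdir()
    usage = shutil.disk_usage(path)
    if usage.free < min_free_bytes:
        raise RuntimeError(f"{path}: only {usage.free} bytes free")
    # Probe writability by creating and removing a real file
    fd, probe = tempfile.mkstemp(dir=path)
    os.close(fd)
    os.unlink(probe)
    return path

print(check_tmp_writable())   # prints the validated temp directory
```

Calling a check like this at the top of run() turns "disk full" and "permission denied" into clear, attributable errors.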
critical Temporal unguarded

Assumes 218 scheduled tasks can complete within reasonable time bounds and that external dependencies remain available during execution

If this fails: Pipeline hangs indefinitely if external systems become unavailable, with no timeout mechanism to detect stalled execution

examples/execution_summary_example.py:task execution
critical Contract unguarded

Assumes FTP server HOST, USER, and PWD variables are defined and valid, with network connectivity to FTP server maintained throughout task execution

If this fails: Connection failures or authentication errors result in cryptic network exceptions rather than clear configuration validation errors

examples/ftp_experiment_outputs.py:RemoteTarget
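A cheap mitigation is to validate the configuration before opening any connection, so a missing value fails with a clear message rather than a cryptic network exception mid-run. A hedged sketch; validate_ftp_config is a hypothetical helper, not part of the example, and the values below are placeholders.

```python
def validate_ftp_config(host, user, pwd):
    """Fail fast with a named list of missing FTP settings."""
    missing = [name for name, value in
               (("HOST", host), ("USER", user), ("PWD", pwd))
               if not value]
    if missing:
        raise ValueError(f"FTP config incomplete, missing: {', '.join(missing)}")
    return True

# Placeholder values for illustration only
validate_ftp_config("ftp.example.com", "alice", "secret")
```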
warning Environment weakly guarded

Assumes Elasticsearch cluster is running and accessible with sufficient privileges to create indices and insert documents

If this fails: Index operations fail with connection timeouts or permission errors without validating cluster availability upfront

examples/elasticsearch_index.py:CopyToIndex
warning Scale weakly guarded

Assumes 50 total nodes is a reasonable upper bound for task graph complexity and that scheduler can handle this depth without memory issues

If this fails: Memory exhaustion or scheduler performance degradation when task graphs exceed anticipated complexity bounds

examples/foo_complex.py:max_total_nodes=50
warning Domain unguarded

Assumes Parameter objects have consistent _default, significant, description attribute structure and that luigi.parameter._no_value sentinel exists

If this fails: AttributeError during documentation generation if Parameter API changes or custom parameter types don't follow expected interface

doc/conf.py:parameter_repr
info Temporal unguarded

Assumes a 5-second sleep is sufficient for simulating work and that the system clock advances predictably during execution

If this fails: Timing-dependent behavior may not work correctly in containerized environments or systems with clock adjustments

examples/dynamic_requirements.py:time.sleep(5)
info Environment unguarded

Assumes --scheduler-retry-delay and --logging-conf-file paths exist and are accessible, with specific log configuration format expected

If this fails: Retry policy demonstration fails silently or with unclear errors if configuration files are missing or malformed

examples/per_task_retry_policy.py:scheduler configuration

System Behavior

How the system operates at runtime — where data accumulates, what loops, what waits, and what controls what.

Data Pools

Task registry (registry)
Global registry mapping task family names to Task class objects for dynamic instantiation
Task dependency graph (in-memory)
In-memory graph structure tracking task dependencies, status, and execution state in CentralPlannerScheduler
Task execution history (database)
SQLite database storing historical task execution records with timing, status, and failure information
Configuration store (file-store)
Hierarchical configuration from files, environment variables, and CLI args
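The task registry pool above can be illustrated with a metaclass that records every Task subclass by family name, which is what lets a scheduler instantiate tasks dynamically from string identifiers. This is loosely modeled on luigi's Register metaclass, not its actual code.

```python
class Register(type):
    """Metaclass that records every concrete Task subclass by name."""
    _registry = {}

    def __new__(mcs, name, bases, namespace):
        cls = super().__new__(mcs, name, bases, namespace)
        if bases:                       # skip the abstract base itself
            Register._registry[name] = cls
        return cls

class Task(metaclass=Register):
    pass

class CleanupTask(Task):
    pass

def get_task_cls(family):
    """Resolve a task family name to its class, as the scheduler would."""
    return Register._registry[family]

print(get_task_cls("CleanupTask") is CleanupTask)   # → True
```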

Technology Stack

Python (runtime)
Primary runtime for task definition, scheduling logic, and execution coordination
Tornado (framework)
HTTP server for scheduler web interface and worker-scheduler RPC communication
SQLite (database)
Stores task execution history and metadata for pipeline analysis and monitoring
python-dateutil (library)
Date parameter parsing and manipulation for time-based task scheduling
tenacity (library)
Retry logic implementation for failed task execution with configurable backoff strategies

Compare luigi

Related Repositories

Frequently Asked Questions

What is luigi used for?

Orchestrates batch job pipelines with dependency resolution and workflow management. spotify/luigi is a 9-component repository written in Python. Data flows through 7 distinct pipeline stages. The codebase contains 277 files.

How is luigi architected?

luigi is organized into 5 architecture layers: Task Definition Layer, Scheduler Core, Target System, Execution Interface, and 1 more. Data flows through 7 distinct pipeline stages. This layered structure keeps concerns separated and modules independent.

How does data flow through luigi?

Data moves through 7 stages: Parse parameters and instantiate root task → Build dependency graph via recursive requires() → Check target existence for completed tasks → Assign ready tasks to workers → Execute task run() method → .... Pipeline execution begins with parameter parsing from CLI/config that instantiates root Task objects. The scheduler builds a dependency graph by recursively calling requires() on tasks, then assigns work to workers. Workers execute task.run() methods and write to Target outputs, reporting status back to scheduler. The cycle continues until all dependencies are satisfied or failures occur. This pipeline design reflects a complex multi-stage processing system.

What technologies does luigi use?

The core stack includes Python (Primary runtime for task definition, scheduling logic, and execution coordination), Tornado (HTTP server for scheduler web interface and worker-scheduler RPC communication), SQLite (Stores task execution history and metadata for pipeline analysis and monitoring), python-dateutil (Date parameter parsing and manipulation for time-based task scheduling), tenacity (Retry logic implementation for failed task execution with configurable backoff strategies). A focused set of dependencies that keeps the build manageable.

What system dynamics does luigi have?

luigi exhibits 4 data pools (Task registry, Task dependency graph), 3 feedback loops, 4 control points, 3 delays. The feedback loops handle retry and polling. These runtime behaviors shape how the system responds to load, failures, and configuration changes.

What design patterns does luigi use?

4 design patterns detected: External Task Pattern, Wrapper Task Pattern, Atomic Output Pattern, Parameter Inheritance.

How does luigi compare to alternatives?

CodeSea has side-by-side architecture comparisons of luigi with prefect. These comparisons show tech stack differences, pipeline design, system behavior, and code patterns. See the comparison pages above for detailed analysis.

Analyzed on April 19, 2026 by CodeSea.