ray-project/ray
Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
Distributes AI/ML workloads across clusters with automatic scaling and resource management
Under the hood, the system uses 5 feedback loops, 4 data pools, and 5 control points to manage its runtime behavior.
An 8-component repository. 7101 files analyzed. Data flows through 6 distinct pipeline stages.
How Data Flows Through the System
Ray applications start with ray.init(), which connects to or starts a cluster. Users submit distributed functions via the @ray.remote decorator; each call creates a TaskSpec that flows through the GCS for global coordination and the Raylets for local scheduling. Tasks execute on CoreWorkers, producing ObjectRefs that reference results in the distributed object store. For ML workloads, higher-level libraries like Tune coordinate multiple training jobs, while Serve handles model inference with automatic scaling based on request load.
- Cluster Initialization — ray.init() establishes connection to existing cluster or starts local cluster by launching GCS server, Raylet processes, and CoreWorkers on each node [ClusterConfig → ClusterState]
- Task Submission — User calls @ray.remote decorated function, which serializes arguments, creates TaskSpec with resource requirements and function metadata, and submits to local Raylet [Function Arguments → TaskSpec]
- Global Scheduling — GCS receives TaskSpec from Raylet, consults cluster state for resource availability, and assigns task to node with sufficient resources using locality-aware placement [TaskSpec → Task Assignment]
- Local Execution — Target Raylet receives assignment, finds available CoreWorker, deserializes task arguments from object store, and executes user function in isolated process [Task Assignment → ObjectRef]
- Result Storage — CoreWorker serializes function return value, stores in local Plasma object store with unique ObjectID, and notifies GCS of completion for dependency resolution [Function Result → ObjectRef]
- Result Retrieval — ray.get() call resolves ObjectRef by fetching data from object store (local or remote), deserializing value, and returning to user application [ObjectRef → Python Object]
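Traced in user code, the six stages collapse into a few calls; a minimal sketch against the public Ray API:

```python
import ray

# Cluster Initialization: connect to an existing cluster or start a local one.
ray.init()

# Task Submission: the decorator turns a function into a remote task; each
# .remote() call serializes its arguments into a TaskSpec and hands it to
# the local Raylet (then Global Scheduling and Local Execution take over).
@ray.remote
def square(x):
    return x * x

# Submission is asynchronous: each call immediately returns an ObjectRef
# pointing at a future result in the object store (Result Storage).
refs = [square.remote(i) for i in range(4)]

# Result Retrieval: ray.get() fetches and deserializes the objects.
print(ray.get(refs))  # [0, 1, 4, 9]
```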
Data Models
The data structures that flow between stages — the contracts that hold the system together.
python/ray/_raylet.pyx — Python wrapper around C++ ObjectID containing task_id: bytes, object_index: int, and metadata for distributed object references
Created when ray.remote() task returns a value, stored in distributed object store, resolved when ray.get() is called
src/ray/core_worker/task_spec.h — C++ structure with function_descriptor, args: vector<ObjectRef>, num_returns: int, resources: ResourceSet, and scheduling metadata
Built from ray.remote() call parameters, queued by scheduler, dispatched to worker process
src/ray/core_worker/core_worker.h — C++ class containing worker_id: WorkerID, process_id: pid_t, assigned_resources: ResourceSet, and runtime environment config
Created during cluster initialization, updated during task assignment, destroyed on worker termination
python/ray/util/placement_group.py — Python dataclass with bundles: List[Dict[str, float]], strategy: PlacementStrategy (PACK/SPREAD/STRICT_PACK), and name: Optional[str]
Created via ray.util.placement_group(), reserved on cluster nodes, used to co-locate related tasks
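A short sketch of that lifecycle (reserve, wait for the reservation, then co-locate work), using the public placement group API:

```python
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

# Reserve two 1-CPU bundles, PACKed onto as few nodes as possible.
pg = placement_group([{"CPU": 1}, {"CPU": 1}], strategy="PACK")
ray.get(pg.ready())  # block until the cluster fulfills the reservation

@ray.remote(num_cpus=1)
def step(i):
    return i

# Tasks scheduled into the group's bundles are co-located per the strategy.
refs = [
    step.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
    ).remote(i)
    for i in range(2)
]
print(ray.get(refs))
```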
python/ray/data/dataset.py — Ray Data abstraction over distributed blocks with schema: pyarrow.Schema, _plan: ExecutionPlan containing read/transform operations
Lazy evaluation tree built by chaining operations, executed when action (write/iterate) is called
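The lazy-evaluation contract in a minimal sketch: transforms only extend the plan, and execution starts at the first action:

```python
import ray

# Each call below adds a node to the ExecutionPlan; nothing runs yet.
ds = ray.data.range(1_000)  # read stage
ds = ds.map(lambda row: {"id": row["id"], "square": row["id"] ** 2})

# take() is an action: the plan executes and blocks are materialized.
print(ds.take(3))
```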
python/ray/serve/_private/deployment_state.py — Pydantic model with name: str, replica_config: ReplicaConfig (num_replicas, resources), route_prefix: str, and health status
Defined in serve.deployment decorator, deployed to cluster, scales based on traffic patterns
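A hedged sketch of the decorator-to-deployment path described above; the replica count and resource values are illustrative:

```python
from ray import serve

# num_replicas and ray_actor_options populate the ReplicaConfig.
@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.5})
class Echo:
    async def __call__(self, request):
        return await request.json()

# Deploys to the cluster; the controller then scales it with traffic.
serve.run(Echo.bind(), route_prefix="/echo")
```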
Hidden Assumptions
Things this code relies on but never validates. These are the things that cause silent failures when the system changes.
The import_libs function modifies sys.path by inserting the local 'thirdparty_files' directory and the current directory at index 0, assuming both exist and contain required modules like 'aiohttp' and 'runtime_env_agent'
If this fails: when the expected directory structure is missing or the modules are absent, the subsequent imports raise ModuleNotFoundError and the runtime environment agent crashes during startup
python/ray/_private/runtime_env/agent/main.py:import_libs
The formatUrl function assumes URLs starting with '/' should have the leading slash removed for reverse proxy compatibility, but never validates that the resulting URL is valid or reachable
If this fails: Malformed URLs like '//example.com' become '/example.com' which could redirect requests to wrong endpoints or cause 404 errors, leading to silent failures in dashboard API calls
python/ray/dashboard/client/src/service/requestHandlers.ts:formatUrl
Status color mappings assume all status enums are complete and matching — if a new status value is added to TaskStatus, JobStatus, or another enum but not to the color map, the lookup returns an undefined color
If this fails: New status values render without colors, appearing as blank or default-styled chips in the dashboard, making status information invisible to users
python/ray/dashboard/client/src/components/StatusChip.tsx:getColorMap
The updatePage function assumes pages are updated in the correct order by finding pageIndex via findIndex, but if multiple pages have the same ID, it will always update the first match
If this fails: when duplicate page IDs exist in the hierarchy, only the first page gets updated while later pages with the same ID remain stale, leading to inconsistent breadcrumb navigation
python/ray/dashboard/client/src/pages/layout/mainNavContext.ts:updatePage
Hard-coded INFERENCE_BATCH_SIZE of 900 assumes T4 GPU memory capacity, but the actual device is determined at runtime (cpu if smoke_test else cuda) without checking available GPU memory
If this fails: On GPUs with less memory than T4, inference will fail with CUDA out-of-memory errors. On CPUs, the large batch size may cause system memory exhaustion and process crashes
release/nightly_tests/dataset/image_classification_from_parquet/main.py:INFERENCE_BATCH_SIZE
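One defensive pattern is to derive the batch size from the device actually visible at runtime. A hypothetical sketch; the scaling rule and the CPU fallback value are illustrative, not from the repo:

```python
import torch

T4_MEMORY_GB = 16   # the hardware the constant was tuned on
T4_BATCH_SIZE = 900

if torch.cuda.is_available():
    # Scale the tuned batch size by the ratio of available GPU memory.
    total_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
    batch_size = max(1, int(T4_BATCH_SIZE * total_gb / T4_MEMORY_GB))
else:
    batch_size = 64  # conservative fallback for CPU smoke tests
```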
The calculation OVERRIDE_NUM_BLOCKS = int(NUM_CONTAINERS * NUM_UNITS / IMAGES_PER_BLOCK) assumes NUM_CONTAINERS (50), NUM_UNITS (1380), and IMAGES_PER_BLOCK (11) are fixed constants that scale linearly with cluster size
If this fails: On smaller clusters, this creates too many blocks leading to task overhead and scheduling delays. On larger clusters, blocks become too large causing memory pressure and potential OOM failures
release/nightly_tests/dataset/image_embedding_from_uris/main.py:NUM_CONTAINERS
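Worked through, the fixed constants pin the block count regardless of cluster size:

```python
NUM_CONTAINERS = 50
NUM_UNITS = 1380
IMAGES_PER_BLOCK = 11

# 50 * 1380 / 11 = 6272.72..., truncated to 6272 blocks whether the
# cluster has 2 nodes or 200.
OVERRIDE_NUM_BLOCKS = int(NUM_CONTAINERS * NUM_UNITS / IMAGES_PER_BLOCK)
print(OVERRIDE_NUM_BLOCKS)  # 6272
```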
Authentication error handling dispatches AUTHENTICATION_ERROR_EVENT immediately when receiving 401/403, assuming the event listener is already registered and the authentication dialog component is ready to handle it
If this fails: when the authentication dialog hasn't been initialized or the event listener isn't registered, authentication errors are silently ignored, leaving users unable to authenticate and stuck with failed requests
python/ray/dashboard/client/src/service/requestHandlers.ts:axiosInstance.interceptors.response
The PyArrow schema defines fixed column names and types (metadata00-18, span_text) assuming the input Parquet files always contain exactly these columns in this exact format
If this fails: input files with different schemas, missing columns, or type mismatches make Ray Data fail during read operations with schema validation errors, crashing the entire text embedding pipeline
release/nightly_tests/dataset/text_embedding/main.py:SCHEMA
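A hypothetical pre-flight check that fails fast instead of mid-read; only the column names come from the description above, and the helper itself is illustrative:

```python
import pyarrow.parquet as pq

EXPECTED_COLUMNS = [f"metadata{i:02d}" for i in range(19)] + ["span_text"]

def check_schema(path: str) -> None:
    # Read only the Parquet footer metadata, not the data itself.
    actual = set(pq.read_schema(path).names)
    missing = set(EXPECTED_COLUMNS) - actual
    if missing:
        raise ValueError(f"{path} is missing columns: {sorted(missing)}")
```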
The deployment assumes CUDA is available and the GPU has sufficient memory (≥4GB) for the StableDiffusion model with fp16 precision, but never checks GPU availability or memory before loading
If this fails: On CPU-only machines or GPUs with insufficient memory, the model loading fails with CUDA errors or OOM, causing the entire Serve deployment to crash and preventing the service from starting
release/workspace_templates/03_serving_stable_diffusion/app.py:StableDiffusionV2.__init__
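A hypothetical guard that surfaces the requirement at startup instead of crashing on first load; the 4 GB figure mirrors the assumption above:

```python
import torch

REQUIRED_GIB = 4  # approximate fp16 StableDiffusion footprint (assumed)

if not torch.cuda.is_available():
    raise RuntimeError("This deployment requires a CUDA-capable GPU")

free_bytes, _total_bytes = torch.cuda.mem_get_info()
if free_bytes < REQUIRED_GIB * 1024**3:
    raise RuntimeError(
        f"Need >= {REQUIRED_GIB} GiB free GPU memory, "
        f"found {free_bytes / 1024**3:.1f} GiB"
    )
```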
The constant INFERENCE_LATENCY_PER_IMAGE_S = 0.0094 is hard-coded based on T4 GPU performance measurements, assuming all inference will run on identical hardware with consistent performance
If this fails: On different GPU types or when GPU is under load, actual inference times differ significantly from this assumption, leading to incorrect capacity planning and potential timeouts in production workloads
release/nightly_tests/dataset/image_embedding_from_uris/main.py:INFERENCE_LATENCY_PER_IMAGE_S
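An alternative to the hard-coded constant is to measure per-image latency on the actual device during warmup; a hypothetical helper, not code from the repo:

```python
import time

def measure_latency_per_image(run_batch, batch, warmup=2, iters=10):
    """Return mean seconds per image for run_batch on this hardware."""
    for _ in range(warmup):          # exclude one-time costs (JIT, caches)
        run_batch(batch)
    start = time.perf_counter()
    for _ in range(iters):
        run_batch(batch)
    return (time.perf_counter() - start) / (iters * len(batch))
```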
System Behavior
How the system operates at runtime — where data accumulates, what loops, what waits, and what controls what.
Data Pools
- Plasma Object Store — Shared memory store for large arrays and tensors with zero-copy access across processes on the same node
- GCS State Store — Persistent cluster metadata including node membership, job status, actor registry, and placement group state
- GCS Persistent Backend — Optional storage backend (such as Redis) that gives the GCS crash recovery and cluster state persistence across restarts
- Raylet Task Queue — Per-node priority queue of pending tasks waiting for worker assignment or resource availability
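The object store pool is directly visible from user code; a minimal sketch of zero-copy sharing through Plasma:

```python
import ray
import numpy as np

ray.init()

# ray.put() copies the array into the node's Plasma store once.
arr = np.zeros((1_000, 1_000))
ref = ray.put(arr)

@ray.remote
def mean(a):
    # On the same node, `a` is a read-only, zero-copy view backed by
    # the shared-memory Plasma segment.
    return float(a.mean())

print(ray.get(mean.remote(ref)))  # 0.0
```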
Feedback Loops
- Autoscaling Loop (auto-scale, balancing) — Trigger: Resource shortage or excess capacity detected. Action: Ray Autoscaler adds or removes nodes from cluster based on pending task queue and resource utilization. Exit: Target cluster size reached.
- Serve Replica Scaling (auto-scale, balancing) — Trigger: Request queue length or latency exceeds thresholds. Action: ServeController increases/decreases replica count per deployment to match traffic demand. Exit: Target QPS achieved within latency bounds.
- Task Retry Loop (retry, reinforcing) — Trigger: Task failure due to worker crash or resource shortage. Action: Raylet reschedules failed task on different worker with exponential backoff. Exit: Task succeeds or max retry count exceeded.
- Tune Trial Optimization (training-loop, reinforcing) — Trigger: Trial completion with performance metric. Action: TuneTrialExecutor generates new hyperparameter configurations using search algorithm (grid, random, Bayesian). Exit: Search budget exhausted or convergence criteria met.
- Data Backpressure (backpressure, balancing) — Trigger: Downstream operators cannot keep up with data production rate. Action: StreamingExecutor slows upstream operators and buffers intermediate results to prevent OOM. Exit: Processing rates rebalance.
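Two of these loops expose their knobs directly in the API. A hedged sketch: the autoscaling_config key names follow recent Ray Serve releases and may vary by version.

```python
import ray
from ray import serve

# Task Retry Loop: max_retries bounds the loop's exit condition.
@ray.remote(max_retries=3)
def flaky_task():
    ...

# Serve Replica Scaling: the config sets the loop's thresholds and bounds.
@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 10,
        "target_ongoing_requests": 5,
    }
)
class Model:
    def __call__(self, request):
        return "ok"
```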
Delays
- Object Store Transfer (async-processing, ~network latency + serialization time) — ray.get() blocks until remote object is transferred and deserialized locally
- Runtime Environment Setup (warmup, ~30s-5min depending on conda/pip dependencies) — First task in new environment waits for dependency download and environment creation
- Cluster Autoscaling (eventual-consistency, ~30s-3min depending on cloud provider) — Tasks queue until new nodes join cluster and pass health checks
- GCS Fault Recovery (eventual-consistency, ~5-30s) — New task submissions blocked until GCS rebuilds cluster state from persistent storage
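Callers can bound the first of these delays explicitly rather than blocking indefinitely:

```python
import ray

@ray.remote
def slow():
    import time
    time.sleep(60)
    return 1

ref = slow.remote()
try:
    # Cap the Object Store Transfer wait at 5 seconds.
    result = ray.get(ref, timeout=5)
except ray.exceptions.GetTimeoutError:
    result = None  # fall back, retry, or surface the delay to the caller
```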
Control Points
- Ray Resource Limits (threshold) — Controls: Maximum CPU/GPU/memory allocation per task and cluster-wide resource reservations. Default: RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE=1
- Object Store Memory (threshold) — Controls: Plasma store size determines how much data can be cached in memory before spilling to disk. Default: 30% of node RAM
- Task Parallelism (runtime-toggle) — Controls: Number of concurrent tasks per worker process affects throughput vs memory usage tradeoffs. Default: RAY_DISABLE_IMPORT_WARNING=1
- Serve Deployment Config (hyperparameter) — Controls: Replica count, resource allocation, and autoscaling parameters for model deployments. Default: varies per deployment
- Tune Search Algorithm (architecture-switch) — Controls: Whether to use grid search, Bayesian optimization, or population-based training for hyperparameter search. Default: configured per experiment
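Several of these control points are set at startup or per task; an illustrative sketch with arbitrary values:

```python
import ray

ray.init(
    object_store_memory=2 * 1024**3,  # Object Store Memory: cap Plasma at 2 GiB
    num_cpus=8,                       # Ray Resource Limits: advertise 8 CPUs
)

# Per-task reservations are enforced against those limits at submission.
@ray.remote(num_cpus=2, memory=512 * 1024**2)
def heavy_task():
    ...
```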
Technology Stack
- Apache Arrow — Handles zero-copy serialization of columnar data structures between Python and C++ components for efficient data transfer
- gRPC — Provides the inter-node communication protocol for control messages between the GCS, Raylets, and CoreWorkers in the distributed system
- Plasma — Implements the shared memory object store for zero-copy data access across processes on the same node using memory-mapped files
- Bazel — Builds the multi-language codebase with precise dependency management between C++, Python, and Java components
- React/TypeScript — Powers the web dashboard for cluster monitoring, job tracking, and distributed debugging with real-time updates
- PyTorch/TensorFlow — Integrates with Ray Train and RLlib for distributed deep learning training with automatic data parallelism
- Kubernetes — Provides container orchestration for Ray clusters, with the KubeRay operator handling node lifecycle and autoscaling
- Apache Parquet — Default storage format for Ray Data, enabling efficient columnar analytics and ML feature preprocessing
Key Components
- CoreWorker (executor) — The main execution engine on each node that runs user tasks, manages object store access, and handles communication with the Raylet scheduler — src/ray/core_worker/core_worker.cc
- Raylet (scheduler) — Per-node scheduler that decides task placement, manages local resources, and coordinates with the Global Control Service for cluster-wide decisions — src/ray/raylet/raylet.cc
- GlobalControlService (orchestrator) — Centralized cluster metadata service that tracks node membership, job lifecycle, and the actor registry, and coordinates global scheduling decisions — src/ray/gcs/gcs_server/gcs_server.cc
- PlasmaObjectStore (store) — Shared memory object store that provides zero-copy access to large arrays and tensors across processes on the same node — src/ray/object_manager/plasma/
- RuntimeEnvAgent (resolver) — Downloads and manages Python dependencies, conda environments, and container images required by jobs, providing isolation between different workloads — python/ray/_private/runtime_env/agent/runtime_env_agent.py
- ServeController (orchestrator) — Manages the model deployment lifecycle, including replica scaling, traffic routing, and health monitoring for the Ray Serve model serving system — python/ray/serve/_private/controller.py
- TuneTrialExecutor (scheduler) — Coordinates hyperparameter search by launching training trials, collecting metrics, and applying early stopping and population-based training algorithms — python/ray/tune/execution/trial_runner.py
- DatasetExecutor (processor) — Executes data transformation pipelines using streaming execution with backpressure control and automatic batching for ML preprocessing — python/ray/data/_internal/executor/streaming_executor.py
Package Structure
Core Ray runtime engine with distributed computing primitives, dashboard, and ML libraries for training, serving, and hyperparameter tuning
Testing framework and benchmark suite for validating Ray's performance and correctness across different ML workloads
Frequently Asked Questions
What is ray used for?
Distributes AI/ML workloads across clusters with automatic scaling and resource management. ray-project/ray is an 8-component repository written in Python. Data flows through 6 distinct pipeline stages. The codebase contains 7101 files.
How is ray architected?
ray is organized into 5 architecture layers: Core Runtime, Python API Layer, ML Libraries, Dashboard & Monitoring, and 1 more. Data flows through 6 distinct pipeline stages. This layered structure keeps concerns separated and modules independent.
How does data flow through ray?
Data moves through 6 stages: Cluster Initialization → Task Submission → Global Scheduling → Local Execution → Result Storage → Result Retrieval. Ray applications start with ray.init(), which connects to or starts a cluster. Users submit distributed functions via the @ray.remote decorator; each call creates a TaskSpec that flows through the GCS for global coordination and the Raylets for local scheduling. Tasks execute on CoreWorkers, producing ObjectRefs that reference results in the distributed object store. For ML workloads, higher-level libraries like Tune coordinate multiple training jobs, while Serve handles model inference with automatic scaling based on request load. This pipeline design reflects a complex multi-stage processing system.
What technologies does ray use?
The core stack includes Apache Arrow (Handles zero-copy serialization of columnar data structures between Python and C++ components for efficient data transfer), gRPC (Provides inter-node communication protocol for control messages between GCS, Raylets, and CoreWorkers in the distributed system), Plasma (Implements shared memory object store for zero-copy data access across processes on the same node using memory-mapped files), Bazel (Builds the multi-language codebase with precise dependency management between C++, Python, and Java components), React/TypeScript (Powers the web dashboard for cluster monitoring, job tracking, and distributed debugging with real-time updates), PyTorch/TensorFlow (Integrates with Ray Train and RLlib for distributed deep learning training with automatic data parallelism), and 2 more. A focused set of dependencies that keeps the build manageable.
What system dynamics does ray have?
ray exhibits 4 data pools (Plasma Object Store, GCS State Store), 5 feedback loops, 5 control points, and 4 delays. The feedback loops handle autoscaling, retries, training iteration, and backpressure. These runtime behaviors shape how the system responds to load, failures, and configuration changes.
What design patterns does ray use?
5 design patterns detected: Remote Procedure Call, Actor Model, Shared Memory Object Store, Hierarchical Scheduling, Lazy Evaluation DAG.
Analyzed on April 20, 2026 by CodeSea. Written by Karolina Sarna.