Design a system that efficiently downloads and distributes a large ML model (e.g., 500GB) from external storage to all GPU workers in a data center cluster. The system must work within bandwidth constraints, coordinate hundreds to thousands of workers, and recover gracefully from failures.
Model size: 500GB (large language model weights)
Number of workers: 100-1000 GPU workers in the data center
External bandwidth: 10 Gbps (download from external storage like S3, GCS, or model registry)
Internal bandwidth per worker: 10 Gbps full-duplex (10 Gbps upload AND 10 Gbps download simultaneously)
Objective: Minimize total time to deploy model to ALL workers
Reliability requirement: Handle worker failures during deployment
Full-duplex networking means workers can upload and download simultaneously at full speed. This is standard in modern data centers and is critical for the pipeline approach to work efficiently.
Disclaimer: This is a sample solution to help you get started. To better prepare for the interview, you should think through the question yourself and try to come up with your own solution. System design questions are open-ended and have multiple valid approaches.
Download model from external storage: Pull 500GB model from S3/GCS to the data center
Distribute to all workers: Every GPU worker receives a complete copy of the model
Track deployment progress: Monitor which workers have received which chunks
Handle failures gracefully: Recover from worker failures without restarting entire deployment
Verify data integrity: Ensure model files are not corrupted during transfer
Speed: Minimize total deployment time (target: ~8 minutes for 100 workers)
Scalability: Support 100-1000+ workers; with 1 GB chunks and 10 Gbps links, download dominates up to ~500 workers, propagation dominates beyond
Reliability: Handle 1-5% worker failure rate during deployment
Bandwidth efficiency: Maximize utilization of both external and internal network links
Given:
Model size: 500 GB
External bandwidth: 10 Gbps
Internal bandwidth: 10 Gbps per worker (full-duplex)
Workers: 100 (initial), 1000 (target)
Key calculations:
Time to transfer 500GB at 10 Gbps: 500 GB × 8 bits/byte / 10 Gbps = 400 seconds
Chunk size (1 GB) transfer time: 1 GB × 8 / 10 Gbps = 0.8 seconds
Number of chunks: 500 GB / 1 GB = 500 chunks
The external link is the bottleneck—only one copy can be downloaded at 10 Gbps.
The question is: how do we distribute that copy to 100+ workers efficiently?
The key insight is that the external bandwidth is fixed at 10 Gbps. No matter what strategy you use, downloading the first copy takes 400 seconds. The distribution strategy determines what happens after that.
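The key calculations above can be reproduced in a few lines. This is a minimal sketch that assumes decimal units (1 GB = 8 Gb) and ignores protocol overhead; the constant and function names are illustrative.

```python
# Back-of-envelope transfer times, using decimal units (1 GB = 8 Gb).
MODEL_GB = 500
CHUNK_GB = 1
LINK_GBPS = 10  # external link and each worker NIC, per the givens


def transfer_seconds(size_gb: float, link_gbps: float) -> float:
    """Seconds needed to move size_gb gigabytes over a link_gbps link."""
    return size_gb * 8 / link_gbps


model_seconds = transfer_seconds(MODEL_GB, LINK_GBPS)
chunk_seconds = transfer_seconds(CHUNK_GB, LINK_GBPS)
num_chunks = MODEL_GB // CHUNK_GB

print(f"full model: {model_seconds:.0f} s")
print(f"one chunk:  {chunk_seconds:.1f} s")
print(f"chunks:     {num_chunks}")
```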
Deployment {
    deployment_id: UUID (PK)
    model_id: String
    version: String
    status: Enum (init, downloading, distributing, verifying, complete, failed)
    total_chunks: Integer
    started_at: Timestamp
    completed_at: Timestamp
}
Worker {
    worker_id: String (PK)
    status: Enum (idle, receiving, complete, failed)
    chunks_received: [Integer]   -- List of chunk IDs
    upstream_worker: String      -- Who sends chunks to this worker
    downstream_worker: String    -- Who receives chunks from this worker
    last_heartbeat: Timestamp
}
Chunk {
    chunk_id: Integer
    offset: Integer              -- Byte offset in model file
    size: Integer                -- Bytes (typically 1 GB)
    checksum: String             -- SHA256 for verification
}
Manifest {
    model_id: String
    version: String
    total_size: Integer
    chunk_size: Integer
    chunks: [Chunk]
    global_checksum: String
}
Coordinator state (etcd/Consul): Deployment status, pipeline topology, worker assignments
Worker progress (Redis): Real-time chunk completion tracking, heartbeats
Audit logs (PostgreSQL): Historical deployments, error logs
POST /deployments
    Request: { model_id, version, source_url, target_workers[] }
    Response: { deployment_id, status, estimated_time_seconds }
GET /deployments/{deployment_id}
    Response: { deployment_id, status, progress, workers_complete, workers_failed }
DELETE /deployments/{deployment_id}
    Response: { success }
POST /workers/register
    Request: { worker_id, hostname, rack, available_storage }
    Response: { registered: true }
POST /workers/heartbeat
    Request: { worker_id, status, chunks_received[], bandwidth_in, bandwidth_out }
    Response: { continue: true }
service ChunkTransfer {
    rpc StreamChunks(stream ChunkRequest) returns (stream ChunkData);
}
message ChunkRequest {
    int32 chunk_id = 1;
}
message ChunkData {
    int32 chunk_id = 1;
    bytes data = 2;
    string checksum = 3;
}
gRPC for worker-to-worker chunk transfers:
Streaming support for large data transfers
Built-in flow control and backpressure
Efficient binary protocol (no Base64 overhead)
REST for coordinator APIs:
Simple CRUD operations
Easy debugging and monitoring
Standard tooling support
Before designing the system, we need to choose the right distribution strategy. Let's analyze three approaches:
Approach 1: Sequential Download (Naive)
Every worker downloads directly from external storage.
External Storage (10 Gbps shared)
│
├──→ W1 (10 Gbps / 100 = 0.1 Gbps each)
├──→ W2
├──→ ...
└──→ W100
Time calculation:
Each worker gets: 10 Gbps / 100 workers = 0.1 Gbps
Time per worker: 500 GB × 8 / 0.1 Gbps = 40,000 seconds ≈ 11 hours
Verdict: Completely unacceptable. External bandwidth is the bottleneck.
Approach 2: Binary Tree Distribution
One worker downloads, then distributes in a tree pattern.
         External Storage
                │
            W1 (root)
            /       \
          W2         W3
         /  \       /  \
       W4    W5   W6    W7
               ...
Time calculation (pipelined):
Each parent splits 10 Gbps across 2 children => 5 Gbps per child
Effective throughput of unique data into the tree: 5 Gbps
Time to stream full model into the tree: 500 GB × 8 / 5 Gbps = 800 seconds
Depth for 100 workers: ceil(log2 100) = 7 levels
Per-level chunk latency: 1 GB × 8 / 5 Gbps = 1.6 seconds
Total ≈ 800s + (7 × 1.6s) = 811s ≈ 13.5 minutes
For 1,000 workers: depth ≈ 10 → 800s + 16s ≈ 13.6 minutes
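Verdict: Deployment time grows only logarithmically with N, so it is competitive at larger N, but splitting bandwidth at each level halves the effective throughput (about 13.5 minutes even for 100 workers) and the tree adds coordination complexity.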
Approach 3: Pipeline Distribution (Simple baseline for moderate N)
Workers form a linear chain. Each worker receives chunks and immediately forwards them to the next worker.
External Storage
       │
      W1 ──→ W2 ──→ W3 ──→ ... ──→ W100
       │      │      │              │
    (recv) (recv) (recv)         (recv)
       +      +      +
    (send) (send) (send)
Time calculation:
Phase 1: Download all 500 chunks through W1
500 chunks × 0.8 seconds = 400 seconds
(W1 receives from external AND sends to W2 simultaneously)
Phase 2: Last chunk propagates to W100
99 hops × 0.8 seconds = 79.2 seconds
Total: 400s + 79.2s = 479 seconds ≈ 8 minutes
Verdict: Great for ~100 workers and very simple to operate. For large N, the linear propagation term can dominate.
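Summary of the estimates above, where N = number of workers, S = time to transfer the full model at 10 Gbps (400 s), and C = time to transfer one chunk at 10 Gbps (0.8 s): direct download takes about N × S, the binary tree about 2S + ceil(log2 N) × 2C (bandwidth splitting doubles both S and C), and the pipeline about S + (N − 1) × C.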
A centrally scheduled P2P mesh can approach the external-link lower bound plus small propagation overhead, but requires significantly more scheduler/control-plane complexity.
Pipeline distribution is the simplest high-throughput option and is near-optimal for ~100 workers. At larger N, a shallow tree (or tree-of-pipelines) can reduce propagation time at the cost of splitting bandwidth and more coordination.
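To compare the three approaches at different cluster sizes, the time estimates from the analysis above can be written as small functions. This is a sketch of those back-of-envelope formulas only; it assumes ideal links and no per-chunk overhead, and the function names are illustrative.

```python
import math

MODEL_SECONDS = 400.0  # S: 500 GB at 10 Gbps
CHUNK_SECONDS = 0.8    # C: 1 GB at 10 Gbps


def sequential_seconds(n: int) -> float:
    # n workers share one external link, so each sees 1/n of its bandwidth.
    return n * MODEL_SECONDS


def tree_seconds(n: int) -> float:
    # Binary tree: each parent splits its uplink across two children,
    # so both the stream time and the per-level chunk latency double.
    depth = math.ceil(math.log2(n))
    return 2 * MODEL_SECONDS + depth * 2 * CHUNK_SECONDS


def pipeline_seconds(n: int) -> float:
    # Full-rate stream through the chain, plus n - 1 hops for the last chunk.
    return MODEL_SECONDS + (n - 1) * CHUNK_SECONDS


for n in (100, 1000):
    print(
        f"N={n}: sequential={sequential_seconds(n):.0f}s "
        f"tree={tree_seconds(n):.1f}s pipeline={pipeline_seconds(n):.1f}s"
    )
```

For 100 workers the pipeline estimate is the smallest of the three; for 1,000 workers the tree estimate is smaller than the pipeline estimate, which is the trade-off described above.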
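[Architecture diagram: external storage (S3 / GCS) feeds the seed worker W1 over a 10 Gbps link. Inside the data center, the GPU workers form a pipeline W1 → W2 → W3 → ... → W100 over 10 Gbps links. The control plane consists of the Coordinator, which keeps state in etcd and progress in Redis, and receives heartbeats from every worker.]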
Phase 1: Initialization
User triggers deployment via POST /deployments
Coordinator fetches manifest from model registry (pre-computed checksums stored alongside model)
Coordinator assigns pipeline topology: [W1, W2, W3, ..., W100]
Coordinator distributes manifest to all workers
All workers acknowledge readiness
The manifest (chunk boundaries and checksums) should be pre-computed when the model is uploaded to the registry. This avoids the coordinator needing to download the entire model just to compute checksums.
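A minimal sketch of how such a manifest could be pre-computed at upload time is shown below. It streams the file once and hashes each chunk and the whole file with SHA256. The `build_manifest` function, its parameters, and the dataclasses mirroring the Chunk and Manifest records are assumptions for illustration, not an existing registry API.

```python
import hashlib
from dataclasses import dataclass
from typing import List


@dataclass
class Chunk:
    chunk_id: int
    offset: int
    size: int
    checksum: str


@dataclass
class Manifest:
    model_id: str
    version: str
    total_size: int
    chunk_size: int
    chunks: List[Chunk]
    global_checksum: str


def build_manifest(path, model_id, version,
                   chunk_size=1_000_000_000, read_size=8 * 1024 * 1024):
    """Stream the file once, hashing each chunk and the whole file."""
    global_hash = hashlib.sha256()
    chunks = []
    offset = 0
    with open(path, "rb") as f:
        while True:
            chunk_hash = hashlib.sha256()
            size = 0
            # Read in small blocks so a 1 GB chunk never sits in memory at once.
            while size < chunk_size:
                block = f.read(min(read_size, chunk_size - size))
                if not block:
                    break
                chunk_hash.update(block)
                global_hash.update(block)
                size += len(block)
            if size == 0:
                break  # end of file
            chunks.append(Chunk(len(chunks), offset, size, chunk_hash.hexdigest()))
            offset += size
    return Manifest(model_id, version, offset, chunk_size, chunks,
                    global_hash.hexdigest())
```

The last chunk may be shorter than `chunk_size`; its recorded `size` reflects the bytes actually read.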
Phase 2: Seeding (Download First Copy)
Coordinator signals W1 to start downloading
W1 downloads chunk 0 from S3, verifies checksum, writes to disk
W1 immediately streams chunk 0 to W2 (simultaneously with downloading chunk 1)
Pipeline builds up—each worker receives AND sends simultaneously:
W1: Download chunk N, send chunk N-1 to W2
W2: Receive chunk N-1 from W1, send chunk N-2 to W3
W3: Receive chunk N-2 from W2, send chunk N-3 to W4
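The staggered schedule above can be checked with a small lock-step simulation. This sketch assumes store-and-forward behavior (a worker forwards a chunk only after receiving it in full) and that every transfer takes exactly one chunk time; `simulate_pipeline` is an illustrative name.

```python
def simulate_pipeline(num_workers: int, num_chunks: int, chunk_seconds: float):
    """Simulate a store-and-forward chain in ticks of one chunk transfer each.

    Returns, for each worker in chain order, the time at which it holds
    every chunk.
    """
    received = [0] * num_workers        # chunks fully received per worker
    finish_time = [None] * num_workers
    tick = 0
    while any(t is None for t in finish_time):
        tick += 1
        # Snapshot the state so a chunk needs one full tick per hop.
        before = list(received)
        for w in range(num_workers):
            # Worker 0 pulls from external storage, which has every chunk.
            available = num_chunks if w == 0 else before[w - 1]
            if before[w] < available:
                received[w] += 1
                if received[w] == num_chunks:
                    finish_time[w] = tick * chunk_seconds
    return finish_time


times = simulate_pipeline(num_workers=100, num_chunks=500, chunk_seconds=0.8)
print(f"seed worker done: {times[0]:.1f} s, last worker done: {times[-1]:.1f} s")
```

Under these assumptions each additional worker in the chain finishes exactly one chunk time after its upstream neighbor.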
Phase 3: Steady-State Distribution
# Each worker runs two coroutines concurrently (e.g., via asyncio.gather)
import hashlib

NUM_CHUNKS = 500

def sha256(data):
    return hashlib.sha256(data).hexdigest()

# Coroutine 1: Receive
async def receive_chunks():
    for chunk_id in range(NUM_CHUNKS):
        while True:
            data = await upstream.receive(chunk_id)
            if sha256(data) == manifest.chunks[chunk_id].checksum:
                break
            # Ask upstream to resend this chunk, then receive it again;
            # do not move on to the next chunk until this one verifies
            await request_retry(chunk_id)
        await write_to_disk(chunk_id, data)
        await report_progress(chunk_id)

# Coroutine 2: Send (all except last worker)
async def send_chunks():
    for chunk_id in range(NUM_CHUNKS):
        await wait_until_chunk_available(chunk_id)
        data = await read_from_disk(chunk_id)
        await downstream.send(chunk_id, data)
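For a version that runs end to end, the sketch below replaces the network with in-memory `asyncio.Queue` objects and stores chunks in a dict instead of on disk. It also merges the receive and send loops into one coroutine per worker for brevity. All names here (`worker`, `run_pipeline`, the `None` end-of-stream marker) are illustrative assumptions.

```python
import asyncio
import hashlib


def sha256_hex(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


async def worker(inbox, outbox, checksums, store):
    """Receive chunks in order, verify, store, and forward downstream."""
    while True:
        item = await inbox.get()
        if item is None:                 # end-of-stream marker
            if outbox is not None:
                await outbox.put(None)
            return
        chunk_id, data = item
        if sha256_hex(data) != checksums[chunk_id]:
            raise ValueError(f"checksum mismatch on chunk {chunk_id}")
        store[chunk_id] = data
        if outbox is not None:           # the last worker has no downstream
            await outbox.put((chunk_id, data))


async def run_pipeline(chunks, num_workers):
    """Push chunks through a chain of workers; return each worker's store."""
    checksums = [sha256_hex(c) for c in chunks]
    # queues[i] feeds worker i; maxsize=1 gives simple backpressure.
    queues = [asyncio.Queue(maxsize=1) for _ in range(num_workers)]
    stores = [{} for _ in range(num_workers)]
    tasks = []
    for i in range(num_workers):
        outbox = queues[i + 1] if i + 1 < num_workers else None
        tasks.append(asyncio.create_task(
            worker(queues[i], outbox, checksums, stores[i])))
    # Stand-in for the external download feeding the seed worker.
    for chunk_id, data in enumerate(chunks):
        await queues[0].put((chunk_id, data))
    await queues[0].put(None)
    await asyncio.gather(*tasks)
    return stores
```

Calling `asyncio.run(run_pipeline([b"a", b"bb", b"ccc"], 4))` returns four stores, one per worker, each holding all three chunks.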
Phase 4: Completion & Verification
Each worker receives all 500 chunks
Worker assembles model and computes global checksum
Worker verifies against manifest's global checksum
Worker reports SUCCESS or VERIFICATION_FAILED to coordinator
Coordinator marks deployment complete when all workers confirm
Problem: What's the optimal chunk size?
Formula:
Optimal chunk size = Bandwidth × Target transfer time
= 10 Gbps × 0.5-1.0 seconds
= 625 MB - 1.25 GB
Choose: 1 GB (nice round number, 0.8s transfer)
Chunk size balances two concerns: (1) small enough for responsive pipelining and quick retries, (2) large enough to amortize per-chunk overhead (checksums, metadata, disk ops).
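The formula and its effect on pipeline time can be sketched as follows. The propagation estimate deliberately ignores per-chunk overhead (checksums, metadata, disk operations), which is the cost that argues against very small chunks, so treat the numbers as a lower bound. Function names are illustrative.

```python
LINK_GBPS = 10
MODEL_GB = 500


def chunk_size_gb(target_seconds: float, link_gbps: float = LINK_GBPS) -> float:
    """Chunk size whose transfer takes target_seconds on one link."""
    return link_gbps / 8 * target_seconds


def pipeline_total_seconds(chunk_gb: float, workers: int) -> float:
    """Download time plus last-chunk propagation across workers - 1 hops."""
    chunk_seconds = chunk_gb * 8 / LINK_GBPS
    return MODEL_GB * 8 / LINK_GBPS + (workers - 1) * chunk_seconds


for target in (0.5, 1.0):
    print(f"target {target} s -> {chunk_size_gb(target)} GB chunks")

for size in (0.25, 1.0, 4.0):
    total = pipeline_total_seconds(size, workers=100)
    print(f"{size} GB chunks, 100 workers -> {total:.1f} s")
```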
Worker Failure Mid-Pipeline
Scenario: W3 crashes while pipeline is active.
Before: W1 → W2 → W3 → W4 → W5
After:  W1 → W2 ──────→ W4 → W5
                (skip W3)
Recovery steps:
Coordinator detects W3 failure (e.g., missed 2-3 heartbeats over 5-10s)
Coordinator updates topology: W2's downstream = W4
Coordinator notifies W2 and W4 of reconfiguration
W4 identifies missing chunks (compare received vs manifest)
W4 requests missing chunks from W2
Once caught up, pipeline resumes normal operation
Shorter heartbeat windows are important when your end-to-end target is ~8 minutes. Pair heartbeats with per-chunk timeouts so the coordinator can reassign slow or missing chunks quickly instead of waiting for full node failure detection.
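One way to express the two detection rules is sketched below. The 2.5 s heartbeat interval, the three-missed-heartbeat threshold, and the 4 s chunk timeout are assumed values chosen to fall within the ranges discussed above; the function names and data shapes are hypothetical.

```python
def find_failed_workers(last_heartbeat, now, interval_s=2.5, missed_allowed=3):
    """Return worker IDs whose last heartbeat is older than the allowed window.

    last_heartbeat: dict worker_id -> timestamp (seconds) of the last heartbeat
    """
    window = interval_s * missed_allowed
    return sorted(w for w, ts in last_heartbeat.items() if now - ts > window)


def find_stalled_chunks(in_flight, now, chunk_timeout_s=4.0):
    """Return (worker_id, chunk_id) pairs whose transfer exceeded the timeout.

    in_flight: dict (worker_id, chunk_id) -> timestamp when the transfer began
    """
    return sorted(key for key, started in in_flight.items()
                  if now - started > chunk_timeout_s)


heartbeats = {"W1": 100.0, "W2": 91.0, "W3": 99.0}
transfers = {("W4", 17): 95.0, ("W5", 18): 99.5}
print(find_failed_workers(heartbeats, now=100.0))
print(find_stalled_chunks(transfers, now=100.0))
```

The chunk timeout fires well before the heartbeat window expires, which is what lets the coordinator react to a slow transfer without waiting for full node failure detection.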
def handle_worker_failure(failed_worker_id):
    topology = get_topology()
    idx = topology.index(failed_worker_id)
    upstream = topology[idx - 1] if idx > 0 else None
    downstream = topology[idx + 1] if idx < len(topology) - 1 else None

    # Reconfigure pipeline to skip failed worker
    topology.remove(failed_worker_id)

    if upstream and downstream:
        notify_worker(upstream, new_downstream=downstream)
        notify_worker(downstream, new_upstream=upstream)

        # Downstream catches up on missing chunks
        missing = get_missing_chunks(downstream)
        request_chunks(downstream, from_worker=upstream, chunks=missing)
    # Not handled in this sketch: a failed seed (upstream is None), where the
    # next worker must take over downloading from external storage, and a
    # failed tail (downstream is None), where upstream simply stops forwarding.
Coordinator Failure
Solution: Run 3-5 coordinator replicas with leader election via etcd.
Active leader persists all state to etcd
Standby coordinators watch for leader failure
On failure, new leader elected within 5-10 seconds
New leader reads state from etcd and resumes operations
Workers reconnect to new leader endpoint
Corrupted Chunks
Detection: SHA256 checksum verification on every chunk.
import asyncio

async def receive_chunk(chunk_id):
    data = await upstream.receive(chunk_id)
    expected = manifest.chunks[chunk_id].checksum
    if sha256(data) != expected:
        log_error(f"Checksum mismatch: chunk {chunk_id}")
        # Retry with exponential backoff (max 3 attempts)
        for attempt in range(3):
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s
            data = await upstream.receive(chunk_id)
            if sha256(data) == expected:
                break
        else:
            # All retries failed (the loop never hit `break`)
            report_failure(chunk_id)
            return
    await write_to_disk(chunk_id, data)
Problem: Cross-rack transfers are slower and load core switches.
Solution: Order pipeline by rack to minimize cross-rack hops.
Naive ordering: W1(R1) → W4(R2) → W2(R1) → W5(R2) → W3(R1) → W6(R2)
(5 cross-rack hops)
Rack-aware: W1(R1) → W2(R1) → W3(R1) → W4(R2) → W5(R2) → W6(R2)
(1 cross-rack hop)
from collections import defaultdict

def assign_pipeline_topology(workers):
    # Group workers by rack
    racks = defaultdict(list)
    for worker in workers:
        racks[worker.rack].append(worker)

    # Order: all workers in rack 1, then rack 2, etc.
    topology = []
    for rack in sorted(racks.keys()):
        topology.extend(racks[rack])
    return topology
Benefits: fewer cross-rack hops (5 down to 1 in the example above, an 80% reduction), lower latency, and reduced core switch load.
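The hop counts in the example orderings can be verified with a short helper. This sketch represents each worker as a `(worker_id, rack)` tuple, which is an assumed shape for illustration; sorting by rack is enough to reach the minimum of (number of racks − 1) cross-rack hops.

```python
def count_cross_rack_hops(topology):
    """Count adjacent pairs in the chain whose racks differ.

    topology: ordered list of (worker_id, rack) tuples
    """
    return sum(1 for a, b in zip(topology, topology[1:]) if a[1] != b[1])


naive = [("W1", "R1"), ("W4", "R2"), ("W2", "R1"),
         ("W5", "R2"), ("W3", "R1"), ("W6", "R2")]
# Group by rack first, then by worker ID within the rack.
rack_aware = sorted(naive, key=lambda w: (w[1], w[0]))

print(count_cross_rack_hops(naive), count_cross_rack_hops(rack_aware))
```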
For a single pipeline, propagation time ((N − 1) × chunk_time) already exceeds download time beyond ~500 workers, and at extreme scale (10,000+ workers) it dominates completely: roughly 8,000 s of propagation against 400 s of download. Consider a tree of pipelines: a shallow tree seeds multiple pipelines of ~500-1,000 workers each. This trades a ~2× seeding penalty per fanout of 2 for much shorter per-pipeline propagation.
Instead of a fixed pipeline/tree, a central scheduler assigns chunk transfers across a full-mesh data plane:
Coordinator tracks chunk inventory (which workers have which chunks) and link utilization.
Workers remain simple: request tasks, download assigned chunks from peers, and report progress.
Scheduler prioritizes rare chunks, balances bandwidth across racks, and reassigns stalled chunks quickly.
Why it can win in practice: better resilience to stragglers and node failures, easier recovery (just reassign chunks), and often lower tail latency at large N.
Trade-offs: more complex scheduler logic and state; more control-plane traffic; careful tuning needed to avoid hotspots.
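As a rough illustration of the scheduling decision, the sketch below picks the next transfer for one worker using a rarest-first rule and a least-loaded source. It is a simplified, hypothetical policy: it ignores rack placement and link utilization, and the `pick_transfer` name and the `inventory` and `load` structures are assumptions rather than part of the design above.

```python
from collections import Counter


def pick_transfer(inventory, load, worker, total_chunks):
    """Choose (chunk_id, source_peer) for `worker`, or None if nothing fits.

    inventory: dict worker_id -> set of chunk IDs that worker holds
    load:      dict worker_id -> number of uploads currently assigned
    """
    # Count how many other workers hold each chunk.
    holders = Counter()
    for peer, chunks in inventory.items():
        if peer != worker:
            holders.update(chunks)

    have = inventory.get(worker, set())
    missing = [c for c in range(total_chunks)
               if c not in have and holders[c] > 0]
    if not missing:
        return None

    # Rarest first; ties broken by lowest chunk ID for determinism.
    chunk_id = min(missing, key=lambda c: (holders[c], c))

    # Among peers holding the chunk, prefer the least-loaded uploader.
    sources = [p for p, chunks in inventory.items()
               if p != worker and chunk_id in chunks]
    source = min(sources, key=lambda p: (load.get(p, 0), p))
    return chunk_id, source
```

Reassigning a stalled chunk then amounts to calling the same function again with the stalled source's load increased or its inventory entry removed.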
Disk I/O requirement: To keep up with a 10 Gbps link, each worker must write at ~1.25 GB/s, and read back at the same rate if it forwards chunks from disk rather than from memory. NVMe SSDs (3+ GB/s) handle this easily; spinning disks (~150 MB/s) would become the bottleneck.
Calculated time complexity for sequential, tree, and pipeline approaches
Identified external bandwidth (10 Gbps) as the initial bottleneck
Explained pipeline timing: download dominates up to ~500 workers, propagation dominates beyond
Justified chunk size selection (1 GB balances overhead vs responsiveness)
Described failure handling (worker failure, coordinator failure, corruption)
Discussed rack-aware topology optimization
Addressed scaling from 100 to 1000+ workers
Mentioned data integrity verification (SHA256 checksums)
Called out straggler mitigation (chunk timeouts, rare-chunk prioritization, reassignment)