Everything you need to understand, build, and debug distributed systems -- from CAP theorem and consensus algorithms to distributed caching and observability. With real code in Go and Node.js, ASCII diagrams, and battle-tested patterns used at companies running thousands of nodes.
A distributed system is a collection of independent computers that appears to its users as a single coherent system. You need them when a single machine cannot handle the load, when you need fault tolerance, or when your users are spread across the globe and need low latency.
Formulated by Eric Brewer in 2000 and proven by Gilbert and Lynch in 2002. In any distributed data store, you can only guarantee two of the following three properties simultaneously: Consistency (every read sees the most recent write), Availability (every request receives a non-error response), and Partition tolerance (the system keeps operating when the network splits nodes into groups that cannot communicate).
In practice, network partitions will happen. So the real choice is between CP (consistency + partition tolerance) and AP (availability + partition tolerance).
ASCII
                Consistency (C)
                      /\
                     /  \
                    /    \
                   /  CP  \
                  /  zone  \
                 /----------\
                /   CAN'T    \
               /   HAVE ALL   \
              /     THREE      \
             /__________________\
Availability (A) ------------- Partition Tolerance (P)
                   AP zone
CP systems: ZooKeeper, HBase, MongoDB (default), etcd, Consul
AP systems: Cassandra, DynamoDB, CouchDB, Riak
CA systems: Traditional RDBMS (single node -- no partition tolerance)
CAP is not a binary toggle. Real systems make trade-offs on a spectrum. Many databases let you tune consistency per-query. Cassandra, for example, lets you set consistency level to QUORUM for strong reads and ONE for fast reads. The PACELC theorem extends CAP: even when there is no Partition, you still trade off between Latency and Consistency.
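As a concrete sketch of per-query tuning, here is roughly how that looks with the gocql driver; the address, keyspace, and tables below are made up for illustration.
Go
package main

import (
	"log"

	"github.com/gocql/gocql"
)

func main() {
	cluster := gocql.NewCluster("127.0.0.1") // assumed local node
	cluster.Keyspace = "shop"                // hypothetical keyspace
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// Strong read: QUORUM -- a majority of replicas must respond.
	var balance float64
	if err := session.Query(`SELECT balance FROM accounts WHERE id = ?`, 42).
		Consistency(gocql.Quorum).
		Scan(&balance); err != nil {
		log.Println("quorum read failed:", err)
	}

	// Fast read: ONE -- any single replica answers, possibly stale.
	var likes int
	if err := session.Query(`SELECT likes FROM posts WHERE id = ?`, 7).
		Consistency(gocql.One).
		Scan(&likes); err != nil {
		log.Println("fast read failed:", err)
	}
}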
Peter Deutsch identified seven assumptions that developers new to distributed systems mistakenly make; James Gosling later added the eighth:
1. The network is reliable -- Packets get dropped, connections reset, cables get cut.
2. Latency is zero -- A cross-continent round trip is ~150ms. That adds up fast.
3. Bandwidth is infinite -- Serialization, large payloads, and chatty protocols kill throughput.
4. The network is secure -- Every hop is an attack surface. mTLS exists for a reason.
5. Topology doesn't change -- Nodes join, leave, and move. Auto-scaling is normal.
6. There is one administrator -- Multiple teams, orgs, and cloud providers manage pieces.
7. Transport cost is zero -- Serialization/deserialization, DNS lookups, TLS handshakes all cost time and CPU.
8. The network is homogeneous -- Different hardware, OS versions, protocol versions everywhere.
Every design decision you make in a distributed system should account for these realities. If your code assumes any of these, it will break in production.
In a distributed system, there is no global clock. Each node has its own clock that drifts independently. We need logical clocks to establish ordering of events.
Defined by Leslie Lamport in 1978. Event A happens-before event B (written A → B) if: (1) A and B occur in the same process and A comes first; (2) A is the send of a message and B is its receive; or (3) there is some event C with A → C and C → B (transitivity).
Each process maintains a counter. On local event: increment. On send: increment and attach. On receive: set to max(local, received) + 1. If A → B then L(A) < L(B), but the converse is NOT true.
Go
package main
import (
"fmt"
"sync"
)
type LamportClock struct {
mu sync.Mutex
counter uint64
}
func NewLamportClock() *LamportClock {
return &LamportClock{counter: 0}
}
// Tick increments the clock for a local event
func (lc *LamportClock) Tick() uint64 {
lc.mu.Lock()
defer lc.mu.Unlock()
lc.counter++
return lc.counter
}
// Send increments and returns timestamp to attach to message
func (lc *LamportClock) Send() uint64 {
return lc.Tick()
}
// Receive updates clock based on incoming message timestamp
func (lc *LamportClock) Receive(msgTimestamp uint64) uint64 {
lc.mu.Lock()
defer lc.mu.Unlock()
if msgTimestamp > lc.counter {
lc.counter = msgTimestamp
}
lc.counter++
return lc.counter
}
func main() {
nodeA := NewLamportClock()
nodeB := NewLamportClock()
t1 := nodeA.Tick() // A: local event, t=1
sendT := nodeA.Send() // A: send msg, t=2
recvT := nodeB.Receive(sendT) // B: receive, t=max(0,2)+1=3
fmt.Printf("A local: %d, A send: %d, B recv: %d\n", t1, sendT, recvT)
}
Lamport timestamps cannot detect concurrency. Vector clocks solve this: each node maintains a vector of counters, one per node. Now V(A) < V(B) implies A → B, and incomparable vectors mean concurrent events.
Go
type VectorClock struct {
nodeID string
clock map[string]uint64
}
func NewVectorClock(nodeID string) *VectorClock {
return &VectorClock{
nodeID: nodeID,
clock: map[string]uint64{nodeID: 0},
}
}
func (vc *VectorClock) Tick() {
vc.clock[vc.nodeID]++
}
func (vc *VectorClock) Send() map[string]uint64 {
vc.Tick()
// Return a copy
cp := make(map[string]uint64)
for k, v := range vc.clock {
cp[k] = v
}
return cp
}
func (vc *VectorClock) Receive(other map[string]uint64) {
for k, v := range other {
if v > vc.clock[k] {
vc.clock[k] = v
}
}
vc.clock[vc.nodeID]++
}
// HappensBefore returns true if vc happened before other
func (vc *VectorClock) HappensBefore(other *VectorClock) bool {
atLeastOneLess := false
for k, v := range vc.clock {
ov := other.clock[k]
if v > ov {
return false
}
if v < ov {
atLeastOneLess = true
}
}
for k, ov := range other.clock {
if _, exists := vc.clock[k]; !exists && ov > 0 {
atLeastOneLess = true
}
}
return atLeastOneLess
}
Use Lamport timestamps when you only need a total ordering (e.g., log sequencing). Use vector clocks when you need to detect concurrent writes (e.g., conflict detection in Dynamo-style databases). Vector clocks grow with the number of nodes -- in large clusters, consider dotted version vectors or interval tree clocks instead.
Consistency models define what guarantees a distributed system gives about the order and visibility of operations. From strongest to weakest:
ASCII
 Strongest                                   Weakest
     |                                          |
     v                                          v
Linearizable > Sequential > Causal > PRAM > Eventual
More consistency = more coordination = higher latency
Less consistency = less coordination = higher throughput
The gold standard. Every operation appears to take effect atomically at some point between its invocation and response. Acts as if there is a single copy of the data.
All processes see the same order of operations, but that order does not need to match real-time. Operations from a single process appear in program order. Weaker than linearizability because cross-process real-time ordering is not guaranteed.
If operation A causally precedes B (A → B), then all nodes see A before B. Concurrent operations can appear in any order. This captures the intuition of "if you saw it, your response must come after it."
If no new writes are made, eventually all replicas converge to the same value. No ordering guarantees during updates. Used by DNS, Cassandra (with low consistency level), and most caching systems.
Banking (account balance): Linearizable -- you cannot show a wrong balance.
Social media (like count): Eventual -- off by a few for a second is acceptable.
Chat messages (ordering): Causal -- replies must appear after the message they reply to.
Shopping cart: Eventual with conflict resolution -- Amazon's Dynamo paper pioneered this.
Node.js
// Demonstrating consistency issues with a naive distributed counter
// Two nodes both read, increment, write -- lost update problem
class DistributedCounter {
constructor(replicas) {
this.replicas = replicas; // Map of nodeId -> value
}
// Eventual consistency: read from any replica
readEventual(nodeId) {
return this.replicas.get(nodeId) || 0;
}
// Strong consistency: read from quorum (majority)
readStrong() {
const values = [...this.replicas.values()];
const majority = Math.floor(values.length / 2) + 1;
// In real systems, use read quorum + write quorum > N
const counts = new Map();
for (const v of values) {
counts.set(v, (counts.get(v) || 0) + 1);
if (counts.get(v) >= majority) return v;
}
return values[0]; // fallback
}
}
// Quorum formula: R + W > N guarantees overlap
// N=3 replicas, W=2 (write quorum), R=2 (read quorum)
// 2 + 2 = 4 > 3 -- at least one node has the latest write
Consensus is the problem of getting multiple nodes to agree on a value. It is the foundation of replicated state machines, leader election, and distributed locks.
Designed by Diego Ongaro and John Ousterhout in 2014 explicitly for understandability. Used in etcd, Consul, CockroachDB, and TiKV.
ASCII
Raft Node States:
+----------+     timeout      +-----------+    wins election    +--------+
| Follower | ---------------> | Candidate | ------------------> | Leader |
+----------+                  +-----------+                     +--------+
     ^                              |                               |
     |     discovers higher term    |                               |
     |        / loses election      |                               |
     +------------------------------+                               |
     |                                                              |
     +------------------ discovers higher term ---------------------+
Term: monotonically increasing logical clock. Each term has at most one leader.
Followers have a randomized election timeout (e.g., 150-300ms). If no heartbeat from leader within that timeout, the follower becomes a candidate, increments its term, votes for itself, and requests votes from all other nodes. A candidate wins if it gets votes from a majority.
Go
type NodeState int
const (
Follower NodeState = iota
Candidate
Leader
)
type RaftNode struct {
mu sync.Mutex
id int
state NodeState
currentTerm int
votedFor int // -1 means no vote this term
log []LogEntry
commitIndex int
peers []int
}
type LogEntry struct {
Term int
Command interface{}
}
type RequestVoteArgs struct {
Term int
CandidateID int
LastLogIndex int
LastLogTerm int
}
type RequestVoteReply struct {
Term int
VoteGranted bool
}
func (rn *RaftNode) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rn.mu.Lock()
defer rn.mu.Unlock()
reply.Term = rn.currentTerm
reply.VoteGranted = false
// Rule 1: reject if candidate's term is old
if args.Term < rn.currentTerm {
return
}
// Step down if we see a higher term
if args.Term > rn.currentTerm {
rn.currentTerm = args.Term
rn.state = Follower
rn.votedFor = -1
}
// Rule 2: only vote if we haven't voted or already voted for this candidate
if rn.votedFor == -1 || rn.votedFor == args.CandidateID {
// Rule 3: candidate's log must be at least as up-to-date
lastIdx := len(rn.log) - 1
lastTerm := 0
if lastIdx >= 0 {
lastTerm = rn.log[lastIdx].Term
}
if args.LastLogTerm > lastTerm ||
(args.LastLogTerm == lastTerm && args.LastLogIndex >= lastIdx) {
reply.VoteGranted = true
rn.votedFor = args.CandidateID
}
}
}
Once elected, the leader accepts client requests, appends them to its log, and replicates them to followers via AppendEntries RPCs. An entry is committed once a majority of nodes have it. Committed entries are applied to the state machine.
ASCII
Log Replication Flow:
Client --> Leader: "SET x=5"
               |
               v
Leader appends to local log: [term=3, SET x=5]
               |
       +-------+-------+
       |       |       |
       v       v       v
  Follower1 Follower2 Follower3   (AppendEntries RPC)
       |       |       |
       v       v       v
      ACK     ACK     ACK
       |       |       |
       +-------+-------+
               |
               v
Leader: 3/4 nodes have entry (majority!) --> COMMIT
Leader applies to state machine and responds to client
Next heartbeat tells followers to commit too
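To complement the RequestVote sketch above, here is a minimal AppendEntries handler in the same style. Heartbeats are just AppendEntries with no entries, and the PrevLogIndex/PrevLogTerm check is what forces follower logs to converge. This is a sketch: a real implementation would also reset the election timer and persist currentTerm, votedFor, and the log.
Go
type AppendEntriesArgs struct {
	Term         int
	LeaderID     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []LogEntry
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
}

func (rn *RaftNode) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rn.mu.Lock()
	defer rn.mu.Unlock()
	reply.Term = rn.currentTerm
	// Reject stale leaders.
	if args.Term < rn.currentTerm {
		return
	}
	// A valid leader exists for this term: adopt it and step down.
	if args.Term > rn.currentTerm {
		rn.currentTerm = args.Term
		rn.votedFor = -1
	}
	rn.state = Follower
	// Consistency check: our log must contain the entry the leader
	// believes immediately precedes the new ones.
	if args.PrevLogIndex >= 0 {
		if args.PrevLogIndex >= len(rn.log) ||
			rn.log[args.PrevLogIndex].Term != args.PrevLogTerm {
			return // leader will retry with an earlier PrevLogIndex
		}
	}
	// Append new entries, truncating any conflicting suffix.
	rn.log = append(rn.log[:args.PrevLogIndex+1], args.Entries...)
	// Advance our commit index up to what the leader has committed.
	if args.LeaderCommit > rn.commitIndex {
		last := len(rn.log) - 1
		if args.LeaderCommit < last {
			rn.commitIndex = args.LeaderCommit
		} else {
			rn.commitIndex = last
		}
	}
	reply.Success = true
}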
Paxos, by Leslie Lamport, is the original consensus algorithm (1989). It is notoriously hard to understand. The basic idea has three roles: Proposers, Acceptors, and Learners.
Phase 1 (Prepare): Proposer picks a proposal number N. Sends Prepare(N) to a majority of acceptors. Acceptors promise not to accept proposals with number < N. If they already accepted a value, they return it.
Phase 2 (Accept): If proposer gets promises from majority, it sends Accept(N, value) where value is either the highest-numbered previously accepted value, or the proposer's own value. Acceptors accept if they haven't promised a higher number.
Result: Once a majority of acceptors accept a value, consensus is reached. Learners are notified.
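The acceptor's state machine is small enough to sketch. Below is a hedged single-decree acceptor in Go; networking, durable storage, and the proposer's retry logic (all of which real Paxos requires) are omitted.
Go
type Acceptor struct {
	mu          sync.Mutex
	promised    int         // highest proposal number promised
	acceptedNum int         // number of the accepted proposal (0 = none)
	acceptedVal interface{} // the accepted value, if any
}

// Prepare handles Phase 1: promise not to accept proposals below n.
func (a *Acceptor) Prepare(n int) (ok bool, prevNum int, prevVal interface{}) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if n <= a.promised {
		return false, 0, nil // already promised a higher number
	}
	a.promised = n
	// Return any previously accepted value so the proposer must adopt it.
	return true, a.acceptedNum, a.acceptedVal
}

// Accept handles Phase 2: accept (n, v) unless a higher promise was made.
func (a *Acceptor) Accept(n int, v interface{}) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	if n < a.promised {
		return false
	}
	a.promised = n
	a.acceptedNum = n
	a.acceptedVal = v
	return true
}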
Use Raft. It was designed to be understandable and produces equivalent results. Paxos is a family of protocols (Basic, Multi, Fast, Cheap, etc.) and implementing it correctly is extremely difficult. Every production Paxos implementation deviates from the paper significantly. Raft gives you a clear spec to implement against.
Replication keeps copies of data on multiple nodes for fault tolerance and read throughput. The fundamental challenge: keeping replicas consistent when writes happen.
One node is the leader (master). All writes go to the leader, which streams changes to followers (slaves/replicas). Reads can come from any replica.
ASCII
Writes ----> [Leader] ----> replication log ---+------+------+------+
                                               |      |      |      |
                                               v      v      v      v
                                         [Follower 1]  [F2]  [F3]  [F4]
                                               ^
                                               |
Reads <----------------------------------------+  (reads from any replica)
+ Simple, no write conflicts
- Single point of failure for writes
- Replication lag = stale reads from followers
Multiple nodes accept writes. Used in multi-datacenter setups. Each datacenter has a local leader. The big problem: write conflicts when two leaders accept conflicting writes.
Any replica can accept writes. Used by Dynamo, Cassandra, Riak. Clients write to multiple replicas and read from multiple replicas. Uses quorum reads/writes to ensure consistency.
Node.js
// Last-Writer-Wins (LWW) -- simple but loses data
function resolveConflictLWW(versions) {
return versions.reduce((latest, v) =>
v.timestamp > latest.timestamp ? v : latest
);
}
// Merge function -- application-specific, preserves data
function resolveShoppingCart(v1, v2) {
// Union of items from both carts
const merged = new Map();
for (const item of [...v1.items, ...v2.items]) {
const existing = merged.get(item.id);
if (!existing || item.quantity > existing.quantity) {
merged.set(item.id, item);
}
}
return { items: [...merged.values()] };
}
Data structures that can be replicated across nodes and updated independently without coordination. They mathematically guarantee convergence. Two types: state-based (CvRDT) and operation-based (CmRDT).
Go
// G-Counter: a grow-only counter CRDT
// Each node increments its own slot. Total = sum of all slots.
type GCounter struct {
counts map[string]uint64 // nodeID -> count
nodeID string
}
func NewGCounter(nodeID string) *GCounter {
return &GCounter{
counts: map[string]uint64{nodeID: 0},
nodeID: nodeID,
}
}
func (gc *GCounter) Increment() {
gc.counts[gc.nodeID]++
}
func (gc *GCounter) Value() uint64 {
var total uint64
for _, v := range gc.counts {
total += v
}
return total
}
// Merge takes the max of each node's counter
func (gc *GCounter) Merge(other *GCounter) {
for k, v := range other.counts {
if v > gc.counts[k] {
gc.counts[k] = v
}
}
}
// PN-Counter: supports both increment and decrement
// Uses two G-Counters: one for increments, one for decrements
type PNCounter struct {
pos *GCounter
neg *GCounter
}
func (pn *PNCounter) Increment() { pn.pos.Increment() }
func (pn *PNCounter) Decrement() { pn.neg.Increment() }
func (pn *PNCounter) Value() int64 {
return int64(pn.pos.Value()) - int64(pn.neg.Value())
}
Redis Enterprise ships CRDT-based active-active databases. Riak uses CRDTs for maps, sets, counters, and flags. Automerge and Yjs are CRDT libraries for collaborative editing (Google Docs-style). CRDTs trade space (metadata grows) for coordination-freedom.
When data is too large for one node, you split it across multiple nodes. Each piece is a partition (or shard). The key question: which data goes to which node?
Assign contiguous ranges of keys to each partition. Good for range queries but risks hot spots (e.g., all recent timestamps hit the same partition).
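A minimal sketch of range partitioning: keep sorted split points and binary-search for the owning partition. The split values below are made up.
Go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical split points: partition 0 holds keys < "g",
// partition 1 holds "g" <= keys < "n", partition 2 holds
// "n" <= keys < "t", partition 3 holds keys >= "t".
var splits = []string{"g", "n", "t"}

func rangePartition(key string) int {
	// Index of the first split point strictly greater than the key.
	return sort.Search(len(splits), func(i int) bool { return splits[i] > key })
}

func main() {
	for _, k := range []string{"alice", "gina", "nora", "zoe"} {
		fmt.Printf("Key %s -> Partition %d\n", k, rangePartition(k))
	}
}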
Hash the key and assign based on hash value. Distributes data evenly but range queries require scatter-gather.
Go
package main

import (
"crypto/sha256"
"encoding/binary"
"fmt"
)
func hashPartition(key string, numPartitions int) int {
h := sha256.Sum256([]byte(key))
num := binary.BigEndian.Uint64(h[:8])
return int(num % uint64(numPartitions))
}
func main() {
keys := []string{"user:1001", "user:1002", "user:1003", "order:5001"}
for _, k := range keys {
fmt.Printf("Key %s -> Partition %d\n", k, hashPartition(k, 4))
}
}
The problem with simple hash partitioning: when you add or remove a node, almost ALL keys get reassigned. Consistent hashing fixes this -- only K/N keys move on average (K = total keys, N = number of nodes).
ASCII
Consistent Hashing Ring:

               0 / 2^32
                  |
         Node C   |   Node A
             \    |    /
              \   |   /
              *---+---*
             /         \
            |           |
            |   (ring)  |
             \         /
              *---+---*
             /    |    \
            /     |     \
       Node B     |     Node D
                  |
                2^31
Each key is hashed onto the ring.
Walk clockwise to find the first node -- that's the owner.
Virtual nodes: each physical node gets multiple positions on the ring
for better balance.
Go
import (
"crypto/sha256"
"encoding/binary"
"fmt"
"sort"
)
type ConsistentHash struct {
ring []uint64
nodeMap map[uint64]string
vnodes int // virtual nodes per physical node
}
func NewConsistentHash(vnodes int) *ConsistentHash {
return &ConsistentHash{
nodeMap: make(map[uint64]string),
vnodes: vnodes,
}
}
func (ch *ConsistentHash) hashKey(key string) uint64 {
h := sha256.Sum256([]byte(key))
return binary.BigEndian.Uint64(h[:8])
}
func (ch *ConsistentHash) AddNode(node string) {
for i := 0; i < ch.vnodes; i++ {
vkey := fmt.Sprintf("%s-vnode-%d", node, i)
hash := ch.hashKey(vkey)
ch.ring = append(ch.ring, hash)
ch.nodeMap[hash] = node
}
sort.Slice(ch.ring, func(i, j int) bool {
return ch.ring[i] < ch.ring[j]
})
}
func (ch *ConsistentHash) GetNode(key string) string {
hash := ch.hashKey(key)
idx := sort.Search(len(ch.ring), func(i int) bool {
return ch.ring[i] >= hash
})
if idx == len(ch.ring) {
idx = 0 // wrap around the ring
}
return ch.nodeMap[ch.ring[idx]]
}
Even with consistent hashing, moving data between nodes takes time and bandwidth. Plan for it: use partition counts larger than your node count so you can move whole partitions. Kafka uses this approach -- partitions are the unit of rebalancing, not individual keys.
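A hedged sketch of that idea: hash keys into a fixed partition count and keep a separate partition-to-node map, so rebalancing only rewrites the map and copies whole partitions. The reassignment policy below is hypothetical.
Go
// Fixed partition count, chosen much larger than the node count.
// Keys hash to partitions forever; only the partition -> node map
// changes when the cluster grows or shrinks.
const numPartitions = 256

var partitionOwner = make(map[int]string) // partition -> node

func ownerForKey(key string) string {
	// Reuses hashPartition from the example above.
	return partitionOwner[hashPartition(key, numPartitions)]
}

// addNode reassigns just enough partitions to the new node to even out
// load (a naive, hypothetical policy); only those partitions' data moves.
func addNode(newNode string, totalNodes int) {
	target := numPartitions / totalNodes
	moved := 0
	for p := 0; p < numPartitions && moved < target; p++ {
		if partitionOwner[p] != newNode {
			partitionOwner[p] = newNode
			moved++
		}
	}
}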
How do you ensure atomicity when a transaction spans multiple services or partitions? Single-node ACID does not help you here.
ASCII
Phase 1: PREPARE
  Coordinator ----Prepare----> Participant A --> "Yes, I can commit"
              ----Prepare----> Participant B --> "Yes, I can commit"
              ----Prepare----> Participant C --> "Yes, I can commit"

Phase 2: COMMIT (if all said yes)
  Coordinator ----Commit-----> Participant A --> ACK
              ----Commit-----> Participant B --> ACK
              ----Commit-----> Participant C --> ACK

If ANY participant says "No" in Phase 1:
  Coordinator ----Abort------> All Participants

Problem: If the coordinator crashes after Phase 1, participants are STUCK
waiting (blocking protocol). This is why 2PC is rarely used across services.
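A coordinator sketch makes that blocking window concrete. Participant is an assumed interface; a real coordinator would also durably log its commit/abort decision before Phase 2 so it can recover after a crash.
Go
type Participant interface {
	Prepare(txID string) error // nil = "Yes, I can commit"
	Commit(txID string) error
	Abort(txID string) error
}

func TwoPhaseCommit(txID string, participants []Participant) error {
	// Phase 1: collect votes. Any error counts as a "No".
	for _, p := range participants {
		if err := p.Prepare(txID); err != nil {
			for _, q := range participants {
				q.Abort(txID) // best effort
			}
			return fmt.Errorf("tx %s aborted: %w", txID, err)
		}
	}
	// Decision point: a coordinator crash here leaves participants
	// blocked, holding locks, until it recovers -- 2PC's core weakness.
	for _, p := range participants {
		p.Commit(txID) // must be retried until acknowledged
	}
	return nil
}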
Instead of a distributed transaction, break it into a sequence of local transactions. Each step has a compensating action that undoes it if a later step fails.
Node.js
// Saga: Order Processing
// Step 1: Reserve inventory
// Step 2: Charge payment
// Step 3: Ship order
// If step 3 fails, compensate: refund payment, release inventory
class SagaOrchestrator {
constructor() {
this.steps = [];
this.completedSteps = [];
}
addStep(name, execute, compensate) {
this.steps.push({ name, execute, compensate });
}
async run(context) {
for (const step of this.steps) {
try {
console.log(`Executing: ${step.name}`);
const result = await step.execute(context);
context[step.name] = result;
this.completedSteps.push(step);
} catch (err) {
console.log(`Failed: ${step.name} -- ${err.message}`);
await this.compensate(context);
throw err;
}
}
return context;
}
async compensate(context) {
// Compensate in reverse order
for (const step of this.completedSteps.reverse()) {
try {
console.log(`Compensating: ${step.name}`);
await step.compensate(context);
} catch (err) {
console.error(`Compensation failed for ${step.name}: ${err.message}`);
// Log for manual intervention -- compensations MUST eventually succeed
}
}
}
}
// Usage
const saga = new SagaOrchestrator();
saga.addStep(
'reserveInventory',
async (ctx) => { /* call inventory service */ return { reservationId: 'abc' }; },
async (ctx) => { /* release reservation ctx.reserveInventory.reservationId */ }
);
saga.addStep(
'chargePayment',
async (ctx) => { /* call payment service */ return { chargeId: 'ch_123' }; },
async (ctx) => { /* refund ctx.chargePayment.chargeId */ }
);
saga.addStep(
'shipOrder',
async (ctx) => { /* call shipping service */ return { trackingId: 'trk_789' }; },
async (ctx) => { /* cancel shipment */ }
);
Use this pattern when you need to update a database AND publish an event atomically: write both to the same database in one transaction, and let a separate process read the outbox table and publish the events.
Go
// Transactional Outbox: write order + event in one DB transaction
func CreateOrderWithOutbox(db *sql.DB, order Order) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 1. Insert the order
_, err = tx.Exec(
"INSERT INTO orders (id, user_id, total) VALUES ($1, $2, $3)",
order.ID, order.UserID, order.Total,
)
if err != nil {
return err
}
// 2. Insert event into outbox table (same transaction!)
eventPayload, _ := json.Marshal(order)
_, err = tx.Exec(
"INSERT INTO outbox (id, event_type, payload, created_at) VALUES ($1, $2, $3, NOW())",
uuid.New(), "order.created", eventPayload,
)
if err != nil {
return err
}
return tx.Commit()
}
// Separate poller reads outbox and publishes to message broker
func OutboxPoller(db *sql.DB, publisher MessagePublisher) {
	for {
		rows, err := db.Query(
			"SELECT id, event_type, payload FROM outbox WHERE published = false ORDER BY created_at LIMIT 100",
		)
		if err != nil {
			time.Sleep(500 * time.Millisecond)
			continue
		}
		for rows.Next() {
			var id, eventType, payload string
			if err := rows.Scan(&id, &eventType, &payload); err != nil {
				continue
			}
			// Publish first, then mark published -- a crash in between
			// means the event is sent again (at-least-once delivery).
			publisher.Publish(eventType, payload)
			db.Exec("UPDATE outbox SET published = true WHERE id = $1", id)
		}
		rows.Close()
		time.Sleep(500 * time.Millisecond)
	}
}
Instead of polling the outbox table, use Change Data Capture (CDC) with tools like Debezium. It reads the database's write-ahead log (WAL) and streams changes to Kafka. No polling delay, no extra load on the database. This is the production-grade approach.
How do nodes in a large cluster discover each other, detect failures, and propagate state? Gossip protocols spread information like a rumor -- each node periodically tells a random peer what it knows.
ASCII
Gossip Round 1:       Round 2:              Round 3:
Node A knows X        A tells B             B tells D
                      A tells C             C tells E

[A*] [B] [C]          [A*] [B*] [C*]        [A*] [B*] [C*]
[D]  [E] [F]          [D]  [E]  [F]         [D*] [E*] [F]

Information spreads in O(log N) rounds to all N nodes.
Even if some messages are lost, redundancy ensures delivery.
Nodes periodically ping random peers. If a node does not respond, don't immediately declare it dead. Use a suspicion mechanism: mark it as suspected, gossip the suspicion, and only declare dead after a timeout.
Go
type NodeStatus int
const (
Alive NodeStatus = iota
Suspected
Dead
)
type Member struct {
ID string
Addr string
Status NodeStatus
Heartbeat uint64
Timestamp time.Time
}
type GossipNode struct {
mu sync.RWMutex
self Member
members map[string]*Member
suspectTTL time.Duration
}
func (g *GossipNode) GossipRound() {
g.mu.RLock()
peers := make([]string, 0, len(g.members))
for id := range g.members {
if id != g.self.ID {
peers = append(peers, id)
}
}
g.mu.RUnlock()
if len(peers) == 0 {
return
}
// Pick a random peer (fanout = 1 for simplicity)
target := peers[rand.Intn(len(peers))]
// Send our membership list, receive theirs, merge
g.sendGossip(target)
}
func (g *GossipNode) MergeMemberList(incoming []Member) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for _, m := range incoming {
		existing, ok := g.members[m.ID]
		if !ok {
			// New node discovered. Copy the loop variable before taking
			// its address (before Go 1.22, &m aliases every iteration).
			mCopy := m
			g.members[m.ID] = &mCopy
			continue
		}
		// Higher heartbeat = more recent info
		if m.Heartbeat > existing.Heartbeat {
			existing.Heartbeat = m.Heartbeat
			existing.Status = m.Status
			existing.Timestamp = time.Now()
		}
	}
}
func (g *GossipNode) DetectFailures() {
g.mu.Lock()
defer g.mu.Unlock()
for _, m := range g.members {
if m.ID == g.self.ID {
continue
}
age := time.Since(m.Timestamp)
if m.Status == Alive && age > g.suspectTTL {
m.Status = Suspected
} else if m.Status == Suspected && age > g.suspectTTL*3 {
m.Status = Dead
}
}
}
Cassandra uses gossip for cluster membership and failure detection.
HashiCorp Serf/Consul uses the SWIM protocol (gossip-based failure detection).
Redis Cluster uses gossip for node discovery and slot assignment propagation.
Distribute incoming traffic across multiple backend servers to maximize throughput, minimize latency, and avoid overloading any single server.
ASCII
Algorithm Comparison:
Round Robin             Least Connections        Weighted Round Robin
A -> B -> C -> A ...    A(2)  B(5)  C(1)         A(w=3)  B(w=1)  C(w=2)
                        next -> C (least)        A A A B C C A A A ...

Simple; fair if         Good for varying         Good when servers have
requests have           request durations        different capacity
similar cost
Node.js
// Simple round-robin load balancer in Node.js
const http = require('http');
const httpProxy = require('http-proxy');
const backends = [
{ host: '127.0.0.1', port: 3001 },
{ host: '127.0.0.1', port: 3002 },
{ host: '127.0.0.1', port: 3003 },
];
let current = 0;
const proxy = httpProxy.createProxyServer({});
const server = http.createServer((req, res) => {
const target = backends[current % backends.length];
current++;
proxy.web(req, res, {
target: `http://${target.host}:${target.port}`
});
});
proxy.on('error', (err, req, res) => {
res.writeHead(502);
res.end('Bad Gateway');
});
server.listen(8080, () => {
console.log('Load balancer on :8080');
});
Go
// Least-connections load balancer
type Backend struct {
URL *url.URL
Alive bool
Connections int64
mu sync.RWMutex
}
type LeastConnLB struct {
backends []*Backend
mu sync.RWMutex
}
func (lb *LeastConnLB) NextBackend() *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
var best *Backend
for _, b := range lb.backends {
if !b.Alive {
continue
}
if best == nil || atomic.LoadInt64(&b.Connections) < atomic.LoadInt64(&best.Connections) {
best = b
}
}
	if best != nil {
		atomic.AddInt64(&best.Connections, 1)
	}
	// Callers must release the slot when the request completes:
	// atomic.AddInt64(&backend.Connections, -1)
	return best
}
Caching reduces load on databases and speeds up responses. In a distributed system, caching introduces its own set of consistency challenges.
The application checks the cache first. On miss, it loads from the database, stores in cache, and returns. On write, it updates the database and invalidates the cache.
Node.js
const Redis = require('ioredis');
const redis = new Redis();
async function getUserCacheAside(userId) {
const cacheKey = `user:${userId}`;
// 1. Check cache
const cached = await redis.get(cacheKey);
if (cached) {
return JSON.parse(cached); // Cache hit
}
// 2. Cache miss -- load from DB
const user = await db.query('SELECT * FROM users WHERE id = $1', [userId]);
// 3. Store in cache with TTL
await redis.setex(cacheKey, 3600, JSON.stringify(user));
return user;
}
async function updateUser(userId, data) {
// 1. Update database FIRST
await db.query('UPDATE users SET name=$1 WHERE id=$2', [data.name, userId]);
// 2. Invalidate cache (don't update -- avoids race conditions)
await redis.del(`user:${userId}`);
}
Every write goes to both cache and database synchronously. Guarantees cache is always up-to-date but adds latency to writes.
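A minimal write-through sketch in Go, assuming a go-redis client; User and saveUserToDB are stand-ins for your real model and storage layer.
Go
import (
	"context"
	"encoding/json"
	"time"

	"github.com/redis/go-redis/v9"
)

type User struct {
	ID   string
	Name string
}

// Write-through: the DB write and the cache write happen together on the
// write path, so reads rarely miss -- at the cost of write latency and of
// caching rows that may never be read.
func UpdateUserWriteThrough(ctx context.Context, rdb *redis.Client, user User) error {
	// 1. Write to the source of truth first.
	if err := saveUserToDB(ctx, user); err != nil { // assumed helper
		return err
	}
	// 2. Synchronously refresh the cache (a TTL is still a useful safety net).
	payload, err := json.Marshal(user)
	if err != nil {
		return err
	}
	return rdb.Set(ctx, "user:"+user.ID, payload, time.Hour).Err()
}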
When a popular cache key expires, hundreds of concurrent requests all miss the cache and hit the database simultaneously. Solutions include request coalescing (shown below), jittered TTLs, and probabilistic early refresh:
Go
import (
"context"
"sync"
"time"
)
// singleflight: only one goroutine fetches, others wait for the result
type SingleFlight struct {
mu sync.Mutex
calls map[string]*call
}
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
func (sf *SingleFlight) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
sf.mu.Lock()
if sf.calls == nil {
sf.calls = make(map[string]*call)
}
if c, ok := sf.calls[key]; ok {
sf.mu.Unlock()
c.wg.Wait() // Wait for the in-flight request
return c.val, c.err
}
c := &call{}
c.wg.Add(1)
sf.calls[key] = c
sf.mu.Unlock()
// Only ONE goroutine executes this
c.val, c.err = fn()
c.wg.Done()
sf.mu.Lock()
delete(sf.calls, key)
sf.mu.Unlock()
return c.val, c.err
}
// Usage: prevents cache stampede
// var sf SingleFlight
// val, err := sf.Do("user:123", func() (interface{}, error) {
// return db.GetUser(123) // only called once even with 1000 concurrent requests
// })
"There are only two hard things in Computer Science: cache invalidation and naming things." Delete on write is the safest strategy. Never update the cache on write -- if two writes happen concurrently, the cache can end up with a stale value permanently. Use TTLs as a safety net. Consider event-driven invalidation with CDC for strong consistency.
Protect your services from being overwhelmed. Rate limiting controls how many requests a client can make in a given time window.
A bucket holds tokens. Each request consumes one token. Tokens are added at a fixed rate. If the bucket is empty, the request is rejected. Allows bursts up to the bucket capacity.
Go
type TokenBucket struct {
mu sync.Mutex
tokens float64
maxTokens float64
refillRate float64 // tokens per second
lastRefill time.Time
}
func NewTokenBucket(maxTokens, refillRate float64) *TokenBucket {
return &TokenBucket{
tokens: maxTokens,
maxTokens: maxTokens,
refillRate: refillRate,
lastRefill: time.Now(),
}
}
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
// Refill tokens based on elapsed time
now := time.Now()
elapsed := now.Sub(tb.lastRefill).Seconds()
tb.tokens += elapsed * tb.refillRate
if tb.tokens > tb.maxTokens {
tb.tokens = tb.maxTokens
}
tb.lastRefill = now
// Try to consume a token
if tb.tokens >= 1.0 {
tb.tokens -= 1.0
return true
}
return false
}
For distributed rate limiting across multiple servers, use Redis sorted sets. Each request adds a timestamp, and you count entries in the window.
Node.js
const Redis = require('ioredis');
const redis = new Redis();
async function slidingWindowRateLimit(userId, maxRequests, windowSec) {
const key = `ratelimit:${userId}`;
const now = Date.now();
const windowStart = now - windowSec * 1000;
// Use a Redis transaction (pipeline)
const pipe = redis.pipeline();
// Remove entries outside the window
pipe.zremrangebyscore(key, 0, windowStart);
// Count entries in the window
pipe.zcard(key);
// Add current request
pipe.zadd(key, now, `${now}-${Math.random()}`);
// Set TTL so the key auto-expires
pipe.expire(key, windowSec);
  const results = await pipe.exec();
  const count = results[1][1]; // zcard result: requests already in the window
  // Note: this request was added unconditionally above, so rejected requests
  // still occupy a slot; an atomic Lua script can make the add conditional.
  if (count >= maxRequests) {
    return { allowed: false, remaining: 0 };
  }
  return { allowed: true, remaining: maxRequests - count - 1 };
}
// Usage: 100 requests per 60 seconds per user
// const result = await slidingWindowRateLimit('user:42', 100, 60);
Use token bucket for API rate limiting (allows bursts). Use sliding window for strict per-second limits. For distributed rate limiting, Redis is the standard choice. Consider using a Lua script in Redis to make the check-and-increment atomic. Libraries: Go has golang.org/x/time/rate, Node.js has rate-limiter-flexible.
An operation is idempotent if performing it multiple times has the same effect as performing it once. In distributed systems, retries are inevitable -- your APIs MUST be idempotent.
Go
// Idempotency middleware using Redis
func IdempotencyMiddleware(redisClient *redis.Client) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
idempotencyKey := r.Header.Get("Idempotency-Key")
if idempotencyKey == "" {
next.ServeHTTP(w, r)
return
}
cacheKey := "idempotency:" + idempotencyKey
// Check if we already processed this request
cached, err := redisClient.Get(r.Context(), cacheKey).Result()
if err == nil {
// Already processed -- return cached response
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(cached))
return
}
// Try to acquire a lock (prevent concurrent processing of same key)
lockKey := cacheKey + ":lock"
ok, _ := redisClient.SetNX(r.Context(), lockKey, "1", 30*time.Second).Result()
if !ok {
http.Error(w, "Request already in progress", 409)
return
}
defer redisClient.Del(r.Context(), lockKey)
// Capture the response
rec := &responseRecorder{ResponseWriter: w, body: &bytes.Buffer{}}
next.ServeHTTP(rec, r)
// Cache the response for 24 hours
redisClient.Set(r.Context(), cacheKey, rec.body.String(), 24*time.Hour)
})
}
}
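The middleware above uses a responseRecorder that this section does not define; a minimal version of such a wrapper might look like this.
Go
// responseRecorder tees the response body into a buffer so the
// middleware can cache it after the handler runs.
type responseRecorder struct {
	http.ResponseWriter
	body *bytes.Buffer
}

func (r *responseRecorder) Write(b []byte) (int, error) {
	r.body.Write(b) // keep a copy for the idempotency cache
	return r.ResponseWriter.Write(b)
}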
Node.js
// Idempotent payment processing
async function processPayment(idempotencyKey, amount, userId) {
// 1. Check if this payment was already processed
const existing = await db.query(
'SELECT result FROM idempotency_keys WHERE key = $1',
[idempotencyKey]
);
if (existing.rows.length > 0) {
return JSON.parse(existing.rows[0].result); // Return cached result
}
  // 2. Process the payment (Stripe supports idempotency keys natively;
  // in the Node library the key is passed as a request option)
  const charge = await stripe.charges.create(
    { amount, currency: 'usd', customer: userId },
    { idempotencyKey }
  );
// 3. Store result with the idempotency key
await db.query(
'INSERT INTO idempotency_keys (key, result, created_at) VALUES ($1, $2, NOW())',
[idempotencyKey, JSON.stringify(charge)]
);
return charge;
}
True exactly-once delivery is impossible in a distributed system (proven by the Two Generals Problem). What we actually achieve is effectively-once: at-least-once delivery + idempotent processing = appears exactly-once. Kafka achieves "exactly-once semantics" through idempotent producers + transactional consumers -- it is still at-least-once under the hood.
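As a sketch, these are the standard Kafka client settings involved; shown as bare config properties, since exact wiring depends on your client library.
Properties
# Producer: retries cannot create duplicates, and writes can span
# partitions atomically when transactions are used
enable.idempotence=true
acks=all
transactional.id=order-service-1   # hypothetical ID; enables transactions

# Consumer: only read messages from committed transactions
isolation.level=read_committed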
You cannot fix what you cannot see. In a distributed system, a single user request may touch 10+ services. You need distributed tracing, metrics, and structured logging to understand what happened.
ASCII
A single user request as a trace:
Trace ID: abc-123
|
+-- Span: API Gateway (12ms)
    |
    +-- Span: Auth Service (3ms)
    |
    +-- Span: Order Service (45ms)
    |   |
    |   +-- Span: DB Query (8ms)
    |
    +-- Span: Payment Service (30ms)
        |
        +-- Span: Stripe API call (25ms)

Total: 60ms
Each span has: trace_id, span_id, parent_span_id, start_time,
duration, service_name, attributes, status
Go
import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)
var tracer = otel.Tracer("order-service")
func CreateOrder(ctx context.Context, req OrderRequest) (*Order, error) {
ctx, span := tracer.Start(ctx, "CreateOrder",
trace.WithAttributes(
attribute.String("user.id", req.UserID),
attribute.Float64("order.total", req.Total),
),
)
defer span.End()
// This span is a child of the parent span from the API gateway
// The context carries the trace ID and parent span ID
// Call payment service -- the context propagates the trace
charge, err := paymentClient.Charge(ctx, req.Total)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.AddEvent("payment_processed",
trace.WithAttributes(attribute.String("charge.id", charge.ID)),
)
return &Order{ID: generateID(), ChargeID: charge.ID}, nil
}
When a downstream service is failing, stop sending it traffic. Let it recover. The circuit breaker pattern has three states: Closed (normal), Open (failing, reject requests), Half-Open (test if service recovered).
ASCII
                failures exceed threshold
 +--------+ --------------------------------> +------+
 | CLOSED |                                   | OPEN |  (reject all
 +--------+                                   +------+   requests)
     ^                                          |   ^
     |                            reset timeout |   | failure
     |  success                         elapses |   | (back to OPEN)
     |                                          v   |
     |                                    +-----------+
     +----------------------------------- | HALF-OPEN |
                                          +-----------+
Node.js
class CircuitBreaker {
constructor(options = {}) {
this.failureThreshold = options.failureThreshold || 5;
this.resetTimeout = options.resetTimeout || 30000; // 30s
this.state = 'CLOSED';
this.failures = 0;
this.lastFailure = null;
}
async execute(fn) {
if (this.state === 'OPEN') {
if (Date.now() - this.lastFailure > this.resetTimeout) {
this.state = 'HALF_OPEN';
} else {
throw new Error('Circuit breaker is OPEN');
}
}
try {
const result = await fn();
this.onSuccess();
return result;
} catch (err) {
this.onFailure();
throw err;
}
}
onSuccess() {
this.failures = 0;
this.state = 'CLOSED';
}
onFailure() {
this.failures++;
this.lastFailure = Date.now();
if (this.failures >= this.failureThreshold) {
this.state = 'OPEN';
}
}
}
// Usage
const breaker = new CircuitBreaker({ failureThreshold: 3, resetTimeout: 10000 });
async function callPaymentService(data) {
return breaker.execute(async () => {
const res = await fetch('https://payment.internal/charge', {
method: 'POST',
body: JSON.stringify(data),
});
if (!res.ok) throw new Error(`Payment failed: ${res.status}`);
return res.json();
});
}
Traces: Follow a request through services (Jaeger, Zipkin, Datadog APM). Metrics: Aggregated numbers over time -- request rate, error rate, latency percentiles (Prometheus + Grafana). Logs: Structured event records with correlation IDs (ELK stack, Loki). OpenTelemetry unifies all three with a single SDK.
Distributed systems are best learned by reading papers, implementing protocols, and building things that break.
MIT 6.824 (now 6.5840) is the best distributed systems course in existence. The labs have you implement:
Lab 1: MapReduce -- distributed batch processing framework
Lab 2: Raft -- full consensus algorithm with leader election, log replication, persistence
Lab 3: Fault-tolerant key/value service -- built on top of your Raft implementation
Lab 4: Sharded key/value service -- with dynamic shard migration
All labs are in Go. The course is freely available at pdos.csail.mit.edu/6.824. Do the labs. They will take weeks and teach you more than any book.
Martin Kleppmann's Designing Data-Intensive Applications (DDIA) is the single best book on distributed systems for practitioners. It covers replication, partitioning, transactions, consistency, batch/stream processing, and more. Read it cover to cover. Then read it again. Every senior engineer should have read this book.
Foundations:
- Lamport, "Time, Clocks, and the Ordering of Events" (1978) -- the paper that started it all
- Fischer, Lynch, Paterson, "Impossibility of Distributed Consensus with One Faulty Process" (FLP, 1985) -- proves async consensus is impossible with one faulty process
- Brewer, "CAP Twelve Years Later" (2012) -- Brewer's own clarification of CAP
Consensus:
- Ongaro & Ousterhout, "In Search of an Understandable Consensus Algorithm" (Raft, 2014)
- Lamport, "Paxos Made Simple" (2001) -- Lamport's attempt to simplify his own algorithm
Industry Systems:
- DeCandia et al., "Dynamo: Amazon's Highly Available Key-value Store" (2007) -- eventual consistency, vector clocks, consistent hashing
- Corbett et al., "Spanner: Google's Globally-Distributed Database" (2012) -- TrueTime, external consistency
- Kreps, "The Log: What every software engineer should know" (2013) -- the foundation of Kafka's design
- Shapiro et al., "Conflict-free Replicated Data Types" (2011) -- CRDTs formalized
1. Distributed KV Store: Build a key-value store with Raft consensus. Start with MIT 6.824 labs.
2. URL Shortener at Scale: Hash partitioning, read replicas, caching layer, rate limiting.
3. Chat System: WebSockets across multiple servers. Use Redis Pub/Sub for cross-server message delivery. Implement causal ordering.
4. Distributed Task Queue: Like a mini Celery/Sidekiq. At-least-once delivery, idempotent workers, dead letter queue.
5. Fly.io Gossip Glomers: A set of distributed systems challenges at fly.io/dist-sys. Implement broadcast, unique ID generation, counters, and Kafka-style logs. Uses Maelstrom for testing.
Distributed systems are genuinely difficult. Bugs are non-deterministic, latent, and often only appear under load or during failures. You cannot unit test your way to correctness -- you need property-based testing (Jepsen), chaos engineering (Chaos Monkey), and formal verification (TLA+). Start simple. Add distribution only when you must. A single PostgreSQL instance handles more load than most startups will ever need.