Table of Contents

1. Why Distributed Systems
2. Fundamentals -- Time & Ordering
3. Consistency Models
4. Consensus Algorithms
5. Replication
6. Partitioning & Sharding
7. Distributed Transactions
8. Gossip Protocols
9. Load Balancing
10. Distributed Caching
11. Rate Limiting
12. Idempotency
13. Observability
14. Practice & Resources

1. Why Distributed Systems

A distributed system is a collection of independent computers that appears to its users as a single coherent system. You need them when a single machine cannot handle the load, when you need fault tolerance, or when your users are spread across the globe and need low latency.

The CAP Theorem

The CAP theorem was conjectured by Eric Brewer in 2000 and proven by Seth Gilbert and Nancy Lynch in 2002. It states that a distributed data store can guarantee at most two of the following three properties at the same time:

CAP Theorem
Consistency -- Every read receives the most recent write or an error
Availability -- Every request receives a non-error response (no guarantee it's the most recent write)
Partition Tolerance -- The system continues to operate despite arbitrary message loss between nodes

In practice, network partitions will happen, so partition tolerance is not optional. The real choice, while a partition lasts, is between CP (consistency + partition tolerance) and AP (availability + partition tolerance).

ASCII
                    Consistency (C)
                         /\
                        /  \
                       /    \
                      / CP   \
                     /  zone  \
                    /----------\
                   /   CAN'T    \
                  /   HAVE ALL   \
                 /     THREE      \
                /___________________\
    Availability (A) ---------- Partition Tolerance (P)
                   AP zone

    CP systems: ZooKeeper, HBase, MongoDB (default), etcd, Consul
    AP systems: Cassandra, DynamoDB, CouchDB, Riak
    CA systems: Traditional RDBMS (single node -- no partition tolerance)
CAP is Nuanced

CAP is not a binary toggle. Real systems make trade-offs on a spectrum. Many databases let you tune consistency per-query. Cassandra, for example, lets you set the consistency level per operation: QUORUM reads combined with QUORUM writes return the latest write, while ONE gives fast reads that may be stale. The PACELC theorem extends CAP: if there is a Partition, you trade off Availability against Consistency; Else, even without a partition, you still trade off Latency against Consistency.

The 8 Fallacies of Distributed Computing

L. Peter Deutsch and colleagues at Sun Microsystems (James Gosling later added the eighth) identified assumptions that developers new to distributed systems mistakenly make:

The 8 Fallacies

1. The network is reliable -- Packets get dropped, connections reset, cables get cut.

2. Latency is zero -- A cross-continent round trip is ~150ms. That adds up fast.

3. Bandwidth is infinite -- Serialization, large payloads, and chatty protocols kill throughput.

4. The network is secure -- Every hop is an attack surface. mTLS exists for a reason.

5. Topology doesn't change -- Nodes join, leave, and move. Auto-scaling is normal.

6. There is one administrator -- Multiple teams, orgs, and cloud providers manage pieces.

7. Transport cost is zero -- Serialization/deserialization, DNS lookups, TLS handshakes all cost time and CPU.

8. The network is homogeneous -- Different hardware, OS versions, protocol versions everywhere.

Every design decision you make in a distributed system should account for these realities. If your code assumes any of these, it will break in production.

2. Fundamentals -- Time & Ordering

In a distributed system, there is no global clock. Each node has its own clock that drifts independently. We need logical clocks to establish ordering of events.

Happens-Before Relation

Defined by Leslie Lamport in 1978. Event A happens-before event B (written A → B) if:

Happens-Before Rules
1. A and B are events in the same process, and A comes before B
2. A is a send event and B is the corresponding receive event
3. Transitivity: if A → B and B → C, then A → C

If neither A → B nor B → A, the events are concurrent (A || B)

Lamport Timestamps

Each process maintains a counter. On local event: increment. On send: increment and attach. On receive: set to max(local, received) + 1. If A → B then L(A) < L(B), but the converse is NOT true.

Go
package main

import (
    "fmt"
    "sync"
)

type LamportClock struct {
    mu      sync.Mutex
    counter uint64
}

func NewLamportClock() *LamportClock {
    return &LamportClock{counter: 0}
}

// Tick increments the clock for a local event
func (lc *LamportClock) Tick() uint64 {
    lc.mu.Lock()
    defer lc.mu.Unlock()
    lc.counter++
    return lc.counter
}

// Send increments and returns timestamp to attach to message
func (lc *LamportClock) Send() uint64 {
    return lc.Tick()
}

// Receive updates clock based on incoming message timestamp
func (lc *LamportClock) Receive(msgTimestamp uint64) uint64 {
    lc.mu.Lock()
    defer lc.mu.Unlock()
    if msgTimestamp > lc.counter {
        lc.counter = msgTimestamp
    }
    lc.counter++
    return lc.counter
}

func main() {
    nodeA := NewLamportClock()
    nodeB := NewLamportClock()

    t1 := nodeA.Tick()           // A: local event, t=1
    sendT := nodeA.Send()        // A: send msg, t=2
    recvT := nodeB.Receive(sendT) // B: receive, t=max(0,2)+1=3

    fmt.Printf("A local: %d, A send: %d, B recv: %d\n", t1, sendT, recvT)
}

Vector Clocks

Lamport timestamps cannot detect concurrency. Vector clocks solve this: each node maintains a vector of counters, one per node. Now V(A) < V(B) implies A → B, and incomparable vectors mean concurrent events.

Go
type VectorClock struct {
    nodeID string
    clock  map[string]uint64
}

func NewVectorClock(nodeID string) *VectorClock {
    return &VectorClock{
        nodeID: nodeID,
        clock:  map[string]uint64{nodeID: 0},
    }
}

func (vc *VectorClock) Tick() {
    vc.clock[vc.nodeID]++
}

func (vc *VectorClock) Send() map[string]uint64 {
    vc.Tick()
    // Return a copy
    cp := make(map[string]uint64)
    for k, v := range vc.clock {
        cp[k] = v
    }
    return cp
}

func (vc *VectorClock) Receive(other map[string]uint64) {
    for k, v := range other {
        if v > vc.clock[k] {
            vc.clock[k] = v
        }
    }
    vc.clock[vc.nodeID]++
}

// HappensBefore returns true if vc happened before other
func (vc *VectorClock) HappensBefore(other *VectorClock) bool {
    atLeastOneLess := false
    for k, v := range vc.clock {
        ov := other.clock[k]
        if v > ov {
            return false
        }
        if v < ov {
            atLeastOneLess = true
        }
    }
    for k, ov := range other.clock {
        if _, exists := vc.clock[k]; !exists && ov > 0 {
            atLeastOneLess = true
        }
    }
    return atLeastOneLess
}
When to Use Which

Use Lamport timestamps when you only need a total ordering that respects causality (e.g., log sequencing); break ties between equal timestamps by node ID. Use vector clocks when you need to detect concurrent writes (e.g., conflict detection in Dynamo-style databases). Vector clocks grow with the number of nodes -- in large clusters, consider dotted version vectors or interval tree clocks instead.
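
To make the conflict-detection case concrete, here is a minimal sketch that classifies two vector clocks, represented as plain maps like the ones returned by Send above. The Ordering type and the Compare function are illustrative names, not part of any library, and missing entries are assumed to count as zero.

```go
package main

// Ordering is the outcome of comparing two vector clocks.
type Ordering int

const (
    Before     Ordering = iota // a happened before b
    After                      // b happened before a
    Equal                      // identical clocks
    Concurrent                 // neither dominates: a conflict to resolve
)

// Compare classifies two vector clocks. Missing entries count as zero.
func Compare(a, b map[string]uint64) Ordering {
    aLess, bLess := false, false
    for k, av := range a {
        bv := b[k]
        if av < bv {
            aLess = true
        } else if av > bv {
            bLess = true
        }
    }
    // Entries that exist only in b
    for k, bv := range b {
        if _, ok := a[k]; !ok && bv > 0 {
            aLess = true
        }
    }
    switch {
    case aLess && bLess:
        return Concurrent
    case aLess:
        return Before
    case bLess:
        return After
    default:
        return Equal
    }
}
```

Comparing {A:2, B:1} with {A:1, B:2} yields Concurrent: each side has seen something the other has not, which is exactly the case a Lamport timestamp cannot reveal.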

3. Consistency Models

Consistency models define what guarantees a distributed system gives about the order and visibility of operations. From strongest to weakest:

ASCII
  Strongest                                              Weakest
  |                                                           |
  v                                                           v
  Linearizable > Sequential > Causal > PRAM > Eventual

  More consistency = more coordination = higher latency
  Less consistency = less coordination = higher throughput

Linearizability (Strong Consistency)

The gold standard. Every operation appears to take effect atomically at some point between its invocation and response. Acts as if there is a single copy of the data.

Linearizability Guarantee
For any two operations op1 and op2:
If op1 completes before op2 starts → op1's effect is visible to op2
Real-time ordering is preserved.

Sequential Consistency

All processes see the same order of operations, but that order does not need to match real-time. Operations from a single process appear in program order. Weaker than linearizability because cross-process real-time ordering is not guaranteed.

Causal Consistency

If operation A causally precedes B (A → B), then all nodes see A before B. Concurrent operations can appear in any order. This captures the intuition of "if you saw it, your response must come after it."

Eventual Consistency

If no new writes are made, eventually all replicas converge to the same value. No ordering guarantees during updates. Used by DNS, Cassandra (with low consistency level), and most caching systems.

Real-World Consistency Choices

Banking (account balance): Linearizable -- you cannot show a wrong balance.

Social media (like count): Eventual -- off by a few for a second is acceptable.

Chat messages (ordering): Causal -- replies must appear after the message they reply to.

Shopping cart: Eventual with conflict resolution -- Amazon's Dynamo paper pioneered this.

Node.js
// A naive replicated counter: contrasts reading a single replica (possibly stale)
// with reading the value that a majority of replicas agree on

class DistributedCounter {
  constructor(replicas) {
    this.replicas = replicas; // Map of nodeId -> value
  }

  // Eventual consistency: read from any replica
  readEventual(nodeId) {
    return this.replicas.get(nodeId) || 0;
  }

  // Strong consistency: read from quorum (majority)
  readStrong() {
    const values = [...this.replicas.values()];
    const majority = Math.floor(values.length / 2) + 1;
    // In real systems, use read quorum + write quorum > N
    const counts = new Map();
    for (const v of values) {
      counts.set(v, (counts.get(v) || 0) + 1);
      if (counts.get(v) >= majority) return v;
    }
    return values[0]; // fallback
  }
}

// Quorum formula: R + W > N guarantees overlap
// N=3 replicas, W=2 (write quorum), R=2 (read quorum)
// 2 + 2 = 4 > 3 -- at least one node has the latest write
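
The quorum arithmetic in the comments above can be checked with a few lines of Go. This is only a worked version of the R + W > N rule; the function names are made up for illustration.

```go
package main

// QuorumOverlaps reports whether every read quorum of size r intersects
// every write quorum of size w among n replicas, i.e. whether R + W > N.
func QuorumOverlaps(n, r, w int) bool {
    return r+w > n
}

// MinOverlap returns the smallest number of replicas that any read quorum
// and any write quorum are guaranteed to share (0 if they may be disjoint).
func MinOverlap(n, r, w int) int {
    if o := r + w - n; o > 0 {
        return o
    }
    return 0
}
```

With N=3, W=2, R=2 the minimum overlap is 1, so at least one replica in every read set has the latest write. With W=1, R=1 the overlap is 0, and a read can miss the latest write entirely.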

4. Consensus Algorithms

Consensus is the problem of getting multiple nodes to agree on a value. It is the foundation of replicated state machines, leader election, and distributed locks.

Raft -- Understandable Consensus

Designed by Diego Ongaro and John Ousterhout in 2014 explicitly for understandability. Used in etcd, Consul, CockroachDB, and TiKV.

ASCII
  Raft Node States:

  +----------+     timeout      +-----------+    wins election    +--------+
  | Follower | ---------------> | Candidate | -----------------> | Leader |
  +----------+                  +-----------+                    +--------+
       ^                             |                               |
       |                             | discovers higher term          |
       |                             v                               |
       +-------- discovers higher term / loses election <-----------+

  Term: monotonically increasing logical clock. Each term has at most one leader.

Leader Election

Followers have a randomized election timeout (e.g., 150-300ms). If no heartbeat from leader within that timeout, the follower becomes a candidate, increments its term, votes for itself, and requests votes from all other nodes. A candidate wins if it gets votes from a majority.
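
A randomized timeout in that range might be drawn as follows. This is a sketch: the constant and function names are illustrative, and the 150-300ms bounds are simply the example values from the paragraph above.

```go
package main

import (
    "math/rand"
    "time"
)

const (
    minElectionTimeout = 150 * time.Millisecond
    maxElectionTimeout = 300 * time.Millisecond
)

// randomElectionTimeout returns a duration in [minElectionTimeout, maxElectionTimeout).
// Each follower draws its own value, so timers rarely fire at the same
// moment and split votes become unlikely.
func randomElectionTimeout() time.Duration {
    span := int64(maxElectionTimeout - minElectionTimeout)
    return minElectionTimeout + time.Duration(rand.Int63n(span))
}
```

A follower would typically reset its timer with a fresh value every time it hears a heartbeat from the leader.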

Raft Election Rules
1. Each node votes for at most one candidate per term
2. Candidate's log must be at least as up-to-date as the voter's log
3. Majority (floor(N/2) + 1) votes needed to win
4. If split vote, timeout and try again with new term
Go
type NodeState int

const (
    Follower  NodeState = iota
    Candidate
    Leader
)

type RaftNode struct {
    mu          sync.Mutex
    id          int
    state       NodeState
    currentTerm int
    votedFor    int  // -1 means no vote this term
    log         []LogEntry
    commitIndex int
    peers       []int
}

type LogEntry struct {
    Term    int
    Command interface{}
}

type RequestVoteArgs struct {
    Term         int
    CandidateID  int
    LastLogIndex int
    LastLogTerm  int
}

type RequestVoteReply struct {
    Term        int
    VoteGranted bool
}

func (rn *RaftNode) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rn.mu.Lock()
    defer rn.mu.Unlock()

    reply.Term = rn.currentTerm
    reply.VoteGranted = false

    // Reject candidates from a stale term
    if args.Term < rn.currentTerm {
        return
    }

    // Step down if we see a higher term
    if args.Term > rn.currentTerm {
        rn.currentTerm = args.Term
        rn.state = Follower
        rn.votedFor = -1
        reply.Term = rn.currentTerm // reply with the updated term
    }

    // Election rule 1: at most one vote per term (re-granting to the same candidate is fine)
    if rn.votedFor == -1 || rn.votedFor == args.CandidateID {
        // Election rule 2: candidate's log must be at least as up-to-date as ours
        lastIdx := len(rn.log) - 1
        lastTerm := 0
        if lastIdx >= 0 {
            lastTerm = rn.log[lastIdx].Term
        }
        if args.LastLogTerm > lastTerm ||
            (args.LastLogTerm == lastTerm && args.LastLogIndex >= lastIdx) {
            reply.VoteGranted = true
            rn.votedFor = args.CandidateID
        }
    }
}

Log Replication

Once elected, the leader accepts client requests, appends them to its log, and replicates them to followers via AppendEntries RPCs. An entry is committed once a majority of nodes have it. Committed entries are applied to the state machine.

ASCII
  Log Replication Flow:

  Client --> Leader: "SET x=5"
      |
      v
  Leader appends to local log: [term=3, SET x=5]
      |
      +-------+-------+
      |       |       |
      v       v       v
  Follower1  Follower2  Follower3    (AppendEntries RPC)
      |       |       |
      v       v       v
  ACK        ACK      ACK
      |       |       |
      +-------+-------+
      |
      v
  Leader: entry is on all 4 nodes (3 of 4 is already a majority) --> COMMIT
  Leader applies to state machine and responds to client
  Next heartbeat tells followers to commit too
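
One way a leader could work out how far it may commit is to look at the highest index each node is known to hold and take the largest index present on a majority. The sketch below assumes a slice with one entry per node, leader included; the function name is illustrative. Raft additionally requires that the entry at that index belongs to the leader's current term, which this sketch leaves out.

```go
package main

import "sort"

// majorityMatchIndex returns the highest log index stored on a majority of
// nodes. matchIndex holds, for every node including the leader, the highest
// index known to be replicated there. It must not be empty.
func majorityMatchIndex(matchIndex []int) int {
    sorted := append([]int(nil), matchIndex...) // copy; leave the input untouched
    sort.Sort(sort.Reverse(sort.IntSlice(sorted)))
    majority := len(sorted)/2 + 1
    // The majority-th largest value is held by at least `majority` nodes.
    return sorted[majority-1]
}
```

For a four-node cluster with match indexes 5, 5, 4, 3 the result is 4: three nodes hold index 4, but only two hold index 5.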

Paxos -- Simplified

Paxos, by Leslie Lamport, is the classic consensus algorithm (first described in 1989, published in 1998). It is notoriously hard to understand. The basic idea has three roles: Proposers, Acceptors, and Learners.

Paxos in Two Phases

Phase 1 (Prepare): Proposer picks a proposal number N. Sends Prepare(N) to a majority of acceptors. Acceptors promise not to accept proposals with number < N. If they already accepted a value, they return it.

Phase 2 (Accept): If the proposer gets promises from a majority, it sends Accept(N, value). The value is the one attached to the highest-numbered proposal reported in those promises; only if no acceptor reported an accepted value may the proposer use its own. Acceptors accept unless they have promised a higher number in the meantime.

Result: Once a majority of acceptors accept a value, consensus is reached. Learners are notified.
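
The acceptor's side of both phases fits in a few lines. The following is a minimal single-decree sketch, assuming proposal numbers start at 1 and values are strings; the type and method names are illustrative, and a real acceptor would persist its state to disk before replying.

```go
package main

import "sync"

// Acceptor holds the state of a single-decree Paxos acceptor.
type Acceptor struct {
    mu          sync.Mutex
    promised    int    // highest proposal number promised so far
    acceptedN   int    // number of the accepted proposal, 0 if none
    acceptedVal string // value of the accepted proposal
}

// Prepare handles Phase 1. It refuses (ok=false) if it has already promised
// n or higher; otherwise it promises n and reports any accepted proposal.
func (a *Acceptor) Prepare(n int) (ok bool, prevN int, prevVal string) {
    a.mu.Lock()
    defer a.mu.Unlock()
    if n <= a.promised {
        return false, 0, ""
    }
    a.promised = n
    return true, a.acceptedN, a.acceptedVal
}

// Accept handles Phase 2. It accepts unless a higher number was promised.
func (a *Acceptor) Accept(n int, val string) bool {
    a.mu.Lock()
    defer a.mu.Unlock()
    if n < a.promised {
        return false
    }
    a.promised = n
    a.acceptedN = n
    a.acceptedVal = val
    return true
}
```

A proposer that receives a non-zero prevN from any acceptor must carry the value of the highest such proposal into Phase 2, which is what keeps an already chosen value from being replaced.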

Raft vs Paxos

Prefer Raft for new implementations. It was designed to be understandable and provides the same safety guarantees as Multi-Paxos. Paxos is a family of protocols (Basic, Multi, Fast, Cheap, etc.), and implementing it correctly is extremely difficult. Production Paxos implementations typically deviate from the paper significantly. Raft gives you a clear spec to implement against.

5. Replication

Replication keeps copies of data on multiple nodes for fault tolerance and read throughput. The fundamental challenge: keeping replicas consistent when writes happen.

Single-Leader Replication

One node is the leader (master). All writes go to the leader, which streams changes to followers (slaves/replicas). Reads can come from any replica.

ASCII
  Writes ----> [Leader] ----> replication log ----+----+----+
                  |                                |    |    |
                  v                                v    v    v
              [Follower 1]                    [F2] [F3] [F4]
                  ^
                  |
  Reads <---------+ (reads from any replica)

  + Simple, no write conflicts
  - Single point of failure for writes
  - Replication lag = stale reads from followers

Multi-Leader Replication

Multiple nodes accept writes. Used in multi-datacenter setups. Each datacenter has a local leader. The big problem: write conflicts when two leaders accept conflicting writes.

Leaderless Replication

Any replica can accept writes. Used by Dynamo, Cassandra, Riak. Clients write to multiple replicas and read from multiple replicas. Uses quorum reads/writes to ensure consistency.

Conflict Resolution

Node.js
// Last-Writer-Wins (LWW) -- simple but loses data
function resolveConflictLWW(versions) {
  return versions.reduce((latest, v) =>
    v.timestamp > latest.timestamp ? v : latest
  );
}

// Merge function -- application-specific, preserves data
function resolveShoppingCart(v1, v2) {
  // Union of items from both carts
  const merged = new Map();
  for (const item of [...v1.items, ...v2.items]) {
    const existing = merged.get(item.id);
    if (!existing || item.quantity > existing.quantity) {
      merged.set(item.id, item);
    }
  }
  return { items: [...merged.values()] };
}

CRDTs -- Conflict-Free Replicated Data Types

Data structures that can be replicated across nodes and updated independently without coordination. They mathematically guarantee convergence. Two types: state-based (CvRDT) and operation-based (CmRDT).

Go
// G-Counter: a grow-only counter CRDT
// Each node increments its own slot. Total = sum of all slots.
type GCounter struct {
    counts map[string]uint64 // nodeID -> count
    nodeID string
}

func NewGCounter(nodeID string) *GCounter {
    return &GCounter{
        counts: map[string]uint64{nodeID: 0},
        nodeID: nodeID,
    }
}

func (gc *GCounter) Increment() {
    gc.counts[gc.nodeID]++
}

func (gc *GCounter) Value() uint64 {
    var total uint64
    for _, v := range gc.counts {
        total += v
    }
    return total
}

// Merge takes the max of each node's counter
func (gc *GCounter) Merge(other *GCounter) {
    for k, v := range other.counts {
        if v > gc.counts[k] {
            gc.counts[k] = v
        }
    }
}

// PN-Counter: supports both increment and decrement
// Uses two G-Counters: one for increments, one for decrements
type PNCounter struct {
    pos *GCounter
    neg *GCounter
}

func (pn *PNCounter) Increment() { pn.pos.Increment() }
func (pn *PNCounter) Decrement() { pn.neg.Increment() }
func (pn *PNCounter) Value() int64  {
    return int64(pn.pos.Value()) - int64(pn.neg.Value())
}
CRDTs in Production

Redis Enterprise uses CRDTs for its Active-Active geo-replicated databases. Riak uses CRDTs for maps, sets, counters, and flags. Automerge and Yjs are CRDT libraries for collaborative editing (Google Docs-style). CRDTs trade space (their metadata grows) for freedom from coordination.

6. Partitioning & Sharding

When data is too large for one node, you split it across multiple nodes. Each piece is a partition (or shard). The key question: which data goes to which node?

Range Partitioning

Assign contiguous ranges of keys to each partition. Good for range queries but risks hot spots (e.g., all recent timestamps hit the same partition).
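
A range lookup can be sketched as a binary search over sorted split points. The function name and the convention that each bound is the exclusive upper end of a partition are assumptions made for this example.

```go
package main

import "sort"

// rangePartition returns the index of the partition that owns key.
// bounds holds the sorted, exclusive upper bound of every partition except
// the last, so len(bounds)+1 partitions exist in total.
func rangePartition(key string, bounds []string) int {
    // First partition whose upper bound is greater than the key;
    // if there is none, the key belongs to the last partition.
    return sort.Search(len(bounds), func(i int) bool {
        return key < bounds[i]
    })
}
```

With bounds ["h", "p"], keys below "h" land in partition 0, keys from "h" up to (not including) "p" in partition 1, and everything else in partition 2. Because neighbouring keys share a partition, a range scan touches few nodes, which is also why monotonically increasing keys pile onto the last partition.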

Hash Partitioning

Hash the key and assign based on hash value. Distributes data evenly but range queries require scatter-gather.

Go
import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
)

func hashPartition(key string, numPartitions int) int {
    h := sha256.Sum256([]byte(key))
    num := binary.BigEndian.Uint64(h[:8])
    return int(num % uint64(numPartitions))
}

func main() {
    keys := []string{"user:1001", "user:1002", "user:1003", "order:5001"}
    for _, k := range keys {
        fmt.Printf("Key %s -> Partition %d\n", k, hashPartition(k, 4))
    }
}

Consistent Hashing

The problem with simple hash partitioning: when you add or remove a node, almost ALL keys get reassigned. Consistent hashing fixes this -- only K/N keys move on average (K = total keys, N = number of nodes).
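
The size of the problem is easy to measure. The sketch below counts how many synthetic keys change partition under plain hash-mod-N placement when the partition count changes; the key format and function names are invented for the experiment. Going from 4 to 5 partitions, a key stays put only when its hash modulo 20 is below 4, so roughly 80% of keys should move, against the roughly 20% that consistent hashing would relocate.

```go
package main

import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
)

// hashKey maps a key to a 64-bit value using the first 8 bytes of SHA-256.
func hashKey(key string) uint64 {
    h := sha256.Sum256([]byte(key))
    return binary.BigEndian.Uint64(h[:8])
}

// movedFraction returns the share of numKeys synthetic keys ("key-0",
// "key-1", ...) whose partition changes when the partition count goes from
// oldN to newN under hash-mod-N placement.
func movedFraction(numKeys, oldN, newN int) float64 {
    moved := 0
    for i := 0; i < numKeys; i++ {
        h := hashKey(fmt.Sprintf("key-%d", i))
        if h%uint64(oldN) != h%uint64(newN) {
            moved++
        }
    }
    return float64(moved) / float64(numKeys)
}
```

Calling movedFraction(10000, 4, 5) gives an empirical figure to compare with the theoretical 80%.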

ASCII
  Consistent Hashing Ring:

                    0 / 2^32
                      |
              Node C   |   Node A
                \      |      /
                 \     |     /
                  *----+----*
                 /           \
                /             \
         ------               ------
        |      |             |      |
        |      |             |      |
         ------               ------
                \             /
                 \           /
                  *---------*
                 /     |     \
                /      |      \
              Node B   |   Node D
                      |
                    2^31

  Each key is hashed onto the ring.
  Walk clockwise to find the first node -- that's the owner.
  Virtual nodes: each physical node gets multiple positions on the ring
  for better balance.
Go
import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
    "sort"
)

type ConsistentHash struct {
    ring       []uint64
    nodeMap    map[uint64]string
    vnodes     int // virtual nodes per physical node
}

func NewConsistentHash(vnodes int) *ConsistentHash {
    return &ConsistentHash{
        nodeMap: make(map[uint64]string),
        vnodes:  vnodes,
    }
}

func (ch *ConsistentHash) hashKey(key string) uint64 {
    h := sha256.Sum256([]byte(key))
    return binary.BigEndian.Uint64(h[:8])
}

func (ch *ConsistentHash) AddNode(node string) {
    for i := 0; i < ch.vnodes; i++ {
        vkey := fmt.Sprintf("%s-vnode-%d", node, i)
        hash := ch.hashKey(vkey)
        ch.ring = append(ch.ring, hash)
        ch.nodeMap[hash] = node
    }
    sort.Slice(ch.ring, func(i, j int) bool {
        return ch.ring[i] < ch.ring[j]
    })
}

func (ch *ConsistentHash) GetNode(key string) string {
    hash := ch.hashKey(key)
    idx := sort.Search(len(ch.ring), func(i int) bool {
        return ch.ring[i] >= hash
    })
    if idx == len(ch.ring) {
        idx = 0 // wrap around the ring
    }
    return ch.nodeMap[ch.ring[idx]]
}
Rebalancing is Expensive

Even with consistent hashing, moving data between nodes takes time and bandwidth. Plan for it: use partition counts larger than your node count so you can move whole partitions. Kafka uses this approach -- partitions are the unit of rebalancing, not individual keys.

7. Distributed Transactions

How do you ensure atomicity when a transaction spans multiple services or partitions? Single-node ACID does not help you here.

Two-Phase Commit (2PC)

ASCII
  Phase 1: PREPARE
  Coordinator ----Prepare----> Participant A  --> "Yes, I can commit"
              ----Prepare----> Participant B  --> "Yes, I can commit"
              ----Prepare----> Participant C  --> "Yes, I can commit"

  Phase 2: COMMIT (if all said yes)
  Coordinator ----Commit-----> Participant A  --> ACK
              ----Commit-----> Participant B  --> ACK
              ----Commit-----> Participant C  --> ACK

  If ANY participant says "No" in Phase 1:
  Coordinator ----Abort------> All Participants

  Problem: If coordinator crashes after Phase 1, participants are STUCK
  waiting (blocking protocol). This is why 2PC is rarely used across services.
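
The coordinator's logic from the diagram can be sketched in a few lines. The Participant interface and the in-memory memParticipant are hypothetical stand-ins; a real coordinator would also write its decision to durable storage and deal with timeouts, which is where the blocking problem shows up.

```go
package main

// Participant is a hypothetical interface for a resource taking part in 2PC.
type Participant interface {
    Prepare() bool // vote: true means "yes, I can commit"
    Commit()
    Abort()
}

// RunTwoPhaseCommit returns true if the transaction committed.
func RunTwoPhaseCommit(participants []Participant) bool {
    // Phase 1: collect votes; a single "no" aborts everyone.
    for _, p := range participants {
        if !p.Prepare() {
            for _, q := range participants {
                q.Abort()
            }
            return false
        }
    }
    // Phase 2: everyone voted yes, so tell everyone to commit.
    for _, p := range participants {
        p.Commit()
    }
    return true
}

// memParticipant is an in-memory participant used only for illustration.
type memParticipant struct {
    vote  bool
    state string
}

func (m *memParticipant) Prepare() bool {
    if m.vote {
        m.state = "prepared"
    }
    return m.vote
}
func (m *memParticipant) Commit() { m.state = "committed" }
func (m *memParticipant) Abort()  { m.state = "aborted" }
```

Note what is missing: if the coordinator dies between the two loops, every participant sits in the "prepared" state holding its locks, with no way to learn the outcome on its own.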

Sagas -- Compensating Transactions

Instead of a distributed transaction, break it into a sequence of local transactions. Each step has a compensating action that undoes it if a later step fails.

Node.js
// Saga: Order Processing
// Step 1: Reserve inventory
// Step 2: Charge payment
// Step 3: Ship order
// If step 3 fails, compensate: refund payment, release inventory

class SagaOrchestrator {
  constructor() {
    this.steps = [];
    this.completedSteps = [];
  }

  addStep(name, execute, compensate) {
    this.steps.push({ name, execute, compensate });
  }

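  async run(context) {
    this.completedSteps = []; // start clean so a reused orchestrator does not compensate steps from an earlier run
    for (const step of this.steps) {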
      try {
        console.log(`Executing: ${step.name}`);
        const result = await step.execute(context);
        context[step.name] = result;
        this.completedSteps.push(step);
      } catch (err) {
        console.log(`Failed: ${step.name} -- ${err.message}`);
        await this.compensate(context);
        throw err;
      }
    }
    return context;
  }

  async compensate(context) {
    // Compensate in reverse order (copy first: reverse() mutates the array in place)
    for (const step of [...this.completedSteps].reverse()) {
      try {
        console.log(`Compensating: ${step.name}`);
        await step.compensate(context);
      } catch (err) {
        console.error(`Compensation failed for ${step.name}: ${err.message}`);
        // Log for manual intervention -- compensations MUST eventually succeed
      }
    }
  }
}

// Usage
const saga = new SagaOrchestrator();

saga.addStep(
  'reserveInventory',
  async (ctx) => { /* call inventory service */ return { reservationId: 'abc' }; },
  async (ctx) => { /* release reservation ctx.reserveInventory.reservationId */ }
);

saga.addStep(
  'chargePayment',
  async (ctx) => { /* call payment service */ return { chargeId: 'ch_123' }; },
  async (ctx) => { /* refund ctx.chargePayment.chargeId */ }
);

saga.addStep(
  'shipOrder',
  async (ctx) => { /* call shipping service */ return { trackingId: 'trk_789' }; },
  async (ctx) => { /* cancel shipment */ }
);

Transactional Outbox Pattern

Use this pattern when you need to update a database AND publish an event atomically. Write the state change and the event to the same database in one transaction; a separate process then reads the outbox table and publishes the events.

Go
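// Transactional Outbox: write order + event in one DB transaction
// (uuid is assumed to be github.com/google/uuid; Order and MessagePublisher are defined elsewhere)
import (
    "database/sql"
    "encoding/json"
    "time"

    "github.com/google/uuid"
)
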
func CreateOrderWithOutbox(db *sql.DB, order Order) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 1. Insert the order
    _, err = tx.Exec(
        "INSERT INTO orders (id, user_id, total) VALUES ($1, $2, $3)",
        order.ID, order.UserID, order.Total,
    )
    if err != nil {
        return err
    }

    // 2. Insert event into outbox table (same transaction!)
    eventPayload, err := json.Marshal(order)
    if err != nil {
        return err
    }
    _, err = tx.Exec(
        "INSERT INTO outbox (id, event_type, payload, created_at) VALUES ($1, $2, $3, NOW())",
        uuid.New(), "order.created", eventPayload,
    )
    if err != nil {
        return err
    }

    return tx.Commit()
}

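// Separate poller reads outbox and publishes to message broker.
// Assumes Publish returns an error. Delivery is at-least-once: a crash between
// Publish and the UPDATE re-sends the event, so consumers must be idempotent.
func OutboxPoller(db *sql.DB, publisher MessagePublisher) {
    for {
        rows, err := db.Query(
            "SELECT id, event_type, payload FROM outbox WHERE published = false ORDER BY created_at LIMIT 100",
        )
        if err != nil {
            time.Sleep(500 * time.Millisecond) // back off, then retry
            continue
        }
        for rows.Next() {
            var id, eventType, payload string
            if err := rows.Scan(&id, &eventType, &payload); err != nil {
                break
            }
            // Mark as published only after the broker accepted the event
            if err := publisher.Publish(eventType, payload); err != nil {
                break // stop here to preserve order; retried on the next poll
            }
            db.Exec("UPDATE outbox SET published = true WHERE id = $1", id)
        }
        rows.Close()
        time.Sleep(500 * time.Millisecond)
    }
}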
CDC Instead of Polling

Instead of polling the outbox table, you can use Change Data Capture (CDC) with tools like Debezium. It reads the database's write-ahead log (WAL) and streams changes to Kafka. This removes the polling delay and the repeated polling queries against the database, which makes it a common choice for production systems.

8. Gossip Protocols

How do nodes in a large cluster discover each other, detect failures, and propagate state? Gossip protocols spread information like a rumor -- each node periodically tells a random peer what it knows.

ASCII
  Gossip Round 1:        Round 2:           Round 3:
  Node A knows X         A tells B           B tells D
                         A tells C           C tells E
  [A*] [B] [C]          [A*] [B*] [C*]      [A*] [B*] [C*]
  [D]  [E] [F]          [D]  [E]  [F]       [D*] [E*] [F]

  Information spreads in O(log N) rounds to all N nodes.
  Even if some messages are lost, redundancy ensures delivery.
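
The O(log N) claim can be checked with a small simulation. The sketch below models push gossip with fanout 1: in every round each informed node tells one random other node. The function name and the seed parameter are choices made for this example, and the model ignores message loss and node failures.

```go
package main

import "math/rand"

// roundsToSpread simulates push gossip with fanout 1 among n nodes and
// returns the number of rounds until every node is informed. Node 0 starts
// with the rumor. The seed makes a run reproducible.
func roundsToSpread(n int, seed int64) int {
    if n < 2 {
        return 0
    }
    rng := rand.New(rand.NewSource(seed))
    informed := make([]bool, n)
    informed[0] = true
    count, rounds := 1, 0
    for count < n {
        rounds++
        // Nodes informed during this round start gossiping in the next one.
        next := append([]bool(nil), informed...)
        for i := 0; i < n; i++ {
            if !informed[i] {
                continue
            }
            peer := rng.Intn(n - 1)
            if peer >= i {
                peer++ // never pick ourselves
            }
            if !next[peer] {
                next[peer] = true
                count++
            }
        }
        informed = next
    }
    return rounds
}
```

The informed set can at most double per round, so 1000 nodes need at least 10 rounds. In practice the count lands somewhat above that because late rounds waste many messages on nodes that already know.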

Failure Detection with Gossip

Nodes periodically ping random peers. If a node does not respond, don't immediately declare it dead. Use a suspicion mechanism: mark it as suspected, gossip the suspicion, and only declare dead after a timeout.

Go
type NodeStatus int

const (
    Alive     NodeStatus = iota
    Suspected
    Dead
)

type Member struct {
    ID        string
    Addr      string
    Status    NodeStatus
    Heartbeat uint64
    Timestamp time.Time
}

type GossipNode struct {
    mu        sync.RWMutex
    self      Member
    members   map[string]*Member
    suspectTTL time.Duration
}

func (g *GossipNode) GossipRound() {
    g.mu.RLock()
    peers := make([]string, 0, len(g.members))
    for id := range g.members {
        if id != g.self.ID {
            peers = append(peers, id)
        }
    }
    g.mu.RUnlock()

    if len(peers) == 0 {
        return
    }

    // Pick a random peer (fanout = 1 for simplicity)
    target := peers[rand.Intn(len(peers))]

    // Send our membership list, receive theirs, merge
    g.sendGossip(target)
}

func (g *GossipNode) MergeMemberList(incoming []Member) {
    g.mu.Lock()
    defer g.mu.Unlock()

    for _, m := range incoming {
        existing, ok := g.members[m.ID]
        if !ok {
            // New node discovered: store a copy, not the address of the loop variable
            member := m
            member.Timestamp = time.Now()
            g.members[m.ID] = &member
            continue
        }
        // Higher heartbeat = more recent info
        if m.Heartbeat > existing.Heartbeat {
            existing.Heartbeat = m.Heartbeat
            existing.Status = m.Status
            existing.Timestamp = time.Now()
        }
    }
}

func (g *GossipNode) DetectFailures() {
    g.mu.Lock()
    defer g.mu.Unlock()

    for _, m := range g.members {
        if m.ID == g.self.ID {
            continue
        }
        age := time.Since(m.Timestamp)
        if m.Status == Alive && age > g.suspectTTL {
            m.Status = Suspected
        } else if m.Status == Suspected && age > g.suspectTTL*3 {
            m.Status = Dead
        }
    }
}
Gossip in Practice

Cassandra uses gossip for cluster membership and failure detection.

HashiCorp Serf/Consul uses the SWIM protocol (gossip-based failure detection).

Redis Cluster uses gossip for node discovery and slot assignment propagation.

9. Load Balancing

Distribute incoming traffic across multiple backend servers to maximize throughput, minimize latency, and avoid overloading any single server.

Algorithms

ASCII
  Algorithm Comparison:

  Round Robin        Least Connections    Weighted Round Robin
  A -> B -> C ->     A(2) B(5) C(1)       A(w=3) B(w=1) C(w=2)
  A -> B -> C ->     next -> C (least)     A A A B C C A A A ...
  Simple, fair if    Good for varying       Good when servers
  requests are       request durations      have different capacity
  similar cost
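
Weighted round robin can be sketched as follows. This version uses the "smooth" variant, which interleaves picks instead of emitting A A A B C C in bursts as drawn above; the type and method names are illustrative, and the sketch is not safe for concurrent use without a lock.

```go
package main

// weightedBackend tracks a backend's configured weight and running score.
type weightedBackend struct {
    name    string
    weight  int
    current int
}

// SmoothWRR picks backends in proportion to their weights while spreading
// the picks of heavy backends evenly over time.
type SmoothWRR struct {
    backends []*weightedBackend
    total    int
}

// Add registers a backend with the given weight.
func (s *SmoothWRR) Add(name string, weight int) {
    s.backends = append(s.backends, &weightedBackend{name: name, weight: weight})
    s.total += weight
}

// Next returns the name of the next backend, or "" if none are registered.
func (s *SmoothWRR) Next() string {
    var best *weightedBackend
    for _, b := range s.backends {
        b.current += b.weight // every backend earns its weight each round
        if best == nil || b.current > best.current {
            best = b
        }
    }
    if best == nil {
        return ""
    }
    best.current -= s.total // the winner pays back the total weight
    return best.name
}
```

With weights A=3, B=1, C=2, six consecutive calls to Next return A, C, A, B, C, A: the same 3:1:2 ratio as the diagram, without sending three requests in a row to A.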

L4 vs L7 Load Balancing

Layer 4 vs Layer 7
L4 (Transport): Operates on TCP/UDP. Sees IP + port. Very fast (often implemented in the kernel). Cannot inspect HTTP headers, cookies, or URLs. Examples: AWS NLB, HAProxy (TCP mode), IPVS.

L7 (Application): Operates on HTTP/gRPC. Can route based on URL path, headers, cookies. Can terminate TLS. Slower than L4 but much more flexible. Examples: Nginx, HAProxy (HTTP mode), AWS ALB, Envoy.
Node.js
// Simple round-robin load balancer in Node.js
const http = require('http');
const httpProxy = require('http-proxy');

const backends = [
  { host: '127.0.0.1', port: 3001 },
  { host: '127.0.0.1', port: 3002 },
  { host: '127.0.0.1', port: 3003 },
];

let current = 0;
const proxy = httpProxy.createProxyServer({});

const server = http.createServer((req, res) => {
  const target = backends[current % backends.length];
  current++;

  proxy.web(req, res, {
    target: `http://${target.host}:${target.port}`
  });
});

proxy.on('error', (err, req, res) => {
  res.writeHead(502);
  res.end('Bad Gateway');
});

server.listen(8080, () => {
  console.log('Load balancer on :8080');
});
Go
// Least-connections load balancer
type Backend struct {
    URL         *url.URL
    Alive       bool
    Connections int64
    mu          sync.RWMutex
}

type LeastConnLB struct {
    backends []*Backend
    mu       sync.RWMutex
}

func (lb *LeastConnLB) NextBackend() *Backend {
    lb.mu.RLock()
    defer lb.mu.RUnlock()

    var best *Backend
    for _, b := range lb.backends {
        if !b.Alive {
            continue
        }
        if best == nil || atomic.LoadInt64(&b.Connections) < atomic.LoadInt64(&best.Connections) {
            best = b
        }
    }
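    if best != nil {
        atomic.AddInt64(&best.Connections, 1)
    }
    return best
}

// Release must be called when a request finishes; otherwise the counter only grows
// and the backend looks permanently busy.
func (b *Backend) Release() {
    atomic.AddInt64(&b.Connections, -1)
}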

10. Distributed Caching

Caching reduces load on databases and speeds up responses. In a distributed system, caching introduces its own set of consistency challenges.

Cache-Aside (Lazy Loading)

The application checks the cache first. On miss, it loads from the database, stores in cache, and returns. On write, it updates the database and invalidates the cache.

Node.js
const Redis = require('ioredis');
const redis = new Redis();

async function getUserCacheAside(userId) {
  const cacheKey = `user:${userId}`;

  // 1. Check cache
  const cached = await redis.get(cacheKey);
  if (cached) {
    return JSON.parse(cached); // Cache hit
  }

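  // 2. Cache miss -- load from DB (db is assumed to be a node-postgres style client)
  const { rows } = await db.query('SELECT * FROM users WHERE id = $1', [userId]);
  const user = rows[0];
  if (!user) {
    return null; // Not found -- nothing to cache
  }

  // 3. Store in cache with a 1-hour TTL
  await redis.setex(cacheKey, 3600, JSON.stringify(user));

  return user;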
}

async function updateUser(userId, data) {
  // 1. Update database FIRST
  await db.query('UPDATE users SET name=$1 WHERE id=$2', [data.name, userId]);

  // 2. Invalidate cache (don't update -- avoids race conditions)
  await redis.del(`user:${userId}`);
}
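
Why the write path deletes the cache entry instead of updating it can be shown with a deterministic simulation. This is a sketch under simplifying assumptions: plain `Map` objects stand in for the database and the cache, and the `steps` array hard-codes one unlucky interleaving of two concurrent writers. The names `simulate`, `updateOnWrite`, and `deleteOnWrite` are illustrative only.

```javascript
// Two writers update the same key. The database applies them in order
// (v1, then v2), but their cache operations arrive in the opposite order.
function simulate(cacheOp) {
  const db = new Map();
  const cache = new Map();

  // Each step is one network call; the array fixes the interleaving.
  const steps = [
    () => db.set('k', 'v1'),        // writer 1 -> DB
    () => db.set('k', 'v2'),        // writer 2 -> DB (final value)
    () => cacheOp(cache, 'k', 'v2'), // writer 2 -> cache
    () => cacheOp(cache, 'k', 'v1'), // writer 1 -> cache (delayed)
  ];
  steps.forEach((step) => step());

  return { db: db.get('k'), cache: cache.get('k') };
}

const updateOnWrite = (cache, key, value) => cache.set(key, value);
const deleteOnWrite = (cache, key) => cache.delete(key);

console.log(simulate(updateOnWrite)); // { db: 'v2', cache: 'v1' }
console.log(simulate(deleteOnWrite)); // { db: 'v2', cache: undefined }
```

With update-on-write the cache keeps serving `v1` until the entry expires or is written again. With delete-on-write the entry is simply absent, so the next read reloads `v2` from the database.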

Write-Through Cache

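Every write goes to both the cache and the database synchronously, and the write is acknowledged only after both succeed. This keeps the cache up to date for every key written through it, at the cost of higher write latency.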
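
A minimal sketch of the write path, assuming both stores expose `get`/`set`/`has` (here two in-memory `Map` objects stand in for the database and the cache; a real Redis or SQL client has a different API). The class name `WriteThroughCache` is hypothetical.

```javascript
// Write-through: a write is acknowledged only after both the database
// and the cache have been updated.
class WriteThroughCache {
  constructor(db, cache) {
    this.db = db;       // stand-in for a database client
    this.cache = cache; // stand-in for a cache client
  }

  async set(key, value) {
    await this.db.set(key, value);    // 1. write to the source of truth first
    await this.cache.set(key, value); // 2. then update the cache in the same call
  }

  async get(key) {
    if (await this.cache.has(key)) {
      return this.cache.get(key); // served from cache
    }
    // Fallback for keys that were never written through this class
    const value = await this.db.get(key);
    if (value !== undefined) {
      await this.cache.set(key, value);
    }
    return value;
  }
}

// In-memory Maps play the role of both stores in this sketch.
const db = new Map();
const cache = new Map();
const store = new WriteThroughCache(db, cache);

store.set('user:1', { name: 'Ada' }).then(() => {
  console.log(db.get('user:1'));    // { name: 'Ada' }
  console.log(cache.get('user:1')); // { name: 'Ada' }
});
```

Writing to the database first means a failed database write never leaves a value in the cache that the source of truth does not have.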

Cache Stampede (Thundering Herd)

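When a popular cache key expires, hundreds of concurrent requests all miss the cache and hit the database simultaneously. A common solution is request coalescing (single-flight): one caller reloads the value while the others wait for its result.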

Go
import "sync"

// singleflight: only one goroutine fetches, others wait for the result
type SingleFlight struct {
    mu    sync.Mutex
    calls map[string]*call
}

type call struct {
    wg  sync.WaitGroup
    val interface{}
    err error
}

func (sf *SingleFlight) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
    sf.mu.Lock()
    if sf.calls == nil {
        sf.calls = make(map[string]*call)
    }

    if c, ok := sf.calls[key]; ok {
        sf.mu.Unlock()
        c.wg.Wait() // Wait for the in-flight request
        return c.val, c.err
    }

    c := &call{}
    c.wg.Add(1)
    sf.calls[key] = c
    sf.mu.Unlock()

    // Only ONE goroutine executes this
    c.val, c.err = fn()
    c.wg.Done()

    sf.mu.Lock()
    delete(sf.calls, key)
    sf.mu.Unlock()

    return c.val, c.err
}

// Usage: prevents cache stampede
// var sf SingleFlight
// val, err := sf.Do("user:123", func() (interface{}, error) {
//     return db.GetUser(123)  // runs once for all callers that overlap in time
// })
// For production use, golang.org/x/sync/singleflight offers a maintained implementation.
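
The same idea translates to Node.js by sharing one in-flight promise per key. This is a minimal sketch: `coalesce`, `inFlight`, and `loadUser` are illustrative names, and it only deduplicates callers inside a single process (coordinating several servers would need a shared lock, which is not shown).

```javascript
// Request coalescing: concurrent callers for the same key share one promise.
const inFlight = new Map();

function coalesce(key, loader) {
  if (inFlight.has(key)) {
    return inFlight.get(key); // join the request that is already running
  }
  const promise = Promise.resolve()
    .then(loader)
    .finally(() => inFlight.delete(key)); // allow a fresh load once this one settles
  inFlight.set(key, promise);
  return promise;
}

// Stand-in for a database call; counts how often it really runs.
let dbCalls = 0;
const loadUser = async () => {
  dbCalls++;
  return { id: 123 };
};

Promise.all([
  coalesce('user:123', loadUser),
  coalesce('user:123', loadUser),
  coalesce('user:123', loadUser),
]).then((results) => {
  console.log(results.length, dbCalls); // 3 1
});
```
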
Cache Invalidation

"There are only two hard things in Computer Science: cache invalidation and naming things." (Phil Karlton) Delete on write is the safer default. Avoid updating the cache on write: if two writes happen concurrently, their cache updates can land in the opposite order from their database commits, leaving a stale value in the cache until it expires or is overwritten. Use TTLs as a safety net. Consider event-driven invalidation with change data capture (CDC) to shrink the staleness window further.

11. Rate Limiting

Protect your services from being overwhelmed. Rate limiting controls how many requests a client can make in a given time window.

Token Bucket Algorithm

A bucket holds tokens. Each request consumes one token. Tokens are added at a fixed rate. If the bucket is empty, the request is rejected. Allows bursts up to the bucket capacity.

Go
import (
    "sync"
    "time"
)

type TokenBucket struct {
    mu         sync.Mutex
    tokens     float64
    maxTokens  float64
    refillRate float64 // tokens per second
    lastRefill time.Time
}

func NewTokenBucket(maxTokens, refillRate float64) *TokenBucket {
    return &TokenBucket{
        tokens:     maxTokens,
        maxTokens:  maxTokens,
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
}

func (tb *TokenBucket) Allow() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()

    // Refill tokens based on elapsed time
    now := time.Now()
    elapsed := now.Sub(tb.lastRefill).Seconds()
    tb.tokens += elapsed * tb.refillRate
    if tb.tokens > tb.maxTokens {
        tb.tokens = tb.maxTokens
    }
    tb.lastRefill = now

    // Try to consume a token
    if tb.tokens >= 1.0 {
        tb.tokens -= 1.0
        return true
    }
    return false
}

Sliding Window with Redis

For distributed rate limiting across multiple servers, use Redis sorted sets. Each request adds a timestamp, and you count entries in the window.

Node.js
const Redis = require('ioredis');
const redis = new Redis();

async function slidingWindowRateLimit(userId, maxRequests, windowSec) {
  const key = `ratelimit:${userId}`;
  const now = Date.now();
  const windowStart = now - windowSec * 1000;

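  // Queue the commands in a MULTI/EXEC transaction so they run atomically
  // (a plain pipeline only batches commands; it does not isolate them)
  const pipe = redis.multi();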

  // Remove entries outside the window
  pipe.zremrangebyscore(key, 0, windowStart);

  // Count entries in the window
  pipe.zcard(key);

  // Add current request (recorded even if it is rejected below,
  // so a client that keeps retrying stays over the limit)
  pipe.zadd(key, now, `${now}-${Math.random()}`);

  // Set TTL so the key auto-expires
  pipe.expire(key, windowSec);

  const results = await pipe.exec();
  const count = results[1][1]; // result of zcard

  if (count >= maxRequests) {
    return { allowed: false, remaining: 0 };
  }

  return { allowed: true, remaining: maxRequests - count - 1 };
}

// Usage: 100 requests per 60 seconds per user
// const result = await slidingWindowRateLimit('user:42', 100, 60);
Rate Limiting in Practice

Use token bucket for API rate limiting (allows bursts). Use sliding window for strict per-second limits. For distributed rate limiting, Redis is the standard choice. Consider using a Lua script in Redis to make the check-and-increment atomic. Libraries: Go has golang.org/x/time/rate, Node.js has rate-limiter-flexible.
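
The Lua suggestion can be sketched as follows. The script and the `atomicSlidingWindow` wrapper are illustrative, not a vetted production limiter: the function expects an ioredis client (or anything with a compatible `eval(script, numKeys, ...keysAndArgs)` method) to be passed in, and the key prefix mirrors the earlier sliding-window example. Unlike that example, this version records a request only when it is allowed.

```javascript
// Atomic sliding-window check: Redis runs the whole script as one unit,
// so no other client can interleave between the count and the insert.
const SLIDING_WINDOW_LUA = `
local key       = KEYS[1]
local now       = tonumber(ARGV[1])
local window_ms = tonumber(ARGV[2])
local max_reqs  = tonumber(ARGV[3])
local member    = ARGV[4]

redis.call('ZREMRANGEBYSCORE', key, 0, now - window_ms)
local count = redis.call('ZCARD', key)
if count >= max_reqs then
  return {0, 0}
end
redis.call('ZADD', key, now, member)
redis.call('PEXPIRE', key, window_ms)
return {1, max_reqs - count - 1}
`;

async function atomicSlidingWindow(redis, userId, maxRequests, windowSec) {
  const now = Date.now();
  const member = `${now}-${Math.random()}`; // unique member per request
  const [allowed, remaining] = await redis.eval(
    SLIDING_WINDOW_LUA,
    1,                     // number of KEYS
    `ratelimit:${userId}`, // KEYS[1]
    now,                   // ARGV[1]
    windowSec * 1000,      // ARGV[2]
    maxRequests,           // ARGV[3]
    member                 // ARGV[4]
  );
  return { allowed: allowed === 1, remaining };
}

// Usage (requires a running Redis and an ioredis client):
// const result = await atomicSlidingWindow(redis, 'user:42', 100, 60);
```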

12. Idempotency

An operation is idempotent if performing it multiple times has the same effect as performing it once. In distributed systems, retries are inevitable -- your APIs MUST be idempotent.

Why Idempotency Matters
Client sends request → Server processes it → Response is lost
Client retries → Server processes it AGAIN → Double charge!

With idempotency key: Server checks "I already processed this" → Returns cached response
Go
// Idempotency middleware using Redis
func IdempotencyMiddleware(redisClient *redis.Client) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            idempotencyKey := r.Header.Get("Idempotency-Key")
            if idempotencyKey == "" {
                next.ServeHTTP(w, r)
                return
            }

            cacheKey := "idempotency:" + idempotencyKey

            // Check if we already processed this request
            cached, err := redisClient.Get(r.Context(), cacheKey).Result()
            if err == nil {
                // Already processed -- return cached response
                w.Header().Set("Content-Type", "application/json")
                w.Write([]byte(cached))
                return
            }

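            // Try to acquire a lock (prevent concurrent processing of same key)
            lockKey := cacheKey + ":lock"
            ok, err := redisClient.SetNX(r.Context(), lockKey, "1", 30*time.Second).Result()
            if err != nil {
                // Without the lock we cannot rule out a duplicate, so fail closed
                http.Error(w, "Idempotency store unavailable", http.StatusServiceUnavailable)
                return
            }
            if !ok {
                http.Error(w, "Request already in progress", http.StatusConflict)
                return
            }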
            defer redisClient.Del(r.Context(), lockKey)

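            // Capture the response body while still sending it to the client
            rec := &responseRecorder{ResponseWriter: w, body: &bytes.Buffer{}}
            next.ServeHTTP(rec, r)

            // Cache the response body for 24 hours (status code and headers are not preserved)
            redisClient.Set(r.Context(), cacheKey, rec.body.String(), 24*time.Hour)
        })
    }
}

// responseRecorder copies everything written to the client into a buffer.
type responseRecorder struct {
    http.ResponseWriter
    body *bytes.Buffer
}

func (rec *responseRecorder) Write(p []byte) (int, error) {
    rec.body.Write(p)
    return rec.ResponseWriter.Write(p)
}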
Node.js
// Idempotent payment processing
async function processPayment(idempotencyKey, amount, userId) {
  // 1. Check if this payment was already processed
  const existing = await db.query(
    'SELECT result FROM idempotency_keys WHERE key = $1',
    [idempotencyKey]
  );

  if (existing.rows.length > 0) {
    return JSON.parse(existing.rows[0].result); // Return cached result
  }

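  // 2. Process the payment (userId is assumed to be a Stripe customer ID)
  const charge = await stripe.charges.create(
    {
      amount,
      currency: 'usd',
      customer: userId,
    },
    { idempotencyKey } // Request option: Stripe deduplicates retries that reuse the key
  );

  // 3. Store result with the idempotency key
  //    (ON CONFLICT assumes a unique constraint on key; a concurrent retry may have stored it already)
  await db.query(
    `INSERT INTO idempotency_keys (key, result, created_at)
     VALUES ($1, $2, NOW())
     ON CONFLICT (key) DO NOTHING`,
    [idempotencyKey, JSON.stringify(charge)]
  );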

  return charge;
}
Exactly-Once is a Lie (Sort Of)

True exactly-once delivery is impossible over an unreliable network: as the Two Generals Problem illustrates, a sender can never be certain that its message arrived. What we actually achieve is effectively-once: at-least-once delivery + idempotent processing = appears exactly-once. Kafka achieves "exactly-once semantics" through idempotent producers + transactions -- messages can still be sent more than once under the hood, but duplicates are detected and discarded.
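
The "at-least-once + idempotent processing" equation can be illustrated with a consumer that remembers which message IDs it has already applied. This is a sketch with assumed names (`IdempotentConsumer`, the `id` and `amount` fields); a real system would keep the processed-ID set in durable storage and update it in the same transaction as the side effect.

```javascript
// Effectively-once = at-least-once delivery + idempotent processing.
class IdempotentConsumer {
  constructor() {
    this.processedIds = new Set(); // in-memory stand-in for a durable dedup table
    this.balance = 0;
  }

  handle(message) {
    if (this.processedIds.has(message.id)) {
      return 'duplicate-ignored'; // redelivery: skip the side effect
    }
    this.balance += message.amount; // the side effect that must not repeat
    this.processedIds.add(message.id);
    return 'applied';
  }
}

const consumer = new IdempotentConsumer();

// The broker redelivers m1 because the first acknowledgement was lost.
const deliveries = [
  { id: 'm1', amount: 50 },
  { id: 'm1', amount: 50 },
  { id: 'm2', amount: 20 },
];
const outcomes = deliveries.map((m) => consumer.handle(m));

console.log(outcomes);         // [ 'applied', 'duplicate-ignored', 'applied' ]
console.log(consumer.balance); // 70
```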

13. Observability

You cannot fix what you cannot see. In a distributed system, a single user request may touch 10+ services. You need distributed tracing, metrics, and structured logging to understand what happened.

Distributed Tracing with OpenTelemetry

ASCII
  A single user request as a trace:

  Trace ID: abc-123
  |
  +-- Span: API Gateway (60ms)
  |   |
  |   +-- Span: Auth Service (3ms)
  |   |
  |   +-- Span: Order Service (45ms)
  |       |
  |       +-- Span: DB Query (8ms)
  |       |
  |       +-- Span: Payment Service (30ms)
  |           |
  |           +-- Span: Stripe API call (25ms)
  |
  Total: 60ms

  Each span has: trace_id, span_id, parent_span_id, start_time,
  duration, service_name, attributes, status
Go
import (
    "context"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("order-service")

func CreateOrder(ctx context.Context, req OrderRequest) (*Order, error) {
    ctx, span := tracer.Start(ctx, "CreateOrder",
        trace.WithAttributes(
            attribute.String("user.id", req.UserID),
            attribute.Float64("order.total", req.Total),
        ),
    )
    defer span.End()

    // This span is a child of the parent span from the API gateway
    // The context carries the trace ID and parent span ID

    // Call payment service -- the context propagates the trace
    charge, err := paymentClient.Charge(ctx, req.Total)
    if err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
        return nil, err
    }

    span.AddEvent("payment_processed",
        trace.WithAttributes(attribute.String("charge.id", charge.ID)),
    )

    return &Order{ID: generateID(), ChargeID: charge.ID}, nil
}
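
Between services, the trace ID and parent span ID travel in request headers. The sketch below shows the idea using the W3C Trace Context `traceparent` header format (`version-traceid-spanid-flags`). It is a hand-rolled illustration with hypothetical helper names (`startSpan`, `toTraceparent`, `fromTraceparent`); in practice an OpenTelemetry propagator does this for you.

```javascript
const crypto = require('crypto');

const randomHex = (bytes) => crypto.randomBytes(bytes).toString('hex');

// Start a span. With no parent this begins a new trace; otherwise the
// trace ID is inherited and the parent's span ID is recorded.
function startSpan(name, parent) {
  return {
    name,
    traceId: parent ? parent.traceId : randomHex(16), // 32 hex characters
    spanId: randomHex(8),                              // 16 hex characters
    parentSpanId: parent ? parent.spanId : null,
  };
}

// Serialize the current span into a traceparent header value.
function toTraceparent(span) {
  return `00-${span.traceId}-${span.spanId}-01`; // version 00, flags 01 = sampled
}

// Parse the header on the receiving service.
function fromTraceparent(header) {
  const match = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header || '');
  if (!match) return null; // missing or malformed: the caller starts a new trace
  return { traceId: match[1], spanId: match[2] };
}

// Gateway side: create the root span and attach the header to the outgoing call.
const gatewaySpan = startSpan('API Gateway');
const header = toTraceparent(gatewaySpan);

// Order service side: continue the same trace as a child span.
const orderSpan = startSpan('CreateOrder', fromTraceparent(header));

console.log(orderSpan.traceId === gatewaySpan.traceId);     // true
console.log(orderSpan.parentSpanId === gatewaySpan.spanId); // true
```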

Circuit Breakers

When a downstream service is failing, stop sending it traffic. Let it recover. The circuit breaker pattern has three states: Closed (normal), Open (failing, reject requests), Half-Open (test if service recovered).

ASCII
                     success
            +------------------------+
            v                        |
         +--------+            +-----------+
         | CLOSED |            | HALF-OPEN |
         +--------+            +-----------+
            |                    ^        |
            | failures           | reset  | failure
            | exceed             | timer  |
            | threshold          |        |
            v                    |        v
         +---------------------------------+
         |              OPEN               |
         |      (reject all requests)      |
         +---------------------------------+
Node.js
class CircuitBreaker {
  constructor(options = {}) {
    this.failureThreshold = options.failureThreshold || 5;
    this.resetTimeout = options.resetTimeout || 30000; // 30s
    this.state = 'CLOSED';
    this.failures = 0;
    this.lastFailure = null;
  }

  async execute(fn) {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailure > this.resetTimeout) {
        this.state = 'HALF_OPEN';
      } else {
        throw new Error('Circuit breaker is OPEN');
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (err) {
      this.onFailure();
      throw err;
    }
  }

  onSuccess() {
    this.failures = 0;
    this.state = 'CLOSED';
  }

  onFailure() {
    this.failures++;
    this.lastFailure = Date.now();
    if (this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
    }
  }
}

// Usage
const breaker = new CircuitBreaker({ failureThreshold: 3, resetTimeout: 10000 });

async function callPaymentService(data) {
  return breaker.execute(async () => {
    const res = await fetch('https://payment.internal/charge', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' }, // declare the JSON body
      body: JSON.stringify(data),
    });
    if (!res.ok) throw new Error(`Payment failed: ${res.status}`);
    return res.json();
  });
}
The Three Pillars of Observability

Traces: Follow a request through services (Jaeger, Zipkin, Datadog APM). Metrics: Aggregated numbers over time -- request rate, error rate, latency percentiles (Prometheus + Grafana). Logs: Structured event records with correlation IDs (ELK stack, Loki). OpenTelemetry unifies all three with a single SDK.
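
As a small illustration of "structured logs with correlation IDs", the sketch below emits one JSON object per line and attaches the trace ID so a log line can be joined with its trace. The `logEvent` helper and the field names (`trace_id`, `service`, `order_id`) are assumptions made for this example, not a standard schema.

```javascript
// One JSON object per line; every field can be indexed and queried.
function logEvent(level, message, context = {}, now = new Date()) {
  const entry = {
    timestamp: now.toISOString(),
    level,
    message,
    ...context, // correlation fields such as trace_id, service, order_id
  };
  const line = JSON.stringify(entry);
  console.log(line);
  return line;
}

logEvent('error', 'payment declined', {
  service: 'order-service',
  trace_id: 'abc-123', // same ID as the trace, so logs and spans can be joined
  order_id: 'o-42',
});
```

Searching the log store for `trace_id = "abc-123"` then returns every line written while handling that request, across all services that log the same field.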

14. Practice & Resources

Distributed systems are best learned by reading papers, implementing protocols, and building things that break.

MIT 6.824: Distributed Systems

The Gold Standard Course

MIT 6.824 (now 6.5840) is the best distributed systems course in existence. The labs (exact numbering varies by year) have you implement:

Lab 1: MapReduce -- distributed batch processing framework

Lab 2: Raft -- full consensus algorithm with leader election, log replication, persistence

Lab 3: Fault-tolerant key/value service -- built on top of your Raft implementation

Lab 4: Sharded key/value service -- with dynamic shard migration

All labs are in Go. The course is freely available at pdos.csail.mit.edu/6.824. Do the labs. They will take weeks and teach you more than any book.

Designing Data-Intensive Applications (DDIA)

The Bible

Martin Kleppmann's DDIA is the single best book on distributed systems for practitioners. It covers replication, partitioning, transactions, consistency, batch/stream processing, and more. Read it cover to cover. Then read it again. Every senior engineer should have read this book.

Papers to Read

Essential Reading List

Foundations:

- Lamport, "Time, Clocks, and the Ordering of Events in a Distributed System" (1978) -- the paper that started it all

- Fischer, Lynch, Paterson, "Impossibility of Distributed Consensus with One Faulty Process" (FLP, 1985) -- proves that no deterministic protocol can guarantee consensus in an asynchronous system if even one process may crash

- Brewer, "CAP Twelve Years Later: How the 'Rules' Have Changed" (2012) -- Brewer's own clarification of CAP

Consensus:

- Ongaro & Ousterhout, "In Search of an Understandable Consensus Algorithm" (Raft, 2014)

- Lamport, "Paxos Made Simple" (2001) -- Lamport's attempt to simplify his own algorithm

Industry Systems:

- DeCandia et al., "Dynamo: Amazon's Highly Available Key-value Store" (2007) -- eventual consistency, vector clocks, consistent hashing

- Corbett et al., "Spanner: Google's Globally-Distributed Database" (2012) -- TrueTime, external consistency

- Kreps, "The Log: What every software engineer should know about real-time data's unifying abstraction" (2013) -- the foundation of Kafka's design

- Shapiro et al., "Conflict-free Replicated Data Types" (2011) -- CRDTs formalized

Build These Projects

Hands-On Practice

1. Distributed KV Store: Build a key-value store with Raft consensus. Start with MIT 6.824 labs.

2. URL Shortener at Scale: Hash partitioning, read replicas, caching layer, rate limiting.

3. Chat System: WebSockets across multiple servers. Use Redis Pub/Sub for cross-server message delivery. Implement causal ordering.

4. Distributed Task Queue: Like a mini Celery/Sidekiq. At-least-once delivery, idempotent workers, dead letter queue.

5. Fly.io Gossip Glomers: A set of distributed systems challenges at fly.io/dist-sys. Implement broadcast, unique ID generation, counters, and Kafka-style logs. Uses Maelstrom for testing.

The Hard Truth

Distributed systems are genuinely difficult. Bugs are non-deterministic, latent, and often only appear under load or during failures. You cannot unit test your way to correctness -- you need fault-injection testing (Jepsen), chaos engineering (Chaos Monkey), and formal specification with model checking (TLA+). Start simple. Add distribution only when you must. A single PostgreSQL instance handles more load than most startups will ever need.