The Disruptor Pattern: Multi-Stage Event Processing Pipelines

February 12, 202638 min readNew

Implement LMAX Disruptor-style event processing with sequence barriers, multi-stage pipelines, and batch processing for ultra-low latency systems.

The Disruptor Pattern: Multi-Stage Event Processing Pipelines
React to this article

Lock-Free in Java: The Disruptor Pattern for Event Processing Pipelines

Part 1: The 3AM Wake-Up Call

The Disruptor matters because blocking queues collapse once a multi-stage pipeline becomes hot enough. Every queue hop adds lock contention, allocation pressure, and latency variance right where a low-latency system can least afford it.

In the baseline design, each stage is individually fast, but the queues between them dominate the end-to-end cost:

Stage 1: Parse -> Queue 1 -> Stage 2: Enrich -> Queue 2 -> Stage 3: Aggregate
          |                              |                              |
      4 threads                      4 threads                      2 threads
          |                              |                              |
     1-2 us/event                   1-5 us/event                   1-3 us/event
          |                              |                              |
      BLOCKING                       BLOCKING                       BLOCKING

The Disruptor pattern removes those intermediate blocking hand-offs. Instead of copying events from queue to queue, it lets stages coordinate through sequences over a preallocated ring buffer. That single design move cuts contention, eliminates per-event node allocation, and makes cache behavior much more predictable.

This article focuses on the architectural core: why blocking queues fail in multi-stage pipelines, how the Disruptor coordinates readers and writers, and which trade-offs appear once you move from a simple queue to a sequence-driven event pipeline.


Part 2: Understanding the Problem - Why Blocking Queues Fail

Before we dive into the solution, let's thoroughly understand why blocking queues become a problem in high-throughput event pipelines.

The Traditional Multi-Stage Pipeline

A typical event processing pipeline looks like this:

Each stage has multiple worker threads processing events. Between stages, a LinkedBlockingQueue buffers events. Simple, well-understood, and it works great - until it doesn't.

The Hidden Costs of LinkedBlockingQueue

Let's dissect what happens when you offer() to a LinkedBlockingQueue:

public class LinkedBlockingQueue<E> {
    // Simplified from actual JDK source
 
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();
 
    public boolean offer(E e) {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();  // <- CONTENTION POINT 1
        try {
            // Create new Node - ALLOCATION 1
            Node<E> node = new Node<>(e);
            enqueue(node);
            // Potentially signal waiting consumers
        } finally {
            putLock.unlock();
        }
        // Potentially signal consumer - CONTENTION POINT 2
        signalNotEmpty();
        return true;
    }
}

Every offer() operation:

  1. Acquires a lock - Under contention, this means context switches, thread parking, and scheduling overhead
  2. Allocates a Node object - 24-32 bytes per event, depending on compressed OOPs
  3. May signal waiting consumers - More lock contention and potential context switches

With four producer threads offering to the same queue at ~100k events/sec each, you're looking at 400k lock acquisitions/sec (each with potential context switch overhead), 400k object allocations/sec (roughly 10 MB/sec of garbage), and constant cache line bouncing as the lock state changes ownership continuously.

Measuring the Damage

I ran a quick benchmark comparing our blocking pipeline to theoretical best case:

Benchmark                           Mode  Cnt    Score    Error  Units
BlockingPipeline.throughput         thrpt  10    47823 +/-  2341  ops/s
BlockingPipeline.latency:p50        sample      2340           ns
BlockingPipeline.latency:p99        sample      8934           ns
BlockingPipeline.latency:p99.9      sample     45123           ns

IdealPipeline.throughput (computed)          ~500000           ops/s
IdealPipeline.latency (computed)              ~100            ns

We're achieving less than 10% of theoretical throughput. The p99.9 latency is 450x worse than ideal. There's massive room for improvement.

The Three Costs We Must Eliminate

After profiling, I identified three major cost centers:

  1. Lock Contention: Threads spending 60-70% of time waiting for locks
  2. Allocation Pressure: 15 MB/sec of queue nodes triggering frequent GC
  3. Cache Inefficiency: Random access patterns causing constant cache misses

Any solution must address all three. Fixing one while ignoring the others gives marginal gains. We need a fundamentally different approach.


Part 3: Enter the Disruptor Pattern

The Disruptor pattern, pioneered by LMAX Exchange for their foreign exchange trading platform, provides exactly what we need: a lock-free, allocation-free, cache-friendly mechanism for passing events between stages.

The Core Insight

The key insight of the Disruptor is beautifully simple: instead of multiple queues between stages, use a single, pre-allocated ring buffer shared by all stages. Each stage processes events at its own pace, tracked by sequence numbers.

In this diagram, green (0-1) marks slots fully processed by all stages, yellow (2-3) shows slots processed by Stage 1 and currently being processed by Stage 2, orange (4-5) indicates slots processed by Stage 1 but waiting for Stage 2, and red (6-7) represents slots being published that are not yet available for Stage 1.

Zero Allocation After Initialization

The ring buffer is pre-allocated at startup. Event objects are created once and reused forever. When a publisher wants to send an event, it:

  1. Claims a slot (atomic increment of sequence)
  2. Populates the pre-existing event object in that slot
  3. Publishes (makes the slot available to consumers)

No new keyword in the hot path. No garbage. No GC pauses from your event pipeline.

Natural Backpressure

The publisher tracks the slowest consumer. If the slowest consumer is N slots behind, the publisher has N slots of buffer space. If it catches up to the slowest consumer, it must wait - natural backpressure without explicit flow control.

This is fundamentally different from unbounded queues that can grow until you OOM, or bounded blocking queues where producers block and cause unpredictable latency spikes.

Cache-Friendly Sequential Access

Events are processed in sequence order. Stage 1 processes slot 0, then slot 1, then slot 2. Stage 2 follows the same pattern, perhaps a few slots behind. This sequential access pattern is perfect for CPU prefetchers - they can load the next cache line before you need it.

Contrast this with linked lists where each node can be anywhere in memory, causing cache misses on every traversal.


Part 4: Disruptor Architecture Deep Dive

Let's break down the components of a Disruptor-style pipeline:

Component 1: The Ring Buffer

public class RingBuffer<E> {
 
    private final E[] entries;
    private final int bufferSize;
    private final int indexMask;
 
    // Padded sequence to prevent false sharing
    private final PaddedAtomicLong cursor = new PaddedAtomicLong(-1L);
 
    @SuppressWarnings("unchecked")
    public RingBuffer(EventFactory<E> factory, int bufferSize) {
        // Must be power of 2 for fast modulo via bitwise AND
        this.bufferSize = bufferSize;
        this.indexMask = bufferSize - 1;
        this.entries = (E[]) new Object[bufferSize];
 
        // Pre-allocate all event objects
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.newInstance();
        }
    }
 
    public E get(long sequence) {
        return entries[(int) (sequence & indexMask)];
    }
 
    public long getCursor() {
        return cursor.get();
    }
}

The ring buffer owns all event objects. They're created at initialization and never garbage collected (as long as the ring buffer lives). The indexMask trick (sequence & (size - 1)) gives us fast modulo for power-of-2 sizes.

Component 2: Sequences and Sequence Barriers

Each processing stage tracks its progress with a Sequence:

public class Sequence {
 
    // Padding to prevent false sharing
    private long p1, p2, p3, p4, p5, p6, p7;
 
    private volatile long value;
 
    private long p8, p9, p10, p11, p12, p13, p14;
 
    public Sequence(long initialValue) {
        this.value = initialValue;
    }
 
    public long get() {
        return value;
    }
 
    public void set(long value) {
        this.value = value;
    }
 
    public boolean compareAndSet(long expected, long update) {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expected, update);
    }
}

The massive padding ensures each sequence occupies its own cache line. Without this, updating one sequence would invalidate others in other cores' caches - false sharing is the silent performance killer.

A SequenceBarrier lets a stage wait for upstream stages to make progress:

public class SequenceBarrier {
 
    private final Sequence cursorSequence;
    private final Sequence[] dependentSequences;
    private final WaitStrategy waitStrategy;
 
    public long waitFor(long sequence) throws InterruptedException {
        long availableSequence;
 
        // Wait until cursor reaches our target
        while ((availableSequence = cursorSequence.get()) < sequence) {
            waitStrategy.wait(sequence, cursorSequence);
        }
 
        // Now wait for all dependent sequences (upstream stages)
        if (dependentSequences.length > 0) {
            while ((availableSequence = getMinimumSequence(dependentSequences)) < sequence) {
                waitStrategy.wait(sequence, dependentSequences[0]);
            }
        }
 
        return availableSequence;
    }
 
    private long getMinimumSequence(Sequence[] sequences) {
        long minimum = Long.MAX_VALUE;
        for (Sequence sequence : sequences) {
            minimum = Math.min(minimum, sequence.get());
        }
        return minimum;
    }
}

Component 3: Event Processors

Each stage runs one or more EventProcessor instances:

public abstract class EventProcessor implements Runnable {
 
    private final RingBuffer<?> ringBuffer;
    private final SequenceBarrier barrier;
    private final Sequence sequence = new Sequence(-1L);
 
    @Override
    public void run() {
        long nextSequence = sequence.get() + 1L;
 
        while (running) {
            try {
                // Wait for events to be available
                long availableSequence = barrier.waitFor(nextSequence);
 
                // Process all available events
                while (nextSequence <= availableSequence) {
                    Event event = ringBuffer.get(nextSequence);
                    onEvent(event, nextSequence);
                    nextSequence++;
                }
 
                // Update our sequence (publish our progress)
                sequence.set(availableSequence);
 
            } catch (InterruptedException e) {
                // Handle shutdown
            }
        }
    }
 
    protected abstract void onEvent(Event event, long sequence);
}

Component 4: Wait Strategies

The wait strategy determines how processors wait for events when the ring buffer is empty:

public interface WaitStrategy {
    long waitFor(long sequence, Sequence cursorSequence)
        throws InterruptedException;
}
 
// Maximum performance, maximum CPU usage
public class BusySpinWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(long sequence, Sequence cursor) {
        while (cursor.get() < sequence) {
            Thread.onSpinWait();  // CPU hint for spin-waiting
        }
        return cursor.get();
    }
}
 
// Lower CPU usage, slightly higher latency
public class YieldingWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(long sequence, Sequence cursor) {
        int counter = 100;
        while (cursor.get() < sequence) {
            if (counter > 0) {
                counter--;
                Thread.onSpinWait();
            } else {
                Thread.yield();
            }
        }
        return cursor.get();
    }
}
 
// Lowest CPU usage, highest latency
public class BlockingWaitStrategy implements WaitStrategy {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
 
    @Override
    public long waitFor(long sequence, Sequence cursor)
        throws InterruptedException {
        if (cursor.get() < sequence) {
            lock.lock();
            try {
                while (cursor.get() < sequence) {
                    condition.await();
                }
            } finally {
                lock.unlock();
            }
        }
        return cursor.get();
    }
}

For ultra-low latency, use BusySpinWaitStrategy. Your CPU will run hot, but latency will be minimal. For background processing where some latency is acceptable, YieldingWaitStrategy or BlockingWaitStrategy save power.

Architecture Diagram


Part 5: Implementation - Building a Disruptor-Style Pipeline

Let's build a complete implementation from scratch. I'll show you both the naive blocking approach and the Disruptor-style approach so you can see the difference.

The Event Object

First, we need a reusable event object:

/**
 * Pre-allocated event object for the ring buffer.
 * Mutable to avoid allocation in the hot path.
 */
public class PipelineEvent {
 
    // Event data - reset and reused for each event
    private long timestamp;
    private int eventType;
    private long sourceId;
    private byte[] payload;
 
    // Processing state - tracks which stages have processed this event
    private volatile int processedStages;
 
    // Pre-allocate payload buffer
    public PipelineEvent() {
        this.payload = new byte[1024];  // Max payload size
    }
 
    public void reset() {
        this.timestamp = 0;
        this.eventType = 0;
        this.sourceId = 0;
        this.processedStages = 0;
        // Note: we don't reallocate payload, just overwrite
    }
 
    // Getters and setters omitted for brevity
 
    public void setData(long timestamp, int eventType, long sourceId,
                       byte[] data, int offset, int length) {
        this.timestamp = timestamp;
        this.eventType = eventType;
        this.sourceId = sourceId;
        System.arraycopy(data, offset, this.payload, 0, length);
    }
}

The Naive Blocking Implementation

Here's what we're replacing - a traditional blocking queue pipeline:

View source

/**
 * Traditional blocking pipeline using LinkedBlockingQueue.
 * Simple but becomes a bottleneck under high load.
 */
public class BlockingEventPipeline {
 
    private final BlockingQueue<Event> stage1Queue;
    private final BlockingQueue<Event> stage2Queue;
    private final BlockingQueue<Event> stage3Queue;
 
    private final ExecutorService stage1Executor;
    private final ExecutorService stage2Executor;
    private final ExecutorService stage3Executor;
 
    public BlockingEventPipeline(int queueCapacity) {
        // Each stage has its own queue
        this.stage1Queue = new LinkedBlockingQueue<>(queueCapacity);
        this.stage2Queue = new LinkedBlockingQueue<>(queueCapacity);
        this.stage3Queue = new LinkedBlockingQueue<>(queueCapacity);
 
        // Worker threads for each stage
        this.stage1Executor = Executors.newFixedThreadPool(4);
        this.stage2Executor = Executors.newFixedThreadPool(4);
        this.stage3Executor = Executors.newFixedThreadPool(2);
    }
 
    public void start() {
        // Stage 1 workers: Parse events
        for (int i = 0; i < 4; i++) {
            stage1Executor.submit(() -> {
                while (!Thread.interrupted()) {
                    try {
                        Event event = stage1Queue.take();  // BLOCKS!
                        parseEvent(event);
                        stage2Queue.put(event);            // MAY BLOCK!
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            });
        }
 
        // Stage 2 workers: Enrich events
        for (int i = 0; i < 4; i++) {
            stage2Executor.submit(() -> {
                while (!Thread.interrupted()) {
                    try {
                        Event event = stage2Queue.take();  // BLOCKS!
                        enrichEvent(event);
                        stage3Queue.put(event);            // MAY BLOCK!
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            });
        }
 
        // Stage 3 workers: Aggregate events
        for (int i = 0; i < 2; i++) {
            stage3Executor.submit(() -> {
                while (!Thread.interrupted()) {
                    try {
                        Event event = stage3Queue.take();  // BLOCKS!
                        aggregateEvent(event);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            });
        }
    }
 
    public void submit(Event event) throws InterruptedException {
        stage1Queue.put(event);  // Entry point - may block if queue full
    }
 
    private void parseEvent(Event event) {
        // Simulate parsing work
        event.setParsed(true);
    }
 
    private void enrichEvent(Event event) {
        // Simulate enrichment work
        event.setEnriched(true);
    }
 
    private void aggregateEvent(Event event) {
        // Simulate aggregation work
        event.setAggregated(true);
    }
}

The problems with this approach:

  1. Three separate queues = three allocation sites, three contention points
  2. Blocking take/put = threads park and unpark, causing context switches
  3. New Event for each submission = constant allocation pressure
  4. No coordination between stages = each stage is independent

The Disruptor-Style Implementation

Now let's build the Disruptor-style version:

View source

/**
 * Disruptor-style event pipeline.
 * Single pre-allocated ring buffer shared by all stages.
 * Lock-free, allocation-free in the hot path.
 */
public class DisruptorStylePipeline {
 
    // Ring buffer configuration
    private static final int BUFFER_SIZE = 1024;  // Must be power of 2
    private static final int INDEX_MASK = BUFFER_SIZE - 1;
 
    // Pre-allocated event buffer
    private final PipelineEvent[] ringBuffer;
 
    // Sequence tracking with cache line padding
    private final PaddedAtomicLong publisherSequence;
    private final PaddedAtomicLong stage1Sequence;
    private final PaddedAtomicLong stage2Sequence;
    private final PaddedAtomicLong stage3Sequence;
 
    // Per-slot availability flags for multi-producer support
    private final int[] availableFlags;
 
    // Worker threads
    private final Thread[] stage1Workers;
    private final Thread[] stage2Workers;
    private final Thread[] stage3Workers;
 
    private volatile boolean running = true;
 
    public DisruptorStylePipeline() {
        // Pre-allocate everything
        this.ringBuffer = new PipelineEvent[BUFFER_SIZE];
        for (int i = 0; i < BUFFER_SIZE; i++) {
            this.ringBuffer[i] = new PipelineEvent();
        }
 
        // Initialize sequences to -1 (nothing published yet)
        this.publisherSequence = new PaddedAtomicLong(-1L);
        this.stage1Sequence = new PaddedAtomicLong(-1L);
        this.stage2Sequence = new PaddedAtomicLong(-1L);
        this.stage3Sequence = new PaddedAtomicLong(-1L);
 
        // Available flags for multi-producer coordination
        this.availableFlags = new int[BUFFER_SIZE];
        Arrays.fill(availableFlags, -1);
 
        // Create worker threads (but don't start yet)
        this.stage1Workers = new Thread[4];
        this.stage2Workers = new Thread[4];
        this.stage3Workers = new Thread[2];
    }
 
    public void start() {
        // Stage 1: Wait on publisher, process, update stage1Sequence
        for (int i = 0; i < stage1Workers.length; i++) {
            final int workerId = i;
            stage1Workers[i] = new Thread(() -> runStage1Worker(workerId));
            stage1Workers[i].setName("Stage1-Worker-" + i);
            stage1Workers[i].start();
        }
 
        // Stage 2: Wait on stage1, process, update stage2Sequence
        for (int i = 0; i < stage2Workers.length; i++) {
            final int workerId = i;
            stage2Workers[i] = new Thread(() -> runStage2Worker(workerId));
            stage2Workers[i].setName("Stage2-Worker-" + i);
            stage2Workers[i].start();
        }
 
        // Stage 3: Wait on stage2, process, update stage3Sequence
        for (int i = 0; i < stage3Workers.length; i++) {
            final int workerId = i;
            stage3Workers[i] = new Thread(() -> runStage3Worker(workerId));
            stage3Workers[i].setName("Stage3-Worker-" + i);
            stage3Workers[i].start();
        }
    }
 
    /**
     * Publish an event to the pipeline.
     * Returns the sequence number for tracking.
     */
    public long publish(long timestamp, int eventType, long sourceId,
                       byte[] data, int offset, int length) {
        // Step 1: Claim a slot (atomic increment)
        long sequence = claimNext();
 
        // Step 2: Wait for slot to be available (slowest consumer caught up)
        waitForSlotAvailable(sequence);
 
        // Step 3: Write event data to the pre-allocated slot
        int index = (int) (sequence & INDEX_MASK);
        PipelineEvent event = ringBuffer[index];
        event.reset();
        event.setData(timestamp, eventType, sourceId, data, offset, length);
 
        // Step 4: Publish (make visible to consumers)
        publish(sequence);
 
        return sequence;
    }
 
    private long claimNext() {
        // Multi-producer safe: atomic increment
        return PUBLISHER_SEQUENCE.getAndIncrement(this) + 1;
    }
 
    private void waitForSlotAvailable(long sequence) {
        // Wait until the slowest consumer has processed this slot
        // in the previous cycle
        long wrapPoint = sequence - BUFFER_SIZE;
 
        while (stage3Sequence.get() < wrapPoint) {
            Thread.onSpinWait();  // Busy spin for lowest latency
        }
    }
 
    private void publish(long sequence) {
        // Mark this slot as available for consumers
        int index = (int) (sequence & INDEX_MASK);
        AVAILABLE_FLAGS.setRelease(availableFlags, index, (int) (sequence >>> 32));
    }
 
    private boolean isAvailable(long sequence) {
        int index = (int) (sequence & INDEX_MASK);
        int flag = (int) AVAILABLE_FLAGS.getAcquire(availableFlags, index);
        return flag == (int) (sequence >>> 32);
    }
 
    // Worker implementations...
}

Stage Worker Implementation

Each stage worker follows the same pattern:

private void runStage1Worker(int workerId) {
    // Each worker tracks its own position
    long nextSequence = workerId;  // Workers interleave slots
    int numWorkers = stage1Workers.length;
 
    while (running) {
        // Wait for this sequence to be published
        while (!isAvailable(nextSequence)) {
            if (!running) return;
            Thread.onSpinWait();
        }
 
        // Process the event
        int index = (int) (nextSequence & INDEX_MASK);
        PipelineEvent event = ringBuffer[index];
        processStage1(event);
 
        // Update stage1 sequence (coordinate with other stage1 workers)
        updateStage1Sequence(nextSequence);
 
        // Move to next slot for this worker
        nextSequence += numWorkers;
    }
}
 
private void processStage1(PipelineEvent event) {
    // Parse the event
    // In real code, this would decode the payload, validate fields, etc.
    event.markStage1Complete();
}
 
private void updateStage1Sequence(long completedSequence) {
    // For single-threaded stage, simple set is enough
    // For multi-threaded stage, we need coordination
    long expected;
    do {
        expected = stage1Sequence.get();
        if (completedSequence <= expected) {
            return;  // Already updated by another worker
        }
    } while (!stage1Sequence.compareAndSet(expected, completedSequence));
}

The Complete Pipeline Visualization


Part 6: Memory Layout and Cache Optimization

The Disruptor pattern's performance comes largely from careful attention to memory layout. Now look at the details.

False Sharing Prevention

Modern CPUs load memory in cache lines, typically 64 bytes. If two variables share a cache line and are accessed by different cores, every write to one invalidates the other in all cores' caches - even if they're logically independent.

Our sequence variables are classic false sharing candidates:

// BAD: All sequences on same cache line
private volatile long publisherSequence;
private volatile long stage1Sequence;
private volatile long stage2Sequence;
private volatile long stage3Sequence;
// These fit in one 64-byte cache line!

Every time the publisher updates publisherSequence, it invalidates stage1Sequence in Stage 1's cache. Stage 1 must re-fetch it from main memory - a 40-100ns penalty on every iteration.

Solution: Pad each sequence to its own cache line:

/**
 * Padded atomic long that occupies exactly one cache line.
 * Prevents false sharing between sequences.
 */
public class PaddedAtomicLong {
 
    // 7 longs = 56 bytes of padding before value
    private long p1, p2, p3, p4, p5, p6, p7;
 
    private volatile long value;
 
    // 7 longs = 56 bytes of padding after value
    private long p8, p9, p10, p11, p12, p13, p14;
 
    public PaddedAtomicLong(long initialValue) {
        this.value = initialValue;
    }
 
    public long get() {
        return value;
    }
 
    public void set(long newValue) {
        this.value = newValue;
    }
 
    public long getAndIncrement() {
        return UNSAFE.getAndAddLong(this, VALUE_OFFSET, 1L);
    }
 
    public boolean compareAndSet(long expected, long update) {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expected, update);
    }
 
    // VarHandle or Unsafe setup for atomic operations
    private static final long VALUE_OFFSET;
    static {
        try {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(
                PaddedAtomicLong.class.getDeclaredField("value")
            );
        } catch (NoSuchFieldException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}

Now each sequence has its own cache line. Updating publisherSequence doesn't affect stage1Sequence at all.

Ring Buffer Memory Layout

The ring buffer array itself benefits from sequential layout:

// Events are contiguous in memory
PipelineEvent[] ringBuffer = new PipelineEvent[BUFFER_SIZE];
 
// Access pattern is sequential
event = ringBuffer[0];  // Cache line loaded
event = ringBuffer[1];  // Same cache line! No fetch needed
event = ringBuffer[2];  // Same cache line!
// ...
event = ringBuffer[8];  // New cache line (8 references = 64 bytes)

The CPU prefetcher recognizes this sequential pattern and proactively loads upcoming cache lines. By the time you access ringBuffer[8], it's already in L1 cache.

Compare this to a linked list:

// Linked list nodes are scattered in memory
Node node0 = new Node();  // Address: 0x1000
Node node1 = new Node();  // Address: 0x5000 (who knows?)
Node node2 = new Node();  // Address: 0x2000 (random location)
 
// Access pattern is random
node = node0;                  // Cache miss
node = node.next;              // Cache miss (different cache line)
node = node.next;              // Cache miss (different cache line)

Every linked list traversal is a cache miss. No prefetcher can predict where the next node will be.

Event Object Layout

Inside each event object, we also consider layout:

public class PipelineEvent {
 
    // Hot fields first - accessed on every event
    private long timestamp;        // 8 bytes
    private int eventType;         // 4 bytes
    private long sourceId;         // 8 bytes
    private int payloadLength;     // 4 bytes
    // Total: 24 bytes - fits in first cache line with header
 
    // Cold fields - only accessed sometimes
    private volatile int processedStages;  // 4 bytes
 
    // Large fields last - may span cache lines
    private byte[] payload;        // Reference (8 bytes) + array elsewhere
}

By placing frequently accessed fields first, we maximize the chance that a single cache line fetch gives us everything we need.

NUMA Considerations

On multi-socket systems (common in servers), memory is not uniform - each CPU socket has "local" memory that's faster to access than "remote" memory on other sockets.

For maximum performance, pin your pipeline threads to cores on the same socket:

# Pin all pipeline threads to cores 0-7 (socket 0)
taskset -c 0-7 java -jar pipeline.jar
 
# Or use numactl
numactl --cpunodebind=0 --membind=0 java -jar pipeline.jar

And allocate your ring buffer on startup, before any work happens:

public class DisruptorStylePipeline {
 
    // Allocate ring buffer in constructor (before threads start)
    // This ensures the array is allocated on the main thread's local memory
    public DisruptorStylePipeline() {
        this.ringBuffer = new PipelineEvent[BUFFER_SIZE];
        // Force allocation of all events now
        for (int i = 0; i < BUFFER_SIZE; i++) {
            this.ringBuffer[i] = new PipelineEvent();
        }
    }
}

Part 7: Wait Strategies and CPU Utilization

The wait strategy is the heart of the latency vs. CPU trade-off. Let's explore the options in detail.

Busy Spin: Lowest Latency

public class BusySpinWaitStrategy implements WaitStrategy {
 
    @Override
    public long waitFor(long sequence, Sequence dependentSequence,
                       Sequence cursorSequence) {
        long availableSequence;
 
        while ((availableSequence = dependentSequence.get()) < sequence) {
            Thread.onSpinWait();
        }
 
        return availableSequence;
    }
}

Characteristics:

  • Latency: ~10-50ns response time
  • CPU: 100% utilization while waiting
  • Power: Maximum consumption
  • Use when: Ultra-low latency is critical, dedicated cores available

The Thread.onSpinWait() hint (Java 9+) tells the CPU we're in a spin loop. On x86-64, this compiles to PAUSE, which:

  • Reduces power consumption slightly
  • Improves performance on hyper-threaded cores
  • Prevents memory ordering violations from speculative loads

Yielding: Balanced Approach

public class YieldingWaitStrategy implements WaitStrategy {
 
    private static final int SPIN_TRIES = 100;
 
    @Override
    public long waitFor(long sequence, Sequence dependentSequence,
                       Sequence cursorSequence) {
        long availableSequence;
        int counter = SPIN_TRIES;
 
        while ((availableSequence = dependentSequence.get()) < sequence) {
            if (counter > 0) {
                counter--;
                Thread.onSpinWait();
            } else {
                counter = SPIN_TRIES;
                Thread.yield();
            }
        }
 
        return availableSequence;
    }
}

Characteristics:

  • Latency: ~1-10us response time
  • CPU: High but not 100%
  • Power: Moderate
  • Use when: Low latency needed but not at any cost

This strategy spins for a bit, then yields to let other threads run. Good for systems where pipeline threads share cores with other work.

Sleeping: CPU Friendly

public class SleepingWaitStrategy implements WaitStrategy {
 
    private static final int RETRIES = 200;
    private static final long SLEEP_NS = 100;
 
    @Override
    public long waitFor(long sequence, Sequence dependentSequence,
                       Sequence cursorSequence) throws InterruptedException {
        long availableSequence;
        int counter = RETRIES;
 
        while ((availableSequence = dependentSequence.get()) < sequence) {
            if (counter > 100) {
                counter--;
            } else if (counter > 0) {
                counter--;
                Thread.yield();
            } else {
                LockSupport.parkNanos(SLEEP_NS);
            }
        }
 
        return availableSequence;
    }
}

Characteristics:

  • Latency: ~100us-1ms response time
  • CPU: Low utilization while waiting
  • Power: Minimal
  • Use when: Latency tolerance is high, power/CPU cost matters

Blocking: Maximum Efficiency

public class BlockingWaitStrategy implements WaitStrategy {
 
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();
 
    @Override
    public long waitFor(long sequence, Sequence dependentSequence,
                       Sequence cursorSequence) throws InterruptedException {
        long availableSequence;
 
        if ((availableSequence = dependentSequence.get()) < sequence) {
            lock.lock();
            try {
                while ((availableSequence = dependentSequence.get()) < sequence) {
                    processorNotifyCondition.await();
                }
            } finally {
                lock.unlock();
            }
        }
 
        return availableSequence;
    }
 
    public void signalAllWhenBlocking() {
        lock.lock();
        try {
            processorNotifyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

Characteristics:

  • Latency: ~10us-100us+ (context switch dependent)
  • CPU: Minimal utilization while waiting
  • Power: Minimal
  • Use when: Throughput matters more than latency

Choosing the Right Strategy


Part 8: Multi-Producer Coordination

When multiple threads publish to the same ring buffer, we need careful coordination to avoid race conditions.

The Claim-Write-Publish Pattern

public class MultiProducerSequencer {
 
    private final int bufferSize;
    private final int indexMask;
 
    // Cursor tracks the highest claimed sequence
    private final PaddedAtomicLong cursor = new PaddedAtomicLong(-1L);
 
    // Gating sequences track the slowest consumers
    private final Sequence[] gatingSequences;
 
    // Per-slot flags track which sequences are fully published
    private final int[] availableBuffer;
 
    public MultiProducerSequencer(int bufferSize, Sequence[] gatingSequences) {
        this.bufferSize = bufferSize;
        this.indexMask = bufferSize - 1;
        this.gatingSequences = gatingSequences;
        this.availableBuffer = new int[bufferSize];
 
        // Initialize available buffer
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer[i] = -1;
        }
    }
 
    /**
     * Claim the next sequence for publishing.
     * Multiple threads may claim concurrently.
     */
    public long next() {
        long current;
        long next;
 
        do {
            current = cursor.get();
            next = current + 1;
 
            // Check if we'd wrap past the slowest consumer
            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = getMinimumSequence(gatingSequences);
 
            if (wrapPoint > cachedGatingSequence) {
                // Must wait for consumers to catch up
                LockSupport.parkNanos(1);
                continue;
            }
 
        } while (!cursor.compareAndSet(current, next));
 
        return next;
    }
 
    /**
     * Publish a sequence, making it visible to consumers.
     * Must be called after writing to the slot.
     */
    public void publish(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = (int) (sequence >>> indexShift);
 
        AVAILABLE_BUFFER.setRelease(availableBuffer, index, flag);
    }
 
    /**
     * Check if a sequence is available for consumption.
     */
    public boolean isAvailable(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = (int) (sequence >>> indexShift);
 
        return (int) AVAILABLE_BUFFER.getAcquire(availableBuffer, index) == flag;
    }
 
    /**
     * Get the highest available sequence for a consumer.
     * Handles gaps from incomplete publications.
     */
    public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable(sequence)) {
                return sequence - 1;
            }
        }
        return availableSequence;
    }
}

Understanding the Available Buffer

The available buffer is the key to multi-producer coordination. Here's how it works:

The available buffer prevents consumers from seeing partially-written events, even when producers complete out of order.

VarHandle and Memory Ordering in Depth

Understanding memory ordering is critical for Disruptor correctness. Let me walk through the VarHandle operations we use and why they matter.

public class DisruptorSequencer {
 
    private static final VarHandle CURSOR;
    private static final VarHandle AVAILABLE_BUFFER;
 
    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            CURSOR = lookup.findVarHandle(
                DisruptorSequencer.class,
                "cursor",
                long.class
            );
            AVAILABLE_BUFFER = MethodHandles.arrayElementVarHandle(int[].class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
 
    private volatile long cursor = -1L;
    private final int[] availableBuffer;
 
    /**
     * Claim the next sequence using CAS with volatile semantics.
     * Full memory fence ensures all prior writes are visible.
     */
    public long next() {
        long current;
        long next;
        do {
            current = (long) CURSOR.getVolatile(this);  // Acquire semantics
            next = current + 1;
        } while (!CURSOR.compareAndSet(this, current, next));  // Full fence
        return next;
    }
 
    /**
     * Publish with release semantics.
     * All prior writes (to the event) happen-before this write.
     */
    public void publish(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = calculateAvailabilityFlag(sequence);
 
        // Release ensures event data is visible before flag update
        AVAILABLE_BUFFER.setRelease(availableBuffer, index, flag);
    }
 
    /**
     * Check availability with acquire semantics.
     * If we see the flag, we're guaranteed to see the event data.
     */
    public boolean isAvailable(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = calculateAvailabilityFlag(sequence);
 
        // Acquire ensures we see event data if we see the flag
        return (int) AVAILABLE_BUFFER.getAcquire(availableBuffer, index) == flag;
    }
}

Memory Access Modes Explained:

  1. Plain (getPlain/setPlain): No ordering guarantees. Fastest but dangerous for concurrent access. Only use when you don't care about visibility to other threads.

  2. Opaque (getOpaque/setOpaque): Guarantees the read/write happens, but no ordering with other operations. Useful for progress flags that don't need synchronization.

  3. Acquire (getAcquire): All reads and writes after this load cannot be reordered before it. Use for reading synchronization variables before reading protected data.

  4. Release (setRelease): All reads and writes before this store cannot be reordered after it. Use for writing synchronization variables after writing protected data.

  5. Volatile (getVolatile/setVolatile): Both acquire and release semantics plus total ordering with other volatile operations. Most expensive but easiest to reason about.

The Happens-Before Relationship:

Producer:                      Consumer:
  write event data               |
       |                         |
  setRelease(flag)     -->     getAcquire(flag)
       |                         |
                              read event data

The release-acquire pairing creates a happens-before relationship: if the consumer sees the flag update, it's guaranteed to see all writes that happened before the release (including the event data).

Batch Publishing for Throughput

When publishing many events, batching improves throughput:

/**
 * Batch publish for higher throughput.
 * Claims multiple slots at once, reducing CAS operations.
 */
public long[] nextBatch(int batchSize) {
    long current;
    long next;
 
    do {
        current = cursor.get();
        next = current + batchSize;
 
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = getMinimumSequence(gatingSequences);
 
        if (wrapPoint > cachedGatingSequence) {
            LockSupport.parkNanos(1);
            continue;
        }
 
    } while (!cursor.compareAndSet(current, next));
 
    // Return all claimed sequences
    long[] sequences = new long[batchSize];
    for (int i = 0; i < batchSize; i++) {
        sequences[i] = current + 1 + i;
    }
    return sequences;
}
 
/**
 * Batch publish all sequences at once.
 */
public void publishBatch(long from, long to) {
    for (long seq = from; seq <= to; seq++) {
        publish(seq);
    }
}

Part 9: Benchmarks and Results

Let's see how our Disruptor-style pipeline compares to the blocking approach.

Benchmark Setup

@State(Scope.Benchmark)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 2)
@Fork(3)
public class PipelineBenchmark {
 
    private BlockingEventPipeline blockingPipeline;
    private DisruptorStylePipeline disruptorPipeline;
 
    @Setup
    public void setup() {
        blockingPipeline = new BlockingEventPipeline(1024);
        blockingPipeline.start();
 
        disruptorPipeline = new DisruptorStylePipeline();
        disruptorPipeline.start();
    }
 
    @Benchmark
    @Group("blocking")
    @GroupThreads(4)
    public void blockingPublish(Blackhole bh) {
        byte[] data = new byte[100];
        blockingPipeline.submit(System.nanoTime(), 1, 1234L, data, 0, 100);
    }
 
    @Benchmark
    @Group("disruptor")
    @GroupThreads(4)
    public void disruptorPublish(Blackhole bh) {
        byte[] data = new byte[100];
        disruptorPipeline.publish(System.nanoTime(), 1, 1234L, data, 0, 100);
    }
}

Latency Results

Benchmark                                Mode  Cnt      Score      Error  Units

BlockingPipeline.publish                sample  1000   2340.23 +/-  234.12  ns/op
BlockingPipeline.publish:p50            sample         1890.00             ns/op
BlockingPipeline.publish:p90            sample         3450.00             ns/op
BlockingPipeline.publish:p99            sample         8934.00             ns/op
BlockingPipeline.publish:p99.9          sample        45123.00             ns/op

DisruptorPipeline.publish               sample  1000     98.34 +/-    8.12  ns/op
DisruptorPipeline.publish:p50           sample          78.00             ns/op
DisruptorPipeline.publish:p90           sample         134.00             ns/op
DisruptorPipeline.publish:p99           sample         287.00             ns/op
DisruptorPipeline.publish:p99.9         sample         623.00             ns/op

Analysis:

MetricBlockingDisruptorImprovement
Mean2,340ns98ns23.9x
p501,890ns78ns24.2x
p903,450ns134ns25.7x
p998,934ns287ns31.1x
p99.945,123ns623ns72.4x

The improvement is dramatic across all percentiles. The tail latency improvement (p99.9 at 72x) is particularly important - this is where blocking queues cause the most damage.

Throughput Results

Benchmark                                Mode  Cnt        Score        Error  Units

BlockingPipeline.throughput              thrpt   10    47823.23 +/-  2341.12  ops/s
DisruptorPipeline.throughput             thrpt   10   523456.78 +/- 12345.67  ops/s

Throughput Improvement: 10.9x

Garbage Collection Analysis

During a 5-minute sustained load test with 4 publishers:

Blocking Pipeline:

  • Young GC events: 234
  • Total GC pause time: 4,567ms
  • Average pause: 19.5ms
  • Max pause: 127ms
  • Allocation rate: 23.4 MB/sec

Disruptor Pipeline:

  • Young GC events: 3
  • Total GC pause time: 45ms
  • Average pause: 15ms
  • Max pause: 18ms
  • Allocation rate: 0.1 MB/sec

GC Impact Reduction: 98x fewer events, 100x less pause time

The near-zero allocation rate of the Disruptor pipeline means the garbage collector has almost nothing to do. Those three GC events were likely triggered by monitoring code, not the pipeline itself.

Per-Stage Latency Breakdown

Stage Latency Comparison (nanoseconds):

Blocking Pipeline:
  Queue 1 transfer: 1,200 - 2,500ns
  Stage 1 process:    100 - 300ns
  Queue 2 transfer: 1,100 - 2,200ns
  Stage 2 process:    150 - 400ns
  Queue 3 transfer:   900 - 1,800ns
  Stage 3 process:    100 - 250ns
  Total:            3,550 - 7,450ns

Disruptor Pipeline:
  Slot claim:          20 - 40ns
  Stage 1 wait:        10 - 30ns
  Stage 1 process:    100 - 300ns
  Stage 2 wait:        10 - 25ns
  Stage 2 process:    150 - 400ns
  Stage 3 wait:        10 - 20ns
  Stage 3 process:    100 - 250ns
  Total:              400 - 1,065ns

The blocking queue transfers dominate the blocking pipeline's latency. In the Disruptor pipeline, inter-stage communication is nearly free.


Part 10: Production Deployment Considerations

Moving from benchmarks to production requires attention to several operational concerns.

Monitoring and Observability

Add metrics to track pipeline health:

public class MonitoredPipeline extends DisruptorStylePipeline {
 
    private final LongAdder publishCount = new LongAdder();
    private final LongAdder stage1Count = new LongAdder();
    private final LongAdder stage2Count = new LongAdder();
    private final LongAdder stage3Count = new LongAdder();
 
    private final AtomicLong lastPublishLatency = new AtomicLong();
    private final AtomicLong lastE2ELatency = new AtomicLong();
 
    @Override
    public long publish(long timestamp, int eventType, long sourceId,
                       byte[] data, int offset, int length) {
        long startNanos = System.nanoTime();
        long sequence = super.publish(timestamp, eventType, sourceId,
                                       data, offset, length);
        lastPublishLatency.set(System.nanoTime() - startNanos);
        publishCount.increment();
        return sequence;
    }
 
    // Export via JMX or metrics framework
    public long getPublishCount() { return publishCount.sum(); }
    public long getPublishLatencyNanos() { return lastPublishLatency.get(); }
 
    public long getBufferUtilization() {
        long head = publisherSequence.get();
        long tail = stage3Sequence.get();
        return head - tail;
    }
 
    public double getBufferUtilizationPercent() {
        return (double) getBufferUtilization() / BUFFER_SIZE * 100;
    }
}

Backpressure Handling

When the pipeline is overloaded, you have several options:

public enum BackpressureStrategy {
    BLOCK,      // Wait until space available
    DROP_NEW,   // Discard new events if full
    DROP_OLD,   // Overwrite oldest events
    SIGNAL      // Signal upstream to slow down
}
 
public class BackpressureAwarePipeline extends DisruptorStylePipeline {
 
    private final BackpressureStrategy strategy;
    private final LongAdder droppedEvents = new LongAdder();
 
    public boolean tryPublish(long timestamp, int eventType, long sourceId,
                             byte[] data, int offset, int length,
                             long timeoutNanos) {
        switch (strategy) {
            case BLOCK:
                return publishWithTimeout(timestamp, eventType, sourceId,
                                          data, offset, length, timeoutNanos);
 
            case DROP_NEW:
                if (!hasCapacity()) {
                    droppedEvents.increment();
                    return false;
                }
                publish(timestamp, eventType, sourceId, data, offset, length);
                return true;
 
            case DROP_OLD:
                // Force publish, potentially overwriting unprocessed events
                forcePublish(timestamp, eventType, sourceId, data, offset, length);
                return true;
 
            case SIGNAL:
                if (!hasCapacity()) {
                    signalBackpressure();
                    return false;
                }
                publish(timestamp, eventType, sourceId, data, offset, length);
                return true;
 
            default:
                throw new IllegalStateException("Unknown strategy: " + strategy);
        }
    }
 
    private boolean hasCapacity() {
        long head = publisherSequence.get();
        long tail = stage3Sequence.get();
        return (head - tail) < (BUFFER_SIZE - 10);  // Leave some margin
    }
}

Graceful Shutdown

Shutting down a Disruptor pipeline requires draining in-flight events:

public void shutdown() throws InterruptedException {
    // Step 1: Stop accepting new events
    acceptingEvents = false;
 
    // Step 2: Wait for all published events to be processed
    long timeout = System.currentTimeMillis() + 30_000;
    while (stage3Sequence.get() < publisherSequence.get()) {
        if (System.currentTimeMillis() > timeout) {
            throw new TimeoutException("Pipeline drain timeout");
        }
        Thread.sleep(10);
    }
 
    // Step 3: Signal workers to stop
    running = false;
 
    // Step 4: Interrupt any blocking waits
    for (Thread worker : allWorkers()) {
        worker.interrupt();
    }
 
    // Step 5: Wait for workers to terminate
    for (Thread worker : allWorkers()) {
        worker.join(5000);
    }
}

Thread Affinity for Maximum Performance

For ultra-low latency, pin threads to specific CPU cores:

public class AffinityPipeline extends DisruptorStylePipeline {
 
    @Override
    public void start() {
        // Pin publisher threads to cores 0-3
        // Pin stage 1 workers to cores 4-7
        // Pin stage 2 workers to cores 8-11
        // Pin stage 3 workers to cores 12-13
 
        for (int i = 0; i < stage1Workers.length; i++) {
            int core = 4 + i;
            stage1Workers[i] = new Thread(() -> {
                setThreadAffinity(core);
                runStage1Worker(i);
            });
            stage1Workers[i].start();
        }
 
        // Similarly for other stages...
    }
 
    private void setThreadAffinity(int core) {
        // Use JNA or JNI to call sched_setaffinity
        // Or use a library like Java-Thread-Affinity
        Affinity.setAffinity(1L << core);
    }
}

JVM Tuning for Disruptor Workloads

The right JVM flags can make a significant difference in Disruptor performance:

# Recommended JVM flags for Disruptor workloads
java \
    # Memory sizing - pre-size to avoid runtime growth
    -Xms8g -Xmx8g \
    -XX:NewSize=6g -XX:MaxNewSize=6g \
 
    # GC selection - ZGC for lowest pause times
    -XX:+UseZGC \
    -XX:+ZGenerational \
 
    # Or G1 with tuning for low latency
    # -XX:+UseG1GC \
    # -XX:MaxGCPauseMillis=10 \
    # -XX:G1HeapRegionSize=16m \
 
    # Disable biased locking (unnecessary overhead)
    -XX:-UseBiasedLocking \
 
    # Enable large pages for TLB efficiency
    -XX:+UseLargePages \
    -XX:LargePageSizeInBytes=2m \
 
    # Compiler optimizations
    -XX:+UseStringDeduplication \
    -XX:+OptimizeStringConcat \
 
    # Intrinsics and vectorization
    -XX:+UseVectorCmov \
 
    # Thread stack size (reduce if many threads)
    -Xss256k \
 
    # JIT compilation settings
    -XX:CompileThreshold=1000 \
    -XX:+TieredCompilation \
 
    -jar pipeline.jar

Why these flags matter:

  1. Fixed heap size (-Xms8g -Xmx8g): Prevents runtime heap resizing, which can cause GC pauses.

  2. Large young generation (-XX:NewSize=6g): Most Disruptor allocations are short-lived. A large eden space reduces GC frequency.

  3. ZGC or G1: ZGC provides sub-millisecond pause times. G1 is a good alternative with more tuning options.

  4. Disabled biased locking (-XX:-UseBiasedLocking): Biased locking optimization adds overhead for lock-free code that doesn't use locks.

  5. Large pages (-XX:+UseLargePages): Reduces TLB misses for large ring buffers. Requires OS configuration.

Operating System Tuning

Beyond JVM flags, OS-level tuning helps:

# Linux: Disable CPU frequency scaling (use performance governor)
echo performance | sudo tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
 
# Linux: Disable transparent huge pages (THP) if using explicit large pages
echo never | sudo tee /sys/kernel/mm/transparent_hugepage/enabled
 
# Linux: Set real-time scheduling priority for pipeline threads
sudo chrt -f 99 java -jar pipeline.jar
 
# Linux: Isolate CPUs from scheduler for dedicated pipeline use
# In /etc/default/grub: GRUB_CMDLINE_LINUX="isolcpus=4-15"
 
# Linux: Pin IRQ handlers away from pipeline cores
echo 0-3 > /proc/irq/*/smp_affinity_list

Warm-Up Strategies

The JIT compiler needs time to optimize hot paths. Warm up your pipeline before handling production traffic:

public class WarmUpPipeline extends DisruptorStylePipeline {
 
    public void warmUp(int iterations) {
        System.out.println("Starting warm-up phase...");
 
        byte[] warmUpData = new byte[100];
        long startTime = System.currentTimeMillis();
 
        // Run through enough iterations to trigger JIT compilation
        for (int i = 0; i < iterations; i++) {
            publish(System.nanoTime(), 1, i, warmUpData, 0, 100);
        }
 
        // Wait for all warm-up events to process
        waitForDrain();
 
        long elapsed = System.currentTimeMillis() - startTime;
        System.out.println("Warm-up complete: " + iterations + " events in " +
                           elapsed + "ms (" + (iterations * 1000 / elapsed) + " ops/sec)");
 
        // Force full GC to clear warm-up garbage
        System.gc();
        System.gc();
 
        // Reset metrics
        resetCounters();
 
        System.out.println("Pipeline ready for production traffic");
    }
 
    private void waitForDrain() {
        while (stage3Sequence.get() < publisherSequence.get()) {
            Thread.onSpinWait();
        }
    }
}

Recommended warm-up: at least 100,000 iterations, or until you see consistent latency numbers in monitoring.


Part 11: Trade-offs and When to Use

The Disruptor pattern is powerful but not universally applicable. Let's be clear about when it shines and when simpler approaches suffice.

When the Disruptor Pattern Excels

High-Frequency Trading Systems

Our original use case. When microseconds matter:

  • Market data processing pipelines
  • Order matching engines
  • Risk calculation streams
  • Arbitrage detection systems

Real-Time Analytics

Systems that must process events with consistent, low latency:

  • Fraud detection pipelines
  • Real-time recommendation engines
  • Live dashboard data feeds
  • Alerting and monitoring systems

Event-Driven Architectures

Complex event processing where events flow through multiple stages:

  • Event sourcing write-ahead logs
  • CQRS command processing
  • Saga orchestration
  • Workflow engines

Telemetry and Logging

High-volume data collection where blocking would affect application performance:

  • Distributed tracing spans
  • Metrics aggregation
  • Structured logging
  • Audit trails

When Simpler Approaches Are Better

Low-Throughput Systems

If you're processing fewer than 10,000 events/second, the complexity of the Disruptor pattern isn't justified. A simple BlockingQueue will work fine.

Variable-Size Events

The Disruptor works best with fixed-size events that can be pre-allocated. If your events vary significantly in size, you'll need complex memory management.

Unbounded Queues

The ring buffer has fixed capacity. If you truly need unbounded buffering, ConcurrentLinkedQueue or similar is simpler.

Teams Without Low-Level Experience

Disruptor-style code requires understanding of:

  • Memory ordering and barriers
  • Cache architecture and false sharing
  • Lock-free algorithm correctness
  • JVM memory model

If your team isn't comfortable with these concepts, the maintenance burden will outweigh the performance benefits.

Decision Framework


Part 12: Conclusion

That 3 AM incident taught me something fundamental about event processing systems: the transport between processing stages often matters more than the processing itself.

Our original pipeline spent more time coordinating between stages than actually processing events. Each LinkedBlockingQueue was a bottleneck of lock contention, memory allocation, and cache thrashing. The business logic was fast; the plumbing was slow.

The Disruptor pattern eliminates all three problems:

  1. Lock contention eliminated - Atomic sequence numbers replace locks. No thread ever blocks waiting for another.

  2. Allocation eliminated - Pre-allocated ring buffer and events mean zero garbage in the hot path. GC pauses drop from seconds to milliseconds.

  3. Cache efficiency maximized - Sequential access patterns, padded sequences, and contiguous memory make the CPU prefetcher happy.

The results speak for themselves:

MetricBefore (Blocking)After (Disruptor)Improvement
Mean Latency2,340ns98ns23.9x
p99.9 Latency45,123ns623ns72.4x
Throughput48k ops/s523k ops/s10.9x
GC Pauses4,567ms/5min45ms/5min101x

But the pattern isn't magic, and it's not free. The complexity of lock-free code makes it harder to write, debug, and maintain. Fixed capacity ring buffers have size limits, so you must plan for backpressure. The expertise required spans memory ordering, cache architecture, and JVM internals. And it's not always necessary — for many workloads, simple blocking queues are perfectly adequate.

The key insight from this journey: measure first, then optimize. We knew we had a latency problem because we measured it. We knew blocking queues were the bottleneck because we profiled it. We knew the Disruptor pattern would help because we benchmarked it.

Performance optimization is about understanding where time goes and making informed trade-offs. The Disruptor pattern trades simplicity for speed. That trade-off made sense for our trading analytics pipeline. It might make sense for yours too - or it might not.

When you do reach for the Disruptor pattern, remember:

  1. Pre-allocate everything at startup
  2. Pad sequences to prevent false sharing
  3. Choose your wait strategy based on latency vs. CPU trade-offs
  4. Monitor buffer utilization and plan for backpressure
  5. Test under realistic load before production

That 3 AM incident is now a distant memory. Our pipeline handles 10x the original load with latency consistently under 200 nanoseconds. The on-call rotation is much quieter these days.

Key Takeaways

Let me distill the most important lessons from this journey:

Architecture Decisions:

  • The transport layer between processing stages often dominates overall latency
  • Pre-allocation eliminates GC pressure in the hot path
  • A single shared buffer outperforms multiple independent queues
  • Sequential memory access patterns enable CPU prefetching

Implementation Details:

  • Use power-of-2 buffer sizes for fast index calculation via bitwise AND
  • Pad sequences to separate cache lines (128 bytes total per sequence)
  • Choose wait strategies based on your latency vs. CPU trade-off
  • Use VarHandle with proper memory ordering (release for publish, acquire for consume)

Production Operations:

  • Warm up the JIT compiler before handling production traffic
  • Monitor buffer utilization to detect backpressure early
  • Plan for graceful shutdown that drains in-flight events
  • Test under realistic load for extended periods

Team Considerations:

  • Lock-free code is harder to debug than locked code
  • Ensure your team understands memory ordering before deploying
  • Consider using the LMAX Disruptor library instead of rolling your own
  • Document your design decisions for future maintainers

The Disruptor pattern isn't magic, but it is a powerful tool when applied correctly. The key is understanding when the complexity is justified and implementing it with care.

Now go build something fast.


Arthur CostaA

Arthur Costa

Senior Full-Stack Engineer & Tech Lead

Senior Full-Stack Engineer with 8+ years in React, TypeScript, and Node.js. Expert in performance optimization and leading engineering teams.

View all articles →