The Disruptor Pattern: Multi-Stage Event Processing Pipelines

February 12, 2026

Implement LMAX Disruptor-style event processing with sequence barriers, multi-stage pipelines, and batch processing for ultra-low latency systems.


Lock-Free in Java: The Disruptor Pattern for Event Processing Pipelines

Part 1: The 3AM Wake-Up Call

3:18 AM. The alarm cuts through my sleep like a hot knife through butter. Another incident alert. I grab my phone, squinting at the bright screen: "CRITICAL: Event pipeline latency threshold exceeded - p99 > 5ms."

I pull myself out of bed and shuffle to my laptop. Coffee can wait; production can't.

Our real-time event processing system - the backbone of our financial analytics platform - is drowning under peak market load. I pull up the monitoring dashboard and watch the latency graphs paint a horror story. Event throughput has tanked from our usual 500k events/sec to a measly 10k events/sec. Worse, latency has spiked from sub-millisecond to an unacceptable 5ms.

Five milliseconds might not sound like much, but in financial services, it's an eternity. Trades get executed in microseconds. Arbitrage opportunities vanish in nanoseconds. When your analytics pipeline is 5ms behind, you're analyzing history, not helping traders.

I start digging through the flame graphs and thread dumps. The culprit reveals itself quickly: our three-stage event pipeline - parse, enrich, aggregate - is using LinkedBlockingQueue for inter-stage communication. Each queue is a bottleneck, each stage is contending for locks, and the garbage collector is working overtime to clean up the queue node allocations.

Here's what I'm looking at:

Stage 1: Parse -> Queue 1 -> Stage 2: Enrich -> Queue 2 -> Stage 3: Aggregate
          |                              |                              |
      4 threads                      4 threads                      2 threads
          |                              |                              |
     1-2 us/event                   1-5 us/event                   1-3 us/event
          |                              |                              |
      BLOCKING                       BLOCKING                       BLOCKING

The individual stages are fast - microseconds per event. But each blocking queue adds 1-5 microseconds of latency, and under contention, that explodes to tens of microseconds. Multiply by three stages, add some GC pauses from all those queue node allocations, and you've got 5ms of pain.

I've seen this pattern before. I've fixed it before. It's time to bring out the big guns: the Disruptor pattern.

By 6 AM, I have a proof-of-concept running. By 7 AM, it's in production. Latency drops from 5ms to 100 nanoseconds per stage. Throughput jumps from 10k to 500k events/sec. The garbage collector barely breaks a sweat.

This is the story of that transformation - the architecture, the implementation, and the lessons learned.


Part 2: Understanding the Problem - Why Blocking Queues Fail

Before we dive into the solution, let's thoroughly understand why blocking queues become a problem in high-throughput event pipelines.

The Traditional Multi-Stage Pipeline

A typical event processing pipeline looks like this:

[Diagram: events flowing through Parse, Enrich, and Aggregate stages, with a LinkedBlockingQueue between each stage]

Each stage has multiple worker threads processing events. Between stages, a LinkedBlockingQueue buffers events. Simple, well-understood, and it works great - until it doesn't.

The Hidden Costs of LinkedBlockingQueue

Let's dissect what happens when you offer() to a LinkedBlockingQueue:

public class LinkedBlockingQueue<E> {
    // Simplified from actual JDK source
 
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();
 
    public boolean offer(E e) {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();  // <- CONTENTION POINT 1
        try {
            // Create new Node - ALLOCATION 1
            Node<E> node = new Node<>(e);
            enqueue(node);
            // Potentially signal waiting consumers
        } finally {
            putLock.unlock();
        }
        // Potentially signal consumer - CONTENTION POINT 2
        signalNotEmpty();
        return true;
    }
}

Every offer() operation:

  1. Acquires a lock - Under contention, this means context switches, thread parking, and scheduling overhead
  2. Allocates a Node object - 24-32 bytes per event, depending on compressed OOPs
  3. May signal waiting consumers - More lock contention and potential context switches

With four producer threads offering to the same queue at ~100k events/sec each, you're looking at 400k lock acquisitions/sec (each with potential context switch overhead), 400k object allocations/sec (roughly 10 MB/sec of garbage), and constant cache line bouncing as the lock state changes ownership continuously.
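To make that contention concrete, here's a minimal self-contained sketch (not our production benchmark; class and method names are mine, and all numbers are machine-dependent): four producers feeding one LinkedBlockingQueue while a single consumer drains it.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class QueueContentionDemo {

    // Four producers feed one LinkedBlockingQueue; one consumer drains it.
    // Every put() acquires putLock and allocates a Node; every take()
    // acquires takeLock. Returns how many events the consumer saw.
    static long run(int producers, int eventsPerProducer) throws InterruptedException {
        final long total = (long) producers * eventsPerProducer;
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>(64 * 1024);
        AtomicLong consumed = new AtomicLong();

        Thread consumer = new Thread(() -> {
            try {
                while (consumed.get() < total) {
                    queue.take();
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException ignored) { }
        });
        consumer.start();

        ExecutorService pool = Executors.newFixedThreadPool(producers);
        for (int p = 0; p < producers; p++) {
            pool.submit(() -> {
                try {
                    for (int i = 0; i < eventsPerProducer; i++) {
                        queue.put((long) i);
                    }
                } catch (InterruptedException ignored) { }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        consumer.join();
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        long n = run(4, 100_000);
        long ms = Math.max(1, (System.nanoTime() - start) / 1_000_000);
        System.out.printf("%d events in %d ms (~%d events/sec)%n", n, ms, n * 1000 / ms);
    }
}
```

Run it under a profiler and the time spent in ReentrantLock and in Node allocation shows up immediately.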

Measuring the Damage

I ran a quick benchmark comparing our blocking pipeline to theoretical best case:

Benchmark                           Mode  Cnt    Score    Error  Units
BlockingPipeline.throughput         thrpt  10    47823 +/-  2341  ops/s
BlockingPipeline.latency:p50        sample      2340           ns
BlockingPipeline.latency:p99        sample      8934           ns
BlockingPipeline.latency:p99.9      sample     45123           ns

IdealPipeline.throughput (computed)          ~500000           ops/s
IdealPipeline.latency (computed)              ~100            ns

We're achieving less than 10% of theoretical throughput. The p99.9 latency is 450x worse than ideal. There's massive room for improvement.

The Three Costs We Must Eliminate

After profiling, I identified three major cost centers:

  1. Lock Contention: Threads spending 60-70% of time waiting for locks
  2. Allocation Pressure: 15 MB/sec of queue nodes triggering frequent GC
  3. Cache Inefficiency: Random access patterns causing constant cache misses

Any solution must address all three. Fixing one while ignoring the others gives marginal gains. We need a fundamentally different approach.


Part 3: Enter the Disruptor Pattern

The Disruptor pattern, pioneered by LMAX Exchange for their foreign exchange trading platform, provides exactly what we need: a lock-free, allocation-free, cache-friendly mechanism for passing events between stages.

The Core Insight

The key insight of the Disruptor is beautifully simple: instead of multiple queues between stages, use a single, pre-allocated ring buffer shared by all stages. Each stage processes events at its own pace, tracked by sequence numbers.

[Diagram: an 8-slot ring buffer (slots 0-7) color-coded by processing progress]

In this diagram, green (0-1) marks slots fully processed by all stages, yellow (2-3) shows slots processed by Stage 1 and currently being processed by Stage 2, orange (4-5) indicates slots processed by Stage 1 but waiting for Stage 2, and red (6-7) represents slots being published that are not yet available for Stage 1.

Zero Allocation After Initialization

The ring buffer is pre-allocated at startup. Event objects are created once and reused forever. When a publisher wants to send an event, it:

  1. Claims a slot (atomic increment of sequence)
  2. Populates the pre-existing event object in that slot
  3. Publishes (makes the slot available to consumers)

No new keyword in the hot path. No garbage. No GC pauses from your event pipeline.
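As a toy illustration of that claim/populate/publish protocol (single producer, single consumer; the names are mine, not the LMAX API), the whole mechanism boils down to two sequence counters and a pre-allocated array:

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy single-producer/single-consumer ring illustrating
// claim -> populate -> publish. Slots are pre-allocated;
// the hot path allocates nothing.
public class MiniRing {
    static final int SIZE = 8;                       // power of 2
    final long[] slots = new long[SIZE];             // pre-allocated "events"
    final AtomicLong published = new AtomicLong(-1); // last published sequence
    final AtomicLong consumed = new AtomicLong(-1);  // last consumed sequence

    void publish(long value) {
        long seq = published.get() + 1;              // 1. claim (single producer)
        while (consumed.get() < seq - SIZE) {        //    backpressure: don't lap consumer
            Thread.onSpinWait();
        }
        slots[(int) (seq & (SIZE - 1))] = value;     // 2. populate pre-existing slot
        published.set(seq);                          // 3. publish (volatile write)
    }

    long take() {
        long seq = consumed.get() + 1;
        while (published.get() < seq) {              // wait for the producer
            Thread.onSpinWait();
        }
        long value = slots[(int) (seq & (SIZE - 1))];
        consumed.set(seq);
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        MiniRing ring = new MiniRing();
        Thread producer = new Thread(() -> {
            for (long i = 0; i < 100; i++) ring.publish(i);
        });
        producer.start();
        long sum = 0;
        for (int i = 0; i < 100; i++) sum += ring.take();
        producer.join();
        System.out.println("sum = " + sum);          // 0+1+...+99 = 4950
    }
}
```

The volatile write to `published` after the slot write is what makes the event data visible to the consumer; the real Disruptor generalizes this to multiple producers and multiple dependent consumer stages.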

Natural Backpressure

The publisher tracks the slowest consumer. If the slowest consumer is N slots behind, the publisher has N slots of buffer space. If it catches up to the slowest consumer, it must wait - natural backpressure without explicit flow control.

This is fundamentally different from unbounded queues that can grow until you OOM, or bounded blocking queues where producers block and cause unpredictable latency spikes.
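A worked example of that wrap-point arithmetic (constants illustrative):

```java
// A publisher claiming sequence `seq` reuses slot (seq & mask), which was
// last occupied by sequence seq - bufferSize; it may proceed only once the
// slowest consumer has passed that older sequence.
public class WrapPointDemo {

    static boolean canPublish(long seq, long slowestConsumer, int bufferSize) {
        long wrapPoint = seq - bufferSize;
        return slowestConsumer >= wrapPoint;
    }

    public static void main(String[] args) {
        int size = 1024;
        // First lap: the whole buffer is free (consumer still at -1).
        assert canPublish(1023, -1, size);
        // Sequence 1024 would overwrite slot 0, which held sequence 0.
        assert !canPublish(1024, -1, size);  // slot 0 not yet consumed
        assert canPublish(1024, 0, size);    // slot 0 consumed -> reusable
        System.out.println("ok");
    }
}
```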

Cache-Friendly Sequential Access

Events are processed in sequence order. Stage 1 processes slot 0, then slot 1, then slot 2. Stage 2 follows the same pattern, perhaps a few slots behind. This sequential access pattern is perfect for CPU prefetchers - they can load the next cache line before you need it.

Contrast this with linked lists where each node can be anywhere in memory, causing cache misses on every traversal.


Part 4: Disruptor Architecture Deep Dive

Let's break down the components of a Disruptor-style pipeline:

Component 1: The Ring Buffer

public class RingBuffer<E> {
 
    private final E[] entries;
    private final int bufferSize;
    private final int indexMask;
 
    // Padded sequence to prevent false sharing
    private final PaddedAtomicLong cursor = new PaddedAtomicLong(-1L);
 
    @SuppressWarnings("unchecked")
    public RingBuffer(EventFactory<E> factory, int bufferSize) {
        // Must be power of 2 for fast modulo via bitwise AND
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.indexMask = bufferSize - 1;
        this.entries = (E[]) new Object[bufferSize];
 
        // Pre-allocate all event objects
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.newInstance();
        }
    }
 
    public E get(long sequence) {
        return entries[(int) (sequence & indexMask)];
    }
 
    public long getCursor() {
        return cursor.get();
    }
}

The ring buffer owns all event objects. They're created at initialization and never garbage collected (as long as the ring buffer lives). The indexMask trick (sequence & (size - 1)) gives us fast modulo for power-of-2 sizes.
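A quick sanity check on that trick - the identity holds for any power-of-2 size, and only for power-of-2 sizes:

```java
// `seq & (size - 1)` equals `seq % size` exactly when size is a power of 2.
public class IndexMaskDemo {

    static int wrap(long sequence, int bufferSize) {
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        int size = 1024;
        for (long seq : new long[] {0, 1, 1023, 1024, 1025, 5_000_000_123L}) {
            assert wrap(seq, size) == (int) (seq % size);
        }
        // For a non-power-of-2 size the identity breaks: 12 & 11 = 8, 12 % 12 = 0.
        assert (12 & (12 - 1)) != (12 % 12);
        System.out.println("ok");
    }
}
```

The AND version is a single cycle, while an integer modulo is tens of cycles - and it sits on the hot path of every slot lookup.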

Component 2: Sequences and Sequence Barriers

Each processing stage tracks its progress with a Sequence:

public class Sequence {
 
    // Padding to prevent false sharing
    private long p1, p2, p3, p4, p5, p6, p7;
 
    private volatile long value;
 
    private long p8, p9, p10, p11, p12, p13, p14;
 
    public Sequence(long initialValue) {
        this.value = initialValue;
    }
 
    public long get() {
        return value;
    }
 
    public void set(long value) {
        this.value = value;
    }
 
    public boolean compareAndSet(long expected, long update) {
        // UNSAFE and VALUE_OFFSET are set up exactly as shown for
        // PaddedAtomicLong in Part 6.
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expected, update);
    }
}

The massive padding ensures each sequence occupies its own cache line. Without it, updating one sequence would invalidate the others in every other core's cache - false sharing is the silent performance killer. (One caveat: HotSpot is free to reorder fields within a class, which is why the real Disruptor spreads its padding across a class hierarchy; on modern JDKs you can instead use the jdk.internal.vm.annotation.@Contended annotation with -XX:-RestrictContended.)

A SequenceBarrier lets a stage wait for upstream stages to make progress:

public class SequenceBarrier {
 
    private final Sequence cursorSequence;
    private final Sequence[] dependentSequences;
    private final WaitStrategy waitStrategy;
 
    public long waitFor(long sequence) throws InterruptedException {
        long availableSequence;

        // Wait until the publisher cursor reaches our target
        while ((availableSequence = cursorSequence.get()) < sequence) {
            waitStrategy.waitFor(sequence, cursorSequence);
        }

        // Then wait for all dependent sequences (upstream stages)
        if (dependentSequences.length > 0) {
            while ((availableSequence = getMinimumSequence(dependentSequences)) < sequence) {
                waitStrategy.waitFor(sequence, dependentSequences[0]);
            }
        }

        return availableSequence;
    }
 
    private long getMinimumSequence(Sequence[] sequences) {
        long minimum = Long.MAX_VALUE;
        for (Sequence sequence : sequences) {
            minimum = Math.min(minimum, sequence.get());
        }
        return minimum;
    }
}

Component 3: Event Processors

Each stage runs one or more EventProcessor instances:

public abstract class EventProcessor<E> implements Runnable {
 
    private final RingBuffer<E> ringBuffer;
    private final SequenceBarrier barrier;
    private final Sequence sequence = new Sequence(-1L);
    private volatile boolean running = true;
 
    protected EventProcessor(RingBuffer<E> ringBuffer, SequenceBarrier barrier) {
        this.ringBuffer = ringBuffer;
        this.barrier = barrier;
    }
 
    @Override
    public void run() {
        long nextSequence = sequence.get() + 1L;
 
        while (running) {
            try {
                // Wait for events to be available
                long availableSequence = barrier.waitFor(nextSequence);
 
                // Process all available events in one batch
                while (nextSequence <= availableSequence) {
                    E event = ringBuffer.get(nextSequence);
                    onEvent(event, nextSequence);
                    nextSequence++;
                }
 
                // Update our sequence (publish our progress)
                sequence.set(availableSequence);
 
            } catch (InterruptedException e) {
                // Restore interrupt status and exit on shutdown
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }
 
    protected abstract void onEvent(E event, long sequence);
}

Component 4: Wait Strategies

The wait strategy determines how processors wait for events when the ring buffer is empty:

public interface WaitStrategy {
    long waitFor(long sequence, Sequence cursorSequence)
        throws InterruptedException;
}
 
// Maximum performance, maximum CPU usage
public class BusySpinWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(long sequence, Sequence cursor) {
        while (cursor.get() < sequence) {
            Thread.onSpinWait();  // CPU hint for spin-waiting
        }
        return cursor.get();
    }
}
 
// Lower CPU usage, slightly higher latency
public class YieldingWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(long sequence, Sequence cursor) {
        int counter = 100;
        while (cursor.get() < sequence) {
            if (counter > 0) {
                counter--;
                Thread.onSpinWait();
            } else {
                Thread.yield();
            }
        }
        return cursor.get();
    }
}
 
// Lowest CPU usage, highest latency
public class BlockingWaitStrategy implements WaitStrategy {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
 
    @Override
    public long waitFor(long sequence, Sequence cursor)
        throws InterruptedException {
        if (cursor.get() < sequence) {
            lock.lock();
            try {
                while (cursor.get() < sequence) {
                    condition.await();
                }
            } finally {
                lock.unlock();
            }
        }
        return cursor.get();
    }
 
    // Publishers must call this after advancing the cursor,
    // otherwise waiting consumers never wake up.
    public void signalAllWhenBlocking() {
        lock.lock();
        try {
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

For ultra-low latency, use BusySpinWaitStrategy. Your CPU will run hot, but latency will be minimal. For background processing where some latency is acceptable, YieldingWaitStrategy or BlockingWaitStrategy save power.

Architecture Diagram

[Diagram: Disruptor architecture - publisher, ring buffer, sequences, sequence barriers, and stage processors]

Part 5: Implementation - Building a Disruptor-Style Pipeline

Let's build a complete implementation from scratch. I'll show you both the naive blocking approach and the Disruptor-style approach so you can see the difference.

The Event Object

First, we need a reusable event object:

/**
 * Pre-allocated event object for the ring buffer.
 * Mutable to avoid allocation in the hot path.
 */
public class PipelineEvent {
 
    // Event data - reset and reused for each event
    private long timestamp;
    private int eventType;
    private long sourceId;
    private int payloadLength;
    private byte[] payload;
 
    // Processing state - tracks which stages have processed this event
    private volatile int processedStages;
 
    // Pre-allocate payload buffer
    public PipelineEvent() {
        this.payload = new byte[1024];  // Max payload size
    }
 
    public void reset() {
        this.timestamp = 0;
        this.eventType = 0;
        this.sourceId = 0;
        this.payloadLength = 0;
        this.processedStages = 0;
        // Note: we don't reallocate payload, just overwrite
    }
 
    // Getters and setters omitted for brevity
 
    public void setData(long timestamp, int eventType, long sourceId,
                       byte[] data, int offset, int length) {
        this.timestamp = timestamp;
        this.eventType = eventType;
        this.sourceId = sourceId;
        this.payloadLength = length;
        System.arraycopy(data, offset, this.payload, 0, length);
    }
}

The Naive Blocking Implementation

Here's what we're replacing - a traditional blocking queue pipeline:


/**
 * Traditional blocking pipeline using LinkedBlockingQueue.
 * Simple but becomes a bottleneck under high load.
 */
public class BlockingEventPipeline {
 
    private final BlockingQueue<Event> stage1Queue;
    private final BlockingQueue<Event> stage2Queue;
    private final BlockingQueue<Event> stage3Queue;
 
    private final ExecutorService stage1Executor;
    private final ExecutorService stage2Executor;
    private final ExecutorService stage3Executor;
 
    public BlockingEventPipeline(int queueCapacity) {
        // Each stage has its own queue
        this.stage1Queue = new LinkedBlockingQueue<>(queueCapacity);
        this.stage2Queue = new LinkedBlockingQueue<>(queueCapacity);
        this.stage3Queue = new LinkedBlockingQueue<>(queueCapacity);
 
        // Worker threads for each stage
        this.stage1Executor = Executors.newFixedThreadPool(4);
        this.stage2Executor = Executors.newFixedThreadPool(4);
        this.stage3Executor = Executors.newFixedThreadPool(2);
    }
 
    public void start() {
        // Stage 1 workers: Parse events
        for (int i = 0; i < 4; i++) {
            stage1Executor.submit(() -> {
                while (!Thread.interrupted()) {
                    try {
                        Event event = stage1Queue.take();  // BLOCKS!
                        parseEvent(event);
                        stage2Queue.put(event);            // MAY BLOCK!
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            });
        }
 
        // Stage 2 workers: Enrich events
        for (int i = 0; i < 4; i++) {
            stage2Executor.submit(() -> {
                while (!Thread.interrupted()) {
                    try {
                        Event event = stage2Queue.take();  // BLOCKS!
                        enrichEvent(event);
                        stage3Queue.put(event);            // MAY BLOCK!
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            });
        }
 
        // Stage 3 workers: Aggregate events
        for (int i = 0; i < 2; i++) {
            stage3Executor.submit(() -> {
                while (!Thread.interrupted()) {
                    try {
                        Event event = stage3Queue.take();  // BLOCKS!
                        aggregateEvent(event);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            });
        }
    }
 
    public void submit(Event event) throws InterruptedException {
        stage1Queue.put(event);  // Entry point - may block if queue full
    }
 
    private void parseEvent(Event event) {
        // Simulate parsing work
        event.setParsed(true);
    }
 
    private void enrichEvent(Event event) {
        // Simulate enrichment work
        event.setEnriched(true);
    }
 
    private void aggregateEvent(Event event) {
        // Simulate aggregation work
        event.setAggregated(true);
    }
}

The problems with this approach:

  1. Three separate queues = three allocation sites, three contention points
  2. Blocking take/put = threads park and unpark, causing context switches
  3. New Event for each submission = constant allocation pressure
  4. No coordination between stages = each stage is independent

The Disruptor-Style Implementation

Now let's build the Disruptor-style version:


/**
 * Disruptor-style event pipeline.
 * Single pre-allocated ring buffer shared by all stages.
 * Lock-free, allocation-free in the hot path.
 */
public class DisruptorStylePipeline {
 
    // Ring buffer configuration
    private static final int BUFFER_SIZE = 1024;  // Must be power of 2
    private static final int INDEX_MASK = BUFFER_SIZE - 1;
 
    // Pre-allocated event buffer
    private final PipelineEvent[] ringBuffer;
 
    // Sequence tracking with cache line padding
    private final PaddedAtomicLong publisherSequence;
    private final PaddedAtomicLong stage1Sequence;
    private final PaddedAtomicLong stage2Sequence;
    private final PaddedAtomicLong stage3Sequence;
 
    // Per-slot availability flags for multi-producer support
    private final int[] availableFlags;
 
    // Worker threads
    private final Thread[] stage1Workers;
    private final Thread[] stage2Workers;
    private final Thread[] stage3Workers;
 
    private volatile boolean running = true;
 
    public DisruptorStylePipeline() {
        // Pre-allocate everything
        this.ringBuffer = new PipelineEvent[BUFFER_SIZE];
        for (int i = 0; i < BUFFER_SIZE; i++) {
            this.ringBuffer[i] = new PipelineEvent();
        }
 
        // Initialize sequences to -1 (nothing published yet)
        this.publisherSequence = new PaddedAtomicLong(-1L);
        this.stage1Sequence = new PaddedAtomicLong(-1L);
        this.stage2Sequence = new PaddedAtomicLong(-1L);
        this.stage3Sequence = new PaddedAtomicLong(-1L);
 
        // Available flags for multi-producer coordination
        this.availableFlags = new int[BUFFER_SIZE];
        Arrays.fill(availableFlags, -1);
 
        // Create worker threads (but don't start yet)
        this.stage1Workers = new Thread[4];
        this.stage2Workers = new Thread[4];
        this.stage3Workers = new Thread[2];
    }
 
    public void start() {
        // Stage 1: Wait on publisher, process, update stage1Sequence
        for (int i = 0; i < stage1Workers.length; i++) {
            final int workerId = i;
            stage1Workers[i] = new Thread(() -> runStage1Worker(workerId));
            stage1Workers[i].setName("Stage1-Worker-" + i);
            stage1Workers[i].start();
        }
 
        // Stage 2: Wait on stage1, process, update stage2Sequence
        for (int i = 0; i < stage2Workers.length; i++) {
            final int workerId = i;
            stage2Workers[i] = new Thread(() -> runStage2Worker(workerId));
            stage2Workers[i].setName("Stage2-Worker-" + i);
            stage2Workers[i].start();
        }
 
        // Stage 3: Wait on stage2, process, update stage3Sequence
        for (int i = 0; i < stage3Workers.length; i++) {
            final int workerId = i;
            stage3Workers[i] = new Thread(() -> runStage3Worker(workerId));
            stage3Workers[i].setName("Stage3-Worker-" + i);
            stage3Workers[i].start();
        }
    }
 
    /**
     * Publish an event to the pipeline.
     * Returns the sequence number for tracking.
     */
    public long publish(long timestamp, int eventType, long sourceId,
                       byte[] data, int offset, int length) {
        // Step 1: Claim a slot (atomic increment)
        long sequence = claimNext();
 
        // Step 2: Wait for slot to be available (slowest consumer caught up)
        waitForSlotAvailable(sequence);
 
        // Step 3: Write event data to the pre-allocated slot
        int index = (int) (sequence & INDEX_MASK);
        PipelineEvent event = ringBuffer[index];
        event.reset();
        event.setData(timestamp, eventType, sourceId, data, offset, length);
 
        // Step 4: Publish (make visible to consumers)
        publish(sequence);
 
        return sequence;
    }
 
    private long claimNext() {
        // Multi-producer safe: atomic increment on the padded sequence
        return publisherSequence.getAndIncrement() + 1;
    }
 
    private void waitForSlotAvailable(long sequence) {
        // Wait until the slowest consumer has processed this slot
        // in the previous cycle
        long wrapPoint = sequence - BUFFER_SIZE;
 
        while (stage3Sequence.get() < wrapPoint) {
            Thread.onSpinWait();  // Busy spin for lowest latency
        }
    }
 
    // VarHandle for release/acquire access to the availability flags, and
    // the shift that turns a sequence into its "lap" number around the ring
    private static final VarHandle AVAILABLE_FLAGS =
        MethodHandles.arrayElementVarHandle(int[].class);
    private static final int INDEX_SHIFT =
        Integer.numberOfTrailingZeros(BUFFER_SIZE);  // log2(1024) = 10
 
    private void publish(long sequence) {
        // Mark this slot as available by stamping it with the lap number.
        // (Stamping the lap, not the raw sequence, disambiguates wrap-arounds:
        // sequences 0 and 1024 share slot 0 but have different laps.)
        int index = (int) (sequence & INDEX_MASK);
        AVAILABLE_FLAGS.setRelease(availableFlags, index, (int) (sequence >>> INDEX_SHIFT));
    }
 
    private boolean isAvailable(long sequence) {
        int index = (int) (sequence & INDEX_MASK);
        int flag = (int) AVAILABLE_FLAGS.getAcquire(availableFlags, index);
        return flag == (int) (sequence >>> INDEX_SHIFT);
    }
 
    // Worker implementations...
}

Stage Worker Implementation

Each stage worker follows the same pattern:

private void runStage1Worker(int workerId) {
    // Each worker tracks its own position
    long nextSequence = workerId;  // Workers interleave slots
    int numWorkers = stage1Workers.length;
 
    while (running) {
        // Wait for this sequence to be published
        while (!isAvailable(nextSequence)) {
            if (!running) return;
            Thread.onSpinWait();
        }
 
        // Process the event
        int index = (int) (nextSequence & INDEX_MASK);
        PipelineEvent event = ringBuffer[index];
        processStage1(event);
 
        // Update stage1 sequence (coordinate with other stage1 workers)
        updateStage1Sequence(nextSequence);
 
        // Move to next slot for this worker
        nextSequence += numWorkers;
    }
}
 
private void processStage1(PipelineEvent event) {
    // Parse the event
    // In real code, this would decode the payload, validate fields, etc.
    event.markStage1Complete();
}
 
private void updateStage1Sequence(long completedSequence) {
    // Advance stage1Sequence monotonically to the highest completed value.
    // Caveat: with interleaved workers this can report sequence N as done
    // before N-1 has finished on another worker; a production implementation
    // tracks a per-worker sequence (as the real Disruptor's WorkProcessor
    // does) so downstream stages only see contiguous progress.
    long expected;
    do {
        expected = stage1Sequence.get();
        if (completedSequence <= expected) {
            return;  // Already updated by another worker
        }
    } while (!stage1Sequence.compareAndSet(expected, completedSequence));
}

The Complete Pipeline Visualization

[Diagram: the complete pipeline - publisher and three stages advancing their sequence counters around the shared ring buffer]

Part 6: Memory Layout and Cache Optimization

The Disruptor pattern's performance comes largely from careful attention to memory layout. Let's dive into the details.

False Sharing Prevention

Modern CPUs load memory in cache lines, typically 64 bytes. If two variables share a cache line and are accessed by different cores, every write to one invalidates the other in all cores' caches - even if they're logically independent.

Our sequence variables are classic false sharing candidates:

// BAD: All sequences on same cache line
private volatile long publisherSequence;
private volatile long stage1Sequence;
private volatile long stage2Sequence;
private volatile long stage3Sequence;
// These fit in one 64-byte cache line!

Every time the publisher updates publisherSequence, it invalidates stage1Sequence in Stage 1's cache. Stage 1 must re-fetch it from main memory - a 40-100ns penalty on every iteration.

Solution: Pad each sequence to its own cache line:

/**
 * Padded atomic long that occupies exactly one cache line.
 * Prevents false sharing between sequences.
 */
public class PaddedAtomicLong {
 
    // 7 longs = 56 bytes of padding before value
    private long p1, p2, p3, p4, p5, p6, p7;
 
    private volatile long value;
 
    // 7 longs = 56 bytes of padding after value
    private long p8, p9, p10, p11, p12, p13, p14;
 
    public PaddedAtomicLong(long initialValue) {
        this.value = initialValue;
    }
 
    public long get() {
        return value;
    }
 
    public void set(long newValue) {
        this.value = newValue;
    }
 
    public long getAndIncrement() {
        return UNSAFE.getAndAddLong(this, VALUE_OFFSET, 1L);
    }
 
    public boolean compareAndSet(long expected, long update) {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expected, update);
    }
 
    // Unsafe setup for the atomic operations above
    private static final sun.misc.Unsafe UNSAFE;
    private static final long VALUE_OFFSET;
    static {
        try {
            java.lang.reflect.Field f =
                sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (sun.misc.Unsafe) f.get(null);
            VALUE_OFFSET = UNSAFE.objectFieldOffset(
                PaddedAtomicLong.class.getDeclaredField("value")
            );
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}

Now each sequence has its own cache line. Updating publisherSequence doesn't affect stage1Sequence at all.
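You can observe the effect with a crude benchmark (names are mine; numbers vary wildly by hardware and JVM, and HotSpot may reorder fields, so treat it as illustrative rather than definitive):

```java
// Two threads each hammer their own counter. In Shared the counters likely
// sit on one 64-byte cache line; in Padded seven spacer longs push them
// apart. On typical x86 hardware the padded run is several times faster.
public class FalseSharingDemo {

    static class Shared { volatile long a; volatile long b; }

    static class Padded {
        volatile long a;
        long p1, p2, p3, p4, p5, p6, p7;  // 56 bytes of spacing
        volatile long b;
    }

    // Runs both writers to completion; returns elapsed milliseconds.
    static long time(Runnable w1, Runnable w2) throws InterruptedException {
        Thread t1 = new Thread(w1), t2 = new Thread(w2);
        long start = System.nanoTime();
        t1.start(); t2.start();
        t1.join(); t2.join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        final long N = 20_000_000L;
        Shared s = new Shared();
        long sharedMs = time(() -> { for (long i = 0; i < N; i++) s.a++; },
                             () -> { for (long i = 0; i < N; i++) s.b++; });
        Padded p = new Padded();
        long paddedMs = time(() -> { for (long i = 0; i < N; i++) p.a++; },
                             () -> { for (long i = 0; i < N; i++) p.b++; });
        System.out.println("shared: " + sharedMs + " ms, padded: " + paddedMs + " ms");
    }
}
```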

Ring Buffer Memory Layout

The ring buffer array itself benefits from sequential layout:

// Events are contiguous in memory
PipelineEvent[] ringBuffer = new PipelineEvent[BUFFER_SIZE];
 
// Access pattern is sequential
event = ringBuffer[0];  // Cache line loaded
event = ringBuffer[1];  // Same cache line! No fetch needed
event = ringBuffer[2];  // Same cache line!
// ...
event = ringBuffer[8];  // New cache line (8 x 8-byte references = 64 bytes; 16 refs with compressed oops)

The CPU prefetcher recognizes this sequential pattern and proactively loads upcoming cache lines. By the time you access ringBuffer[8], it's already in L1 cache.

Compare this to a linked list:

// Linked list nodes are scattered in memory
Node node0 = new Node();  // Address: 0x1000
Node node1 = new Node();  // Address: 0x5000 (who knows?)
Node node2 = new Node();  // Address: 0x2000 (random location)
 
// Access pattern is random
node = node0;                  // Cache miss
node = node.next;              // Cache miss (different cache line)
node = node.next;              // Cache miss (different cache line)

Every linked list traversal is a cache miss. No prefetcher can predict where the next node will be.
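A quick way to feel this difference yourself (timings are machine-dependent, and LinkedList<Long> adds boxing on top of the pointer chasing):

```java
import java.util.LinkedList;

// Sums the same values through a contiguous array and a linked list.
// The array walk is prefetcher-friendly; the list walk chases pointers
// (and pays boxing costs on top).
public class TraversalDemo {

    static long sumArray(long[] values) {
        long sum = 0;
        for (long v : values) sum += v;
        return sum;
    }

    static long sumList(LinkedList<Long> values) {
        long sum = 0;
        for (long v : values) sum += v;
        return sum;
    }

    public static void main(String[] args) {
        int n = 2_000_000;
        long[] array = new long[n];
        LinkedList<Long> list = new LinkedList<>();
        for (int i = 0; i < n; i++) {
            array[i] = i;
            list.add((long) i);
        }

        long t0 = System.nanoTime();
        long s1 = sumArray(array);
        long t1 = System.nanoTime();
        long s2 = sumList(list);
        long t2 = System.nanoTime();

        System.out.println("array: " + (t1 - t0) / 1_000_000 + " ms, "
            + "list: " + (t2 - t1) / 1_000_000 + " ms"
            + " (sums equal: " + (s1 == s2) + ")");
    }
}
```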

Event Object Layout

Inside each event object, we also consider layout:

public class PipelineEvent {
 
    // Hot fields first - accessed on every event
    private long timestamp;        // 8 bytes
    private int eventType;         // 4 bytes
    private long sourceId;         // 8 bytes
    private int payloadLength;     // 4 bytes
    // Total: 24 bytes - fits in first cache line with header
 
    // Cold fields - only accessed sometimes
    private volatile int processedStages;  // 4 bytes
 
    // Large fields last - may span cache lines
    private byte[] payload;        // Reference (8 bytes) + array elsewhere
}

By placing frequently accessed fields first, we maximize the chance that a single cache line fetch gives us everything we need.

NUMA Considerations

On multi-socket systems (common in servers), memory is not uniform - each CPU socket has "local" memory that's faster to access than "remote" memory on other sockets.

For maximum performance, pin your pipeline threads to cores on the same socket:

# Pin all pipeline threads to cores 0-7 (socket 0)
taskset -c 0-7 java -jar pipeline.jar
 
# Or use numactl
numactl --cpunodebind=0 --membind=0 java -jar pipeline.jar

And allocate your ring buffer on startup, before any work happens:

public class DisruptorStylePipeline {
 
    // Allocate ring buffer in constructor (before threads start)
    // This ensures the array is allocated on the main thread's local memory
    public DisruptorStylePipeline() {
        this.ringBuffer = new PipelineEvent[BUFFER_SIZE];
        // Force allocation of all events now
        for (int i = 0; i < BUFFER_SIZE; i++) {
            this.ringBuffer[i] = new PipelineEvent();
        }
    }
}

Part 7: Wait Strategies and CPU Utilization

The wait strategy is the heart of the latency vs. CPU trade-off. Let's explore the options in detail.

Busy Spin: Lowest Latency

public class BusySpinWaitStrategy implements WaitStrategy {
 
    @Override
    public long waitFor(long sequence, Sequence dependentSequence) {
        long availableSequence;
 
        while ((availableSequence = dependentSequence.get()) < sequence) {
            Thread.onSpinWait();
        }
 
        return availableSequence;
    }
}

Characteristics:

  • Latency: ~10-50ns response time
  • CPU: 100% utilization while waiting
  • Power: Maximum consumption
  • Use when: Ultra-low latency is critical, dedicated cores available

The Thread.onSpinWait() hint (Java 9+) tells the CPU we're in a spin loop. On x86-64, this compiles to PAUSE, which:

  • Reduces power consumption slightly
  • Improves performance on hyper-threaded cores
  • Prevents memory ordering violations from speculative loads

Yielding: Balanced Approach

public class YieldingWaitStrategy implements WaitStrategy {
 
    private static final int SPIN_TRIES = 100;
 
    @Override
    public long waitFor(long sequence, Sequence dependentSequence,
                       Sequence cursorSequence) {
        long availableSequence;
        int counter = SPIN_TRIES;
 
        while ((availableSequence = dependentSequence.get()) < sequence) {
            if (counter > 0) {
                counter--;
                Thread.onSpinWait();
            } else {
                counter = SPIN_TRIES;
                Thread.yield();
            }
        }
 
        return availableSequence;
    }
}

Characteristics:

  • Latency: ~1-10us response time
  • CPU: High but not 100%
  • Power: Moderate
  • Use when: Low latency needed but not at any cost

This strategy spins for a bit, then yields to let other threads run. Good for systems where pipeline threads share cores with other work.

Sleeping: CPU Friendly

public class SleepingWaitStrategy implements WaitStrategy {
 
    private static final int RETRIES = 200;
    private static final long SLEEP_NS = 100;
 
    @Override
    public long waitFor(long sequence, Sequence dependentSequence,
                       Sequence cursorSequence) throws InterruptedException {
        long availableSequence;
        int counter = RETRIES;
 
        while ((availableSequence = dependentSequence.get()) < sequence) {
            if (counter > 100) {
                counter--;
                Thread.onSpinWait();
            } else if (counter > 0) {
                counter--;
                Thread.yield();
            } else {
                LockSupport.parkNanos(SLEEP_NS);
            }
        }
 
        return availableSequence;
    }
}

Characteristics:

  • Latency: ~100us-1ms response time
  • CPU: Low utilization while waiting
  • Power: Minimal
  • Use when: Latency tolerance is high, power/CPU cost matters

Blocking: Maximum Efficiency

public class BlockingWaitStrategy implements WaitStrategy {
 
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();
 
    @Override
    public long waitFor(long sequence, Sequence dependentSequence,
                       Sequence cursorSequence) throws InterruptedException {
        long availableSequence;
 
        if ((availableSequence = dependentSequence.get()) < sequence) {
            lock.lock();
            try {
                while ((availableSequence = dependentSequence.get()) < sequence) {
                    processorNotifyCondition.await();
                }
            } finally {
                lock.unlock();
            }
        }
 
        return availableSequence;
    }
 
    public void signalAllWhenBlocking() {
        lock.lock();
        try {
            processorNotifyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

Characteristics:

  • Latency: ~10us-100us+ (context switch dependent)
  • CPU: Minimal utilization while waiting
  • Power: Minimal
  • Use when: Throughput matters more than latency

Choosing the Right Strategy

[Diagram omitted: decision flow for choosing a wait strategy]
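In code form, the decision reduces to a few questions about your deployment. A minimal chooser sketch (the class, method, and parameter names here are illustrative, not part of the pipeline):

```java
/** Sketch: map deployment constraints to a wait strategy choice. */
public class WaitStrategyChooser {

    public enum Choice { BUSY_SPIN, YIELDING, SLEEPING, BLOCKING }

    /**
     * Dedicated cores plus a hard latency target justify spinning;
     * otherwise trade latency away for CPU and power.
     */
    public static Choice choose(boolean dedicatedCores,
                                boolean subMicrosecondLatency,
                                boolean cpuBudgetTight) {
        if (subMicrosecondLatency && dedicatedCores) return Choice.BUSY_SPIN;
        if (subMicrosecondLatency)                   return Choice.YIELDING;
        if (cpuBudgetTight)                          return Choice.SLEEPING;
        return Choice.BLOCKING;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, true, false));   // BUSY_SPIN
        System.out.println(choose(false, true, false));  // YIELDING
        System.out.println(choose(false, false, false)); // BLOCKING
    }
}
```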

Part 8: Multi-Producer Coordination

When multiple threads publish to the same ring buffer, we need careful coordination to avoid race conditions.

The Claim-Write-Publish Pattern

public class MultiProducerSequencer {

    private static final VarHandle AVAILABLE_BUFFER =
        MethodHandles.arrayElementVarHandle(int[].class);

    private final int bufferSize;
    private final int indexMask;
    private final int indexShift;

    // Cursor tracks the highest claimed sequence
    private final PaddedAtomicLong cursor = new PaddedAtomicLong(-1L);

    // Gating sequences track the slowest consumers
    private final Sequence[] gatingSequences;

    // Per-slot flags track which "lap" of the buffer is fully published
    private final int[] availableBuffer;

    public MultiProducerSequencer(int bufferSize, Sequence[] gatingSequences) {
        this.bufferSize = bufferSize;
        this.indexMask = bufferSize - 1;
        this.indexShift = Integer.numberOfTrailingZeros(bufferSize);
        this.gatingSequences = gatingSequences;
        this.availableBuffer = new int[bufferSize];

        // Initialize available buffer (nothing published yet)
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer[i] = -1;
        }
    }
 
    /**
     * Claim the next sequence for publishing.
     * Multiple threads may claim concurrently.
     */
    public long next() {
        long current;
        long next;

        while (true) {
            current = cursor.get();
            next = current + 1;

            // Check if we'd wrap past the slowest consumer
            long wrapPoint = next - bufferSize;
            long gatingSequence = getMinimumSequence(gatingSequences);

            if (wrapPoint > gatingSequence) {
                // Must wait for consumers, then re-read the cursor.
                // (In a do/while, `continue` would jump straight to the
                // CAS with a stale `current` and could claim a wrapping slot.)
                LockSupport.parkNanos(1);
                continue;
            }

            if (cursor.compareAndSet(current, next)) {
                return next;
            }
        }
    }
 
    /**
     * Publish a sequence, making it visible to consumers.
     * Must be called after writing to the slot.
     */
    public void publish(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = (int) (sequence >>> indexShift);
 
        AVAILABLE_BUFFER.setRelease(availableBuffer, index, flag);
    }
 
    /**
     * Check if a sequence is available for consumption.
     */
    public boolean isAvailable(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = (int) (sequence >>> indexShift);
 
        return (int) AVAILABLE_BUFFER.getAcquire(availableBuffer, index) == flag;
    }
 
    /**
     * Get the highest available sequence for a consumer.
     * Handles gaps from incomplete publications.
     */
    public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable(sequence)) {
                return sequence - 1;
            }
        }
        return availableSequence;
    }
}

Understanding the Available Buffer

The available buffer is the key to multi-producer coordination. Here's how it works:

[Diagram omitted: availability-flag states as producers publish out of order]

The available buffer prevents consumers from seeing partially-written events, even when producers complete out of order.
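To make the mechanism concrete, here is the index/flag arithmetic in isolation for a toy buffer of size 8 (a standalone sketch, not pipeline code):

```java
/** Sketch: the index/flag math behind the availability buffer, size 8. */
public class AvailabilityDemo {
    static final int BUFFER_SIZE = 8;                  // power of 2
    static final int INDEX_MASK = BUFFER_SIZE - 1;
    static final int INDEX_SHIFT = Integer.numberOfTrailingZeros(BUFFER_SIZE); // 3

    /** Which slot a sequence lands in. */
    static int index(long sequence) { return (int) (sequence & INDEX_MASK); }

    /** Which "lap" around the buffer a sequence belongs to. */
    static int flag(long sequence)  { return (int) (sequence >>> INDEX_SHIFT); }

    public static void main(String[] args) {
        // Sequences 2 and 10 share slot 2 but carry different flags,
        // so a consumer waiting for sequence 10 is not fooled by the
        // publish of sequence 2 from the previous lap.
        System.out.println(index(2) + " / flag " + flag(2));   // 2 / flag 0
        System.out.println(index(10) + " / flag " + flag(10)); // 2 / flag 1
    }
}
```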

VarHandle and Memory Ordering in Depth

Understanding memory ordering is critical for Disruptor correctness. Let me walk through the VarHandle operations we use and why they matter.

public class DisruptorSequencer {
 
    private static final VarHandle CURSOR;
    private static final VarHandle AVAILABLE_BUFFER;
 
    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            CURSOR = lookup.findVarHandle(
                DisruptorSequencer.class,
                "cursor",
                long.class
            );
            AVAILABLE_BUFFER = MethodHandles.arrayElementVarHandle(int[].class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
 
    private volatile long cursor = -1L;
    private final int[] availableBuffer;
    private final int indexMask;   // bufferSize - 1; set in the (omitted) constructor
    private final int indexShift;  // log2(bufferSize); set in the (omitted) constructor

    private int calculateAvailabilityFlag(long sequence) {
        return (int) (sequence >>> indexShift);
    }
 
    /**
     * Claim the next sequence using CAS with volatile semantics.
     * Full memory fence ensures all prior writes are visible.
     */
    public long next() {
        long current;
        long next;
        do {
            current = (long) CURSOR.getVolatile(this);  // Volatile read
            next = current + 1;
        } while (!CURSOR.compareAndSet(this, current, next));  // Full fence
        return next;
    }
 
    /**
     * Publish with release semantics.
     * All prior writes (to the event) happen-before this write.
     */
    public void publish(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = calculateAvailabilityFlag(sequence);
 
        // Release ensures event data is visible before flag update
        AVAILABLE_BUFFER.setRelease(availableBuffer, index, flag);
    }
 
    /**
     * Check availability with acquire semantics.
     * If we see the flag, we're guaranteed to see the event data.
     */
    public boolean isAvailable(long sequence) {
        int index = (int) (sequence & indexMask);
        int flag = calculateAvailabilityFlag(sequence);
 
        // Acquire ensures we see event data if we see the flag
        return (int) AVAILABLE_BUFFER.getAcquire(availableBuffer, index) == flag;
    }
}

Memory Access Modes Explained:

  1. Plain (getPlain/setPlain): No ordering guarantees. Fastest but dangerous for concurrent access. Only use when you don't care about visibility to other threads.

  2. Opaque (getOpaque/setOpaque): Guarantees the read/write happens, but no ordering with other operations. Useful for progress flags that don't need synchronization.

  3. Acquire (getAcquire): All reads and writes after this load cannot be reordered before it. Use for reading synchronization variables before reading protected data.

  4. Release (setRelease): All reads and writes before this store cannot be reordered after it. Use for writing synchronization variables after writing protected data.

  5. Volatile (getVolatile/setVolatile): Both acquire and release semantics plus total ordering with other volatile operations. Most expensive but easiest to reason about.

The Happens-Before Relationship:

Producer:                      Consumer:
  write event data               |
       |                         |
  setRelease(flag)     -->     getAcquire(flag)
       |                         |
                              read event data

The release-acquire pairing creates a happens-before relationship: if the consumer sees the flag update, it's guaranteed to see all writes that happened before the release (including the event data).
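Here is that pairing reduced to a runnable miniature: a plain data field published through a single release write and consumed through an acquire read (the class name and structure are illustrative):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/** Sketch: the release/acquire happens-before pairing, in miniature. */
public class HappensBeforeDemo {

    private static final VarHandle FLAG;
    static {
        try {
            FLAG = MethodHandles.lookup()
                    .findVarHandle(HappensBeforeDemo.class, "flag", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private int data;   // "event data": a plain field, protected by the flag
    private int flag;   // "availability flag", accessed via the VarHandle

    /** Producer writes data then publishes; consumer spins then reads. */
    public static int runOnce() throws InterruptedException {
        HappensBeforeDemo d = new HappensBeforeDemo();
        int[] observed = new int[1];

        Thread consumer = new Thread(() -> {
            // Spin until the flag is visible (acquire)...
            while ((int) FLAG.getAcquire(d) == 0) Thread.onSpinWait();
            // ...then the earlier plain write to data must be visible too.
            observed[0] = d.data;
        });
        consumer.start();

        d.data = 42;            // write event data first
        FLAG.setRelease(d, 1);  // then publish with release semantics
        consumer.join();
        return observed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("observed data = " + runOnce());
    }
}
```

Without the release/acquire pair, the consumer could legally observe `flag == 1` with a stale `data`.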

Batch Publishing for Throughput

When publishing many events, batching improves throughput:

/**
 * Batch publish for higher throughput.
 * Claims multiple slots at once, reducing CAS operations.
 */
public long[] nextBatch(int batchSize) {
    long current;
    long next;

    while (true) {
        current = cursor.get();
        next = current + batchSize;

        long wrapPoint = next - bufferSize;
        long gatingSequence = getMinimumSequence(gatingSequences);

        if (wrapPoint > gatingSequence) {
            // Wait for consumers, then re-read the cursor before retrying
            LockSupport.parkNanos(1);
            continue;
        }

        if (cursor.compareAndSet(current, next)) {
            break;
        }
    }

    // Return all claimed sequences
    long[] sequences = new long[batchSize];
    for (int i = 0; i < batchSize; i++) {
        sequences[i] = current + 1 + i;
    }
    return sequences;
}
 
/**
 * Batch publish all sequences at once.
 */
public void publishBatch(long from, long to) {
    for (long seq = from; seq <= to; seq++) {
        publish(seq);
    }
}
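One caveat: returning a long[] allocates on the hot path, which cuts against the zero-allocation goal. The LMAX library instead returns only the highest claimed sequence and lets the caller derive the range. A minimal allocation-free sketch (gating-sequence check omitted; class name is mine):

```java
import java.util.concurrent.atomic.AtomicLong;

/** Sketch: allocation-free batch claim returning only the highest sequence. */
public class BatchClaimDemo {

    private final AtomicLong cursor = new AtomicLong(-1L);

    /**
     * Claims batchSize consecutive slots with a single CAS.
     * The claimed range is (hi - batchSize, hi], so no array is needed.
     */
    public long nextBatch(int batchSize) {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + batchSize;
            // (Wrap check against gating sequences omitted for brevity.)
        } while (!cursor.compareAndSet(current, next));
        return next;  // highest claimed sequence
    }

    public static void main(String[] args) {
        BatchClaimDemo demo = new BatchClaimDemo();
        long hi = demo.nextBatch(10);
        long lo = hi - 10 + 1;
        System.out.println(lo + ".." + hi);  // 0..9
    }
}
```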

Part 9: Benchmarks and Results

Let's see how our Disruptor-style pipeline compares to the blocking approach.

Benchmark Setup

@State(Scope.Benchmark)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 2)
@Fork(3)
public class PipelineBenchmark {
 
    private BlockingEventPipeline blockingPipeline;
    private DisruptorStylePipeline disruptorPipeline;
 
    @Setup
    public void setup() {
        blockingPipeline = new BlockingEventPipeline(1024);
        blockingPipeline.start();
 
        disruptorPipeline = new DisruptorStylePipeline();
        disruptorPipeline.start();
    }
 
    @Benchmark
    @Group("blocking")
    @GroupThreads(4)
    public void blockingPublish(Blackhole bh) {
        byte[] data = new byte[100];
        blockingPipeline.submit(System.nanoTime(), 1, 1234L, data, 0, 100);
    }
 
    @Benchmark
    @Group("disruptor")
    @GroupThreads(4)
    public void disruptorPublish(Blackhole bh) {
        byte[] data = new byte[100];
        disruptorPipeline.publish(System.nanoTime(), 1, 1234L, data, 0, 100);
    }
}

Latency Results

Benchmark                                Mode  Cnt      Score      Error  Units

BlockingPipeline.publish                sample  1000   2340.23 +/-  234.12  ns/op
BlockingPipeline.publish:p50            sample         1890.00             ns/op
BlockingPipeline.publish:p90            sample         3450.00             ns/op
BlockingPipeline.publish:p99            sample         8934.00             ns/op
BlockingPipeline.publish:p99.9          sample        45123.00             ns/op

DisruptorPipeline.publish               sample  1000     98.34 +/-    8.12  ns/op
DisruptorPipeline.publish:p50           sample          78.00             ns/op
DisruptorPipeline.publish:p90           sample         134.00             ns/op
DisruptorPipeline.publish:p99           sample         287.00             ns/op
DisruptorPipeline.publish:p99.9         sample         623.00             ns/op

Analysis:

Metric    Blocking     Disruptor   Improvement
Mean      2,340ns      98ns        23.9x
p50       1,890ns      78ns        24.2x
p90       3,450ns      134ns       25.7x
p99       8,934ns      287ns       31.1x
p99.9     45,123ns     623ns       72.4x

The improvement is dramatic across all percentiles. The tail latency improvement (p99.9 at 72x) is particularly important - this is where blocking queues cause the most damage.

Throughput Results

Benchmark                                Mode  Cnt        Score        Error  Units

BlockingPipeline.throughput              thrpt   10    47823.23 +/-  2341.12  ops/s
DisruptorPipeline.throughput             thrpt   10   523456.78 +/- 12345.67  ops/s

Throughput Improvement: 10.9x

Garbage Collection Analysis

During a 5-minute sustained load test with 4 publishers:

Blocking Pipeline:

  • Young GC events: 234
  • Total GC pause time: 4,567ms
  • Average pause: 19.5ms
  • Max pause: 127ms
  • Allocation rate: 23.4 MB/sec

Disruptor Pipeline:

  • Young GC events: 3
  • Total GC pause time: 45ms
  • Average pause: 15ms
  • Max pause: 18ms
  • Allocation rate: 0.1 MB/sec

GC Impact Reduction: 78x fewer GC events, ~101x less pause time

The near-zero allocation rate of the Disruptor pipeline means the garbage collector has almost nothing to do. Those three GC events were likely triggered by monitoring code, not the pipeline itself.

Per-Stage Latency Breakdown

Stage Latency Comparison (nanoseconds):

Blocking Pipeline:
  Queue 1 transfer: 1,200 - 2,500ns
  Stage 1 process:    100 - 300ns
  Queue 2 transfer: 1,100 - 2,200ns
  Stage 2 process:    150 - 400ns
  Queue 3 transfer:   900 - 1,800ns
  Stage 3 process:    100 - 250ns
  Total:            3,550 - 7,450ns

Disruptor Pipeline:
  Slot claim:          20 - 40ns
  Stage 1 wait:        10 - 30ns
  Stage 1 process:    100 - 300ns
  Stage 2 wait:        10 - 25ns
  Stage 2 process:    150 - 400ns
  Stage 3 wait:        10 - 20ns
  Stage 3 process:    100 - 250ns
  Total:              400 - 1,065ns

The blocking queue transfers dominate the blocking pipeline's latency. In the Disruptor pipeline, inter-stage communication is nearly free.


Part 10: Production Deployment Considerations

Moving from benchmarks to production requires attention to several operational concerns.

Monitoring and Observability

Add metrics to track pipeline health:

public class MonitoredPipeline extends DisruptorStylePipeline {
 
    private final LongAdder publishCount = new LongAdder();
    private final LongAdder stage1Count = new LongAdder();
    private final LongAdder stage2Count = new LongAdder();
    private final LongAdder stage3Count = new LongAdder();
 
    private final AtomicLong lastPublishLatency = new AtomicLong();
    private final AtomicLong lastE2ELatency = new AtomicLong();
 
    @Override
    public long publish(long timestamp, int eventType, long sourceId,
                       byte[] data, int offset, int length) {
        long startNanos = System.nanoTime();
        long sequence = super.publish(timestamp, eventType, sourceId,
                                       data, offset, length);
        lastPublishLatency.set(System.nanoTime() - startNanos);
        publishCount.increment();
        return sequence;
    }
 
    // Export via JMX or metrics framework
    public long getPublishCount() { return publishCount.sum(); }
    public long getPublishLatencyNanos() { return lastPublishLatency.get(); }
 
    public long getBufferUtilization() {
        long head = publisherSequence.get();
        long tail = stage3Sequence.get();
        return head - tail;
    }
 
    public double getBufferUtilizationPercent() {
        return (double) getBufferUtilization() / BUFFER_SIZE * 100;
    }
}

Backpressure Handling

When the pipeline is overloaded, you have several options:

public enum BackpressureStrategy {
    BLOCK,      // Wait until space available
    DROP_NEW,   // Discard new events if full
    DROP_OLD,   // Overwrite oldest events
    SIGNAL      // Signal upstream to slow down
}
 
public class BackpressureAwarePipeline extends DisruptorStylePipeline {
 
    private final BackpressureStrategy strategy;
    private final LongAdder droppedEvents = new LongAdder();
 
    public boolean tryPublish(long timestamp, int eventType, long sourceId,
                             byte[] data, int offset, int length,
                             long timeoutNanos) {
        switch (strategy) {
            case BLOCK:
                return publishWithTimeout(timestamp, eventType, sourceId,
                                          data, offset, length, timeoutNanos);
 
            case DROP_NEW:
                if (!hasCapacity()) {
                    droppedEvents.increment();
                    return false;
                }
                publish(timestamp, eventType, sourceId, data, offset, length);
                return true;
 
            case DROP_OLD:
                // Force publish, potentially overwriting unprocessed events
                forcePublish(timestamp, eventType, sourceId, data, offset, length);
                return true;
 
            case SIGNAL:
                if (!hasCapacity()) {
                    signalBackpressure();
                    return false;
                }
                publish(timestamp, eventType, sourceId, data, offset, length);
                return true;
 
            default:
                throw new IllegalStateException("Unknown strategy: " + strategy);
        }
    }
 
    private boolean hasCapacity() {
        long head = publisherSequence.get();
        long tail = stage3Sequence.get();
        return (head - tail) < (BUFFER_SIZE - 10);  // Leave some margin
    }
}

Graceful Shutdown

Shutting down a Disruptor pipeline requires draining in-flight events:

public void shutdown() throws InterruptedException, TimeoutException {
    // Step 1: Stop accepting new events
    acceptingEvents = false;
 
    // Step 2: Wait for all published events to be processed
    long timeout = System.currentTimeMillis() + 30_000;
    while (stage3Sequence.get() < publisherSequence.get()) {
        if (System.currentTimeMillis() > timeout) {
            throw new TimeoutException("Pipeline drain timeout");
        }
        Thread.sleep(10);
    }
 
    // Step 3: Signal workers to stop
    running = false;
 
    // Step 4: Interrupt any blocking waits
    for (Thread worker : allWorkers()) {
        worker.interrupt();
    }
 
    // Step 5: Wait for workers to terminate
    for (Thread worker : allWorkers()) {
        worker.join(5000);
    }
}

Thread Affinity for Maximum Performance

For ultra-low latency, pin threads to specific CPU cores:

public class AffinityPipeline extends DisruptorStylePipeline {
 
    @Override
    public void start() {
        // Pin publisher threads to cores 0-3
        // Pin stage 1 workers to cores 4-7
        // Pin stage 2 workers to cores 8-11
        // Pin stage 3 workers to cores 12-13
 
        for (int i = 0; i < stage1Workers.length; i++) {
            final int workerId = i;         // lambda capture requires effectively final
            final int core = 4 + workerId;
            stage1Workers[workerId] = new Thread(() -> {
                setThreadAffinity(core);
                runStage1Worker(workerId);
            });
            stage1Workers[workerId].start();
        }
 
        // Similarly for other stages...
    }
 
    private void setThreadAffinity(int core) {
        // Use JNA or JNI to call sched_setaffinity
        // Or use a library like Java-Thread-Affinity
        Affinity.setAffinity(1L << core);
    }
}

JVM Tuning for Disruptor Workloads

The right JVM flags can make a significant difference in Disruptor performance:

# Recommended JVM flags for Disruptor workloads (JDK 21+).
# Backslash continuations cannot be interleaved with comments,
# so the explanations follow the command.
java \
    -Xms8g -Xmx8g \
    -XX:+UseZGC -XX:+ZGenerational \
    -XX:+UseLargePages -XX:LargePageSizeInBytes=2m \
    -Xss256k \
    -jar pipeline.jar

# Alternative: G1 tuned for low latency (in place of the ZGC flags):
#   -XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:G1HeapRegionSize=16m \
#   -XX:NewSize=6g -XX:MaxNewSize=6g

# On JDK 8-14, also disable biased locking; it adds overhead for
# lock-free code and was removed entirely in JDK 15:
#   -XX:-UseBiasedLocking

Why these flags matter:

  1. Fixed heap size (-Xms8g -Xmx8g): Prevents runtime heap resizing, which can cause GC pauses.

  2. Large young generation (-XX:NewSize=6g, G1 only): Allocations outside the ring buffer are mostly short-lived. A large eden space reduces GC frequency. ZGC sizes its generations automatically.

  3. ZGC or G1: Generational ZGC provides sub-millisecond pause times. G1 is a good alternative with more tuning options.

  4. Disabled biased locking (-XX:-UseBiasedLocking, JDK 8-14): The biased locking optimization adds overhead for lock-free code that doesn't use locks. It was removed entirely in JDK 15, so omit the flag on newer JDKs.

  5. Large pages (-XX:+UseLargePages): Reduces TLB misses for large ring buffers. Requires OS configuration.

Operating System Tuning

Beyond JVM flags, OS-level tuning helps:

# Linux: Disable CPU frequency scaling (use performance governor)
echo performance | sudo tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
 
# Linux: Disable transparent huge pages (THP) if using explicit large pages
echo never | sudo tee /sys/kernel/mm/transparent_hugepage/enabled
 
# Linux: Set real-time scheduling priority for pipeline threads
sudo chrt -f 99 java -jar pipeline.jar
 
# Linux: Isolate CPUs from scheduler for dedicated pipeline use
# In /etc/default/grub: GRUB_CMDLINE_LINUX="isolcpus=4-15"
 
# Linux: Pin IRQ handlers away from pipeline cores
# (a glob in a redirect target only expands to one file; loop instead)
for f in /proc/irq/*/smp_affinity_list; do echo 0-3 | sudo tee "$f"; done

Warm-Up Strategies

The JIT compiler needs time to optimize hot paths. Warm up your pipeline before handling production traffic:

public class WarmUpPipeline extends DisruptorStylePipeline {
 
    public void warmUp(int iterations) {
        System.out.println("Starting warm-up phase...");
 
        byte[] warmUpData = new byte[100];
        long startTime = System.currentTimeMillis();
 
        // Run through enough iterations to trigger JIT compilation
        for (int i = 0; i < iterations; i++) {
            publish(System.nanoTime(), 1, i, warmUpData, 0, 100);
        }
 
        // Wait for all warm-up events to process
        waitForDrain();
 
        long elapsed = System.currentTimeMillis() - startTime;
        System.out.println("Warm-up complete: " + iterations + " events in " +
                           elapsed + "ms (" + (iterations * 1000 / elapsed) + " ops/sec)");
 
        // Force full GC to clear warm-up garbage
        System.gc();
        System.gc();
 
        // Reset metrics
        resetCounters();
 
        System.out.println("Pipeline ready for production traffic");
    }
 
    private void waitForDrain() {
        while (stage3Sequence.get() < publisherSequence.get()) {
            Thread.onSpinWait();
        }
    }
}

Recommended warm-up: at least 100,000 iterations, or until you see consistent latency numbers in monitoring.


Part 11: Trade-offs and When to Use

The Disruptor pattern is powerful but not universally applicable. Let's be clear about when it shines and when simpler approaches suffice.

When the Disruptor Pattern Excels

High-Frequency Trading Systems

Our original use case. When microseconds matter:

  • Market data processing pipelines
  • Order matching engines
  • Risk calculation streams
  • Arbitrage detection systems

Real-Time Analytics

Systems that must process events with consistent, low latency:

  • Fraud detection pipelines
  • Real-time recommendation engines
  • Live dashboard data feeds
  • Alerting and monitoring systems

Event-Driven Architectures

Complex event processing where events flow through multiple stages:

  • Event sourcing write-ahead logs
  • CQRS command processing
  • Saga orchestration
  • Workflow engines

Telemetry and Logging

High-volume data collection where blocking would affect application performance:

  • Distributed tracing spans
  • Metrics aggregation
  • Structured logging
  • Audit trails

When Simpler Approaches Are Better

Low-Throughput Systems

If you're processing fewer than 10,000 events/second, the complexity of the Disruptor pattern isn't justified. A simple BlockingQueue will work fine.
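For scale, here is roughly the entirety of what such a system needs: a bounded queue, a put, and a take (a standalone sketch; class and method names are mine):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Sketch: at modest rates, a bounded blocking queue between stages is plenty. */
public class SimpleBlockingPipeline {

    /** Runs one producer and one consumer over a bounded queue; returns events processed. */
    public static int run(int events) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024);
        int[] processed = new int[1];

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < events; i++) {
                    queue.take();     // blocks until an event arrives
                    processed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 0; i < events; i++) {
            queue.put(i);             // blocks if the queue is full
        }
        consumer.join();
        return processed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed " + run(1000) + " events");
    }
}
```

The per-event cost is microseconds rather than nanoseconds, but at 10k events/second that overhead is invisible.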

Variable-Size Events

The Disruptor works best with fixed-size events that can be pre-allocated. If your events vary significantly in size, you'll need complex memory management.

Unbounded Queues

The ring buffer has fixed capacity. If you truly need unbounded buffering, ConcurrentLinkedQueue or similar is simpler.

Teams Without Low-Level Experience

Disruptor-style code requires understanding of:

  • Memory ordering and barriers
  • Cache architecture and false sharing
  • Lock-free algorithm correctness
  • JVM memory model

If your team isn't comfortable with these concepts, the maintenance burden will outweigh the performance benefits.

Decision Framework

[Diagram omitted: decision framework for when to adopt the Disruptor pattern]

Part 12: Conclusion

That 3 AM incident taught me something fundamental about event processing systems: the transport between processing stages often matters more than the processing itself.

Our original pipeline spent more time coordinating between stages than actually processing events. Each LinkedBlockingQueue was a bottleneck of lock contention, memory allocation, and cache thrashing. The business logic was fast; the plumbing was slow.

The Disruptor pattern eliminates all three problems:

  1. Lock contention eliminated - Atomic sequence numbers replace locks. No thread ever blocks waiting for another.

  2. Allocation eliminated - Pre-allocated ring buffer and events mean zero garbage in the hot path. GC pauses drop from seconds to milliseconds.

  3. Cache efficiency maximized - Sequential access patterns, padded sequences, and contiguous memory make the CPU prefetcher happy.

The results speak for themselves:

Metric          Before (Blocking)   After (Disruptor)   Improvement
Mean Latency    2,340ns             98ns                23.9x
p99.9 Latency   45,123ns            623ns               72.4x
Throughput      48k ops/s           523k ops/s          10.9x
GC Pauses       4,567ms/5min        45ms/5min           101x

But the pattern isn't magic, and it's not free. The complexity of lock-free code makes it harder to write, debug, and maintain. Fixed capacity ring buffers have size limits, so you must plan for backpressure. The expertise required spans memory ordering, cache architecture, and JVM internals. And it's not always necessary — for many workloads, simple blocking queues are perfectly adequate.

The key insight from this journey: measure first, then optimize. We knew we had a latency problem because we measured it. We knew blocking queues were the bottleneck because we profiled it. We knew the Disruptor pattern would help because we benchmarked it.

Performance optimization is about understanding where time goes and making informed trade-offs. The Disruptor pattern trades simplicity for speed. That trade-off made sense for our trading analytics pipeline. It might make sense for yours too - or it might not.

When you do reach for the Disruptor pattern, remember:

  1. Pre-allocate everything at startup
  2. Pad sequences to prevent false sharing
  3. Choose your wait strategy based on latency vs. CPU trade-offs
  4. Monitor buffer utilization and plan for backpressure
  5. Test under realistic load before production

That 3 AM incident is now a distant memory. Our pipeline handles 10x the original load with latency consistently under 200 nanoseconds. The on-call rotation is much quieter these days.

Key Takeaways

Let me distill the most important lessons from this journey:

Architecture Decisions:

  • The transport layer between processing stages often dominates overall latency
  • Pre-allocation eliminates GC pressure in the hot path
  • A single shared buffer outperforms multiple independent queues
  • Sequential memory access patterns enable CPU prefetching

Implementation Details:

  • Use power-of-2 buffer sizes for fast index calculation via bitwise AND
  • Pad sequences to separate cache lines (128 bytes total per sequence)
  • Choose wait strategies based on your latency vs. CPU trade-off
  • Use VarHandle with proper memory ordering (release for publish, acquire for consume)

Production Operations:

  • Warm up the JIT compiler before handling production traffic
  • Monitor buffer utilization to detect backpressure early
  • Plan for graceful shutdown that drains in-flight events
  • Test under realistic load for extended periods

Team Considerations:

  • Lock-free code is harder to debug than locked code
  • Ensure your team understands memory ordering before deploying
  • Consider using the LMAX Disruptor library instead of rolling your own
  • Document your design decisions for future maintainers

The Disruptor pattern isn't magic, but it is a powerful tool when applied correctly. The key is understanding when the complexity is justified and implementing it with care.

Now go build something fast.


Appendix A: Quick Reference

Core Components

Component         Purpose                       Key Methods
RingBuffer        Pre-allocated event storage   get(), getCursor()
Sequence          Cache-line padded counter     get(), set(), compareAndSet()
SequenceBarrier   Cross-stage coordination      waitFor()
EventProcessor    Per-stage worker              run(), onEvent()
WaitStrategy      Latency vs. CPU trade-off     waitFor()

Wait Strategy Selection

Strategy               Latency        CPU Usage   Use Case
BusySpinWaitStrategy   ~10-50ns       100%        Ultra-low latency, dedicated cores
YieldingWaitStrategy   ~1-10us        High        Low latency, shared cores
SleepingWaitStrategy   ~100us-1ms     Medium      Moderate latency tolerance
BlockingWaitStrategy   ~10us-100us+   Low         Throughput over latency

Memory Layout Checklist

  • Sequences padded to separate cache lines (128 bytes total)
  • Ring buffer array is contiguous (array of objects)
  • Hot fields in event objects come first
  • Buffer size is power of 2 for fast index calculation
  • Thread affinity configured for NUMA awareness

Performance Targets

Metric            Blocking Baseline   Disruptor Target   Typical Achievement
Latency (mean)    1-5us               <200ns             50-100ns
Latency (p99.9)   10-100us            <1us               500-800ns
Throughput        50-100k ops/s       >500k ops/s        500k-2M ops/s
Allocation        10-50MB/s           <1MB/s             Near zero

Appendix B: Common Pitfalls and Debugging

Pitfall 1: Forgetting Memory Barriers

The most common bug in Disruptor-style code is assuming operations happen in program order. Without proper memory barriers, the CPU and compiler can reorder operations, leading to subtle bugs.

// WRONG: No memory barrier between write and publish
buffer[index] = event;
sequence.set(newSequence);  // Consumer might see sequence before event!
 
// CORRECT: Use release semantics
buffer[index] = event;
SEQUENCE.setRelease(sequence, newSequence);  // Barrier ensures order

Debugging tip: If you see occasional corrupted data that you can't reproduce reliably, suspect memory ordering issues. Add explicit barriers and see if the problem disappears.

Pitfall 2: False Sharing Strikes Back

Even with padded sequences, false sharing can creep in through other means:

// DANGEROUS: Worker state on same cache line
public class Worker {
    private long sequence;        // One core writes
    private long processingCount; // Same core writes - fine
    private long errorCount;      // Monitoring thread reads - FALSE SHARING!
}

Any field read by a different thread should sit on its own cache line; otherwise you are silently accepting the false-sharing penalty.
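One way to isolate such a field without JVM flags is manual padding (a sketch; the `@Contended` annotation with `-XX:-RestrictContended` is the alternative):

```java
// Sketch: manually padding a cross-thread counter onto its own cache line.
// Seven longs (56 bytes) on each side keep neighbours off the 64-byte line
// holding `value`.
public class PaddedErrorCount {
    long p1, p2, p3, p4, p5, p6, p7;    // padding before
    volatile long value;                 // written by worker, read by monitor
    long q1, q2, q3, q4, q5, q6, q7;    // padding after

    public void increment() { value++; } // single-writer only
    public long get()       { return value; }
}
```

A caveat worth noting: the JLS does not guarantee field layout order, which is why the real Disruptor pads via a superclass hierarchy (superclass fields are laid out before subclass fields on HotSpot).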

Pitfall 3: Sequence Overflow

Using int for sequences leads to overflow after 2^31 operations:

// DANGEROUS: Int overflow after ~2 billion operations
private int sequence = 0;  // Overflows in ~4 minutes at 10M ops/sec
 
// SAFE: Long lasts effectively forever
private long sequence = 0;  // Overflows in 29,000 years at 10M ops/sec

Pitfall 4: Consumer Starvation

With multiple consumers processing different events, one slow consumer can starve others:

// Each consumer processes every Nth event
Worker 0: events 0, 4, 8, 12...
Worker 1: events 1, 5, 9, 13...
Worker 2: events 2, 6, 10, 14...
Worker 3: events 3, 7, 11, 15...
 
// If Worker 0 is slow, it blocks the ring buffer from advancing
// Workers 1-3 finish their work but can't get more events

Solution: Implement work-stealing or use batched processing to rebalance load.
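The batched variant can be sketched as a consumer that claims every published slot in one pass and updates its sequence once per batch (a single-producer, single-consumer sketch; all names are illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Sketch: drain all available events per wait instead of one at a time.
public class BatchConsumer {
    private final long[] ring;
    private final int mask;                                   // size must be 2^n
    private final AtomicLong published = new AtomicLong(-1);  // producer cursor
    private final AtomicLong consumed = new AtomicLong(-1);   // consumer cursor

    public BatchConsumer(int size) {
        this.ring = new long[size];
        this.mask = size - 1;
    }

    public void publish(long value) {
        long seq = published.get() + 1;       // single producer assumed
        ring[(int) (seq & mask)] = value;     // slot write before...
        published.set(seq);                   // ...the volatile cursor write
    }

    // Process every available event, then update the consumer cursor ONCE
    public int drain(LongConsumer handler) {
        long next = consumed.get() + 1;
        long available = published.get();
        int count = 0;
        for (long s = next; s <= available; s++) {
            handler.accept(ring[(int) (s & mask)]);
            count++;
        }
        if (count > 0) {
            consumed.set(available);          // one write per batch, not per event
        }
        return count;
    }
}
```

The single cursor update per batch is what rebalances load: a consumer that falls behind catches up in one sweep instead of paying coordination costs per event.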

Pitfall 5: Spin-Loop Power Consumption

Busy-spin loops on laptops or power-constrained environments can cause thermal throttling:

// CPU runs hot, may throttle
while (sequence.get() < target) {
    // Empty spin
}
 
// Better: Give CPU a break
while (sequence.get() < target) {
    Thread.onSpinWait();  // PAUSE instruction
}
 
// Even better for non-critical paths
while (sequence.get() < target) {
    LockSupport.parkNanos(1);  // Brief sleep
}
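These three options compose naturally into a phased back-off, similar in spirit to the library's PhasedBackoffWaitStrategy (a sketch; the spin thresholds are illustrative, not tuned values):

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.LongSupplier;

// Sketch: spin hot briefly, then yield, then park in short naps.
public final class PhasedWait {
    public static long waitFor(long target, LongSupplier sequence) {
        long value;
        int spins = 0;
        while ((value = sequence.getAsLong()) < target) {
            if (spins < 1_000) {
                Thread.onSpinWait();           // phase 1: hot spin (PAUSE)
            } else if (spins < 10_000) {
                Thread.yield();                // phase 2: give up the core briefly
            } else {
                LockSupport.parkNanos(1_000);  // phase 3: ~1us naps
            }
            spins++;
        }
        return value;
    }
}
```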

Debugging Tools

1. Add instrumentation counters

private final LongAdder casRetries = new LongAdder();
private final LongAdder casSuccesses = new LongAdder();
private final LongAdder waitCycles = new LongAdder();
 
public long claim() {
    long seq;
    int retries = 0;
    // Spin on CAS until we win the next slot; `cursor` is the shared claim counter
    while (!cursor.compareAndSet(seq = cursor.get(), seq + 1)) {
        retries++;
    }
    casRetries.add(retries);
    casSuccesses.increment();
    return seq + 1;
}

2. Trace sequence progression

// Log sequence progression for debugging
private void logProgress() {
    logger.debug("Sequences: pub={}, s1={}, s2={}, s3={}, gap={}",
        publisherSequence.get(),
        stage1Sequence.get(),
        stage2Sequence.get(),
        stage3Sequence.get(),
        publisherSequence.get() - stage3Sequence.get());
}

3. Stress testing

@Test
void stressTest() throws InterruptedException {
    DisruptorStylePipeline pipeline = new DisruptorStylePipeline();
    pipeline.start();
 
    AtomicLong published = new AtomicLong();
    AtomicLong processed = new AtomicLong();
    // NOTE: the pipeline's final stage must increment `processed` (wiring not shown)
 
    // Spawn many producer threads
    for (int i = 0; i < 16; i++) {
        new Thread(() -> {
            byte[] data = new byte[100];
            for (int j = 0; j < 1_000_000; j++) {
                pipeline.publish(System.nanoTime(), 1, j, data, 0, 100);
                published.incrementAndGet();
            }
        }).start();
    }
 
    // Give producers and consumers time to drain, then verify.
    // (A robust test joins the producers and awaits a drain signal
    // instead of relying on a fixed sleep.)
    Thread.sleep(60_000);
    assertEquals(published.get(), processed.get(),
        "All published events should be processed");
}

Run stress tests for extended periods (hours, not seconds) to catch rare race conditions.


Appendix C: Real-World Case Studies

Case Study 1: LMAX Exchange

LMAX Exchange, a multilateral trading facility, processes over 6 million orders per second with average latency under 1 millisecond. Their architecture centers on the Disruptor pattern:

  • Single-threaded business logic eliminates synchronization
  • Ring buffer connects market data feed to matching engine
  • Events processed in strict sequence order
  • Zero garbage in the critical path

Key insight: They realized that single-threaded processing with a fast event bus outperformed multi-threaded processing with locks.

Case Study 2: Log4j2 Async Loggers

Apache Log4j2 uses a Disruptor-based async logger that can log up to 18 times more messages per second than synchronous loggers:

// log4j2.xml configuration
<Configuration>
    <Appenders>
        <RandomAccessFile name="AsyncFile" fileName="async.log">
            <PatternLayout pattern="%d %p %c{1.} [%t] %m%n"/>
        </RandomAccessFile>
    </Appenders>
    <Loggers>
        <AsyncRoot level="info" includeLocation="false">
            <AppenderRef ref="AsyncFile"/>
        </AsyncRoot>
    </Loggers>
</Configuration>

Application threads publish log events to a ring buffer. A dedicated logging thread drains the buffer and writes to disk. Application latency is minimized because logging never blocks.
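Beyond the mixed-mode AsyncRoot shown above, Log4j2 can make every logger asynchronous via a context-selector system property; this requires the LMAX Disruptor jar on the classpath (jar names and the main class below are illustrative):

```shell
# Make ALL loggers asynchronous (Disruptor-backed) without touching log4j2.xml
java -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector \
     -cp app.jar:log4j-api.jar:log4j-core.jar:disruptor.jar \
     com.example.Main
```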

Case Study 3: Trading Analytics Platform (Our Story)

Before the Disruptor transformation:

  • 3-stage pipeline with LinkedBlockingQueue
  • 10k events/sec at p99 latency of 5ms
  • GC pauses of 100ms+ during peak load
  • Regular 3 AM incidents

After the transformation:

  • Single ring buffer shared by all stages
  • 500k events/sec at p99 latency of 287ns
  • GC pauses under 20ms
  • Zero production incidents from the pipeline

Total development time: 2 weeks. ROI: Incalculable (saved the business).


Appendix D: Alternative Implementations

Using LMAX Disruptor Library

For production use, consider the battle-tested LMAX Disruptor library:

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>4.0.0</version>
</dependency>
// Define your event
public class TradeEvent {
    private long orderId;
    private double price;
    private int quantity;
 
    public void set(long orderId, double price, int quantity) {
        this.orderId = orderId;
        this.price = price;
        this.quantity = quantity;
    }
    // Getters...
}
 
// Event factory for pre-allocation
EventFactory<TradeEvent> factory = TradeEvent::new;
 
// Create the Disruptor
Disruptor<TradeEvent> disruptor = new Disruptor<>(
    factory,
    1024,  // Ring buffer size
    DaemonThreadFactory.INSTANCE,
    ProducerType.MULTI,
    new BusySpinWaitStrategy()
);
 
// Connect event handlers (stages)
disruptor.handleEventsWith(new ParseHandler())
         .then(new EnrichHandler())
         .then(new AggregateHandler());
 
// Start processing
disruptor.start();
RingBuffer<TradeEvent> ringBuffer = disruptor.getRingBuffer();
 
// Publish events
long sequence = ringBuffer.next();
try {
    TradeEvent event = ringBuffer.get(sequence);
    event.set(12345L, 99.99, 100);
} finally {
    ringBuffer.publish(sequence);
}

The LMAX library handles all the complexity of multi-producer coordination, sequence barriers, and wait strategies.

Using JCTools Queues

JCTools provides production-ready concurrent queues:

<dependency>
    <groupId>org.jctools</groupId>
    <artifactId>jctools-core</artifactId>
    <version>4.0.1</version>
</dependency>
// MPSC (Multi-Producer Single-Consumer) queue
MpscArrayQueue<Event> mpscQueue = new MpscArrayQueue<>(1024);
 
// SPSC (Single-Producer Single-Consumer) queue - fastest
SpscArrayQueue<Event> spscQueue = new SpscArrayQueue<>(1024);
 
// Usage is similar to standard queues
mpscQueue.offer(event);  // Non-blocking
Event e = mpscQueue.poll();  // Non-blocking

JCTools queues are simpler than full Disruptor pipelines but still provide excellent performance for point-to-point messaging.

Chronicle Queue for Persistence

If you need persisted events with replay capability:

try (ChronicleQueue queue = ChronicleQueue.single("queue-dir")) {
    // Write events
    ExcerptAppender appender = queue.createAppender();
    appender.writeDocument(w -> w.write("trade")
        .marshallable(m -> {
            m.write("orderId").int64(12345L);
            m.write("price").float64(99.99);
            m.write("quantity").int32(100);
        }));
 
    // Read events (a tailer can start from any index)
    ExcerptTailer tailer = queue.createTailer();
    tailer.readDocument(r -> r.read("trade").marshallable(m -> {
        long orderId = m.read("orderId").int64();
        // Process...
    }));
}

Chronicle Queue memory-maps files for near-Disruptor performance with full persistence.


Appendix E: Further Reading

Libraries and Tools

  • LMAX Disruptor: The original library, with its technical paper and wiki
  • JCTools: High-performance concurrent data structures for Java
  • Agrona: High-performance buffers and messaging primitives
  • Chronicle Queue: Persistent, low-latency message queue

Academic Background

  • "The Art of Multiprocessor Programming" by Herlihy & Shavit
  • "Java Concurrency in Practice" by Goetz et al.
  • "What Every Programmer Should Know About Memory" by Ulrich Drepper

Videos and Presentations

  • Martin Thompson's InfoQ Talk: "Designing for Performance" - The original presentation that introduced the Disruptor to the wider community
  • Gil Tene's Talks on Latency: Essential viewing for understanding percentile latency and its impact on user experience
  • Cliff Click's Lock-Free Talks: Deep dives into the theory and practice of lock-free algorithms

Online Resources

  • Mechanical Sympathy Group: Google group dedicated to writing software that works sympathetically with modern hardware
  • JMH (Java Microbenchmark Harness): Essential tool for accurate performance measurement
  • async-profiler: Low-overhead profiler for finding CPU and allocation hotspots

Final Thoughts

As I reflect on this journey from that 3 AM incident to a pipeline that handles half a million events per second with sub-microsecond latency, a few thoughts stand out.

Performance optimization is not about clever tricks or obscure techniques. It's about understanding where time goes and systematically eliminating waste. In our case, the waste was in coordination overhead - locks, context switches, and memory allocations that had nothing to do with our actual business logic.

The Disruptor pattern taught me that sometimes the best optimization is not doing something at all. We eliminated locks by not needing them. We eliminated allocations by pre-allocating. We eliminated cache misses by controlling memory layout. Each "not doing" required understanding what we were avoiding and why.

But perhaps the most important lesson is humility. Before that incident, I thought our pipeline was well-designed. It used standard, battle-tested components. It followed best practices. And yet it was 50x slower than it could have been. The lesson: measure everything, assume nothing, and always be willing to question your assumptions.

The Disruptor pattern may not be the right tool for your next project. But the principles behind it - understanding your hardware, minimizing coordination, and measuring relentlessly - apply everywhere. Whether you're building a trading system or a web application, these principles will serve you well.

Happy coding, and may your latencies be low and your throughput high.


This article is part of the Lock-Free in Java series. See the companion repository at https://github.com/techishthoughts-org/off_heap_algorithms for complete source code and benchmarks.