Wednesday afternoon, 3:47 PM. I was reviewing our ring buffer implementation's performance metrics when something caught my eye. Our single-producer single-consumer (SPSC) queue was handling orders reliably, but the numbers didn't add up. We were processing individual messages at roughly 20-30 nanoseconds each - respectable by most standards, but our throughput calculations showed we were leaving significant performance on the table.
I pulled up the profiler and there it was: memory barriers. Every single enqueue operation and every single dequeue operation was executing a full memory fence. That's the cost of inter-thread visibility in modern CPUs. The useful copy into the buffer took perhaps 2-3 nanoseconds. The remaining 18-27 nanoseconds were pure synchronization overhead.
Let me put this in perspective. Our trading system was processing about 25 million events per second at peak. At 25ns average per operation, we were spending roughly 625 milliseconds per second on memory barriers alone. That's 62.5% of our CPU time on synchronization, not actual work.
I'd seen this pattern before in network packet processing. When you're handling individual packets, the per-packet overhead dominates. But what if you could batch packets together and amortize that overhead? The same principle had to apply here.
That evening, I started sketching out what would become our batched queue. The idea was simple in concept but tricky in execution: instead of publishing each element individually with its own memory barrier, collect elements into batches and publish them with a single fence. Same correctness guarantees, fraction of the overhead.
What followed was a two-week deep dive into memory ordering, cache behavior, and the subtle art of batch processing. The end result? A queue that processes elements at 5-10 nanoseconds each under batched workloads - a 2-4x improvement that transformed our system's throughput characteristics. But more importantly, I gained a much deeper understanding of where performance actually goes in lock-free data structures.
Before we can optimize memory barriers, we need to understand what they actually do and why they cost so much. This section gets into the weeds of CPU architecture, but the concepts are essential for understanding our optimization.
Modern CPUs don't execute instructions in strict program order. They reorder, speculate, and parallelize to maximize throughput. This is great for performance but creates a problem for multi-threaded code: Thread A might write to memory locations X and Y in that order, but Thread B might observe the writes in reverse order, or see only one of them.
A memory barrier (also called a memory fence) is a CPU instruction that enforces ordering. There are several types:
Store-Store Barrier: Ensures all stores before the barrier complete before any stores after it. This prevents the CPU from reordering writes.
Load-Load Barrier: Ensures all loads before the barrier complete before any loads after it. This prevents the CPU from speculatively reading ahead.
Load-Store Barrier: Ensures all loads before complete before any stores after.
Store-Load Barrier: The most expensive. Ensures all stores before complete and are visible to other CPUs before any loads after. This typically requires flushing the store buffer.
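In Java these four orderings are exposed directly as static fence methods on `VarHandle` (Java 9+). A minimal sketch mapping each method to the barrier it provides (the class name is illustrative):

```java
import java.lang.invoke.VarHandle;

public class FenceDemo {
    public static void main(String[] args) {
        VarHandle.storeStoreFence(); // Store-Store: earlier stores ordered before later stores
        VarHandle.loadLoadFence();   // Load-Load: earlier loads ordered before later loads
        VarHandle.acquireFence();    // Load-Load + Load-Store (no Store-Load)
        VarHandle.releaseFence();    // Store-Store + Load-Store (no Store-Load)
        VarHandle.fullFence();       // all four orderings, including the expensive Store-Load
        System.out.println("fences issued");
    }
}
```

Note that only `fullFence()` includes the costly Store-Load ordering; the acquire/release fences are what the queue implementations below rely on.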
On x86-64 (Intel/AMD), most memory operations have implicit ordering that reduces the need for explicit barriers. But volatile writes in Java still typically compile to a LOCK-prefixed instruction or an explicit MFENCE, which:
Flushes the store buffer: The CPU maintains a buffer of pending writes for performance. A barrier forces these to drain to the cache.
Waits for cache coherency: The write must propagate to other cores' caches via the MESI protocol. This involves inter-core communication over the CPU's interconnect.
Prevents speculative execution: The CPU can't proceed past the barrier until the ordering is established, stalling the pipeline.
The actual cost varies by operation and CPU:
| Operation | Typical Cost (cycles) |
|---|---|
| Normal load | ~4 (L1 hit) |
| Normal store | ~4 (L1 hit) |
| Volatile load | ~4-10 (depending on cache state) |
| Volatile store | ~20-50 (includes store buffer drain) |
| CAS (Compare-And-Swap) | ~15-40 (plus cache line bouncing) |
| Full memory fence (MFENCE) | ~30-100 |
On a 3 GHz CPU, a 30-cycle operation takes 10 nanoseconds. That's our memory barrier overhead per element.
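You can get a rough feel for this difference yourself. The following is a crude, illustrative micro-measurement, not a substitute for a proper JMH benchmark: the JIT can optimize plain stores aggressively, so treat the printed numbers as rough indicators only.

```java
public class BarrierCostSketch {
    static long plainField;
    static volatile long volatileField;

    public static void main(String[] args) {
        final int N = 20_000_000;
        // Warm up both loops so the JIT compiles them before measuring
        run(N);
        run(N);
        long[] r = run(N);
        System.out.printf("plain store:    %.2f ns/op%n", (double) r[0] / N);
        System.out.printf("volatile store: %.2f ns/op%n", (double) r[1] / N);
    }

    static long[] run(int n) {
        long t0 = System.nanoTime();
        for (int i = 0; i < n; i++) plainField = i;    // no barrier
        long plainNs = System.nanoTime() - t0;
        t0 = System.nanoTime();
        for (int i = 0; i < n; i++) volatileField = i; // store barrier each iteration
        long volatileNs = System.nanoTime() - t0;
        return new long[] { plainNs, volatileNs };
    }
}
```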
A typical SPSC ring buffer needs barriers at specific points:
```java
// Producer side
public void offer(T element) {
    long pos = head;
    buffer[(int) (pos & mask)] = element;
    // BARRIER: Ensure buffer write is visible before head update
    head = pos + 1; // volatile write includes barrier
}

// Consumer side
public T poll() {
    long pos = tail;
    if (pos == head) { // volatile read
        return null;
    }
    // BARRIER: Ensure we see the buffer write after seeing head update
    T element = buffer[(int) (pos & mask)];
    tail = pos + 1; // volatile write
    return element;
}
```
Every offer() requires at least one store barrier (the volatile write to head). Every poll() requires at least one load barrier (the volatile read of head) and one store barrier (the volatile write to tail). That's 2-3 barriers per element processed.
Here's the key insight that drives our optimization: synchronization fences have a fixed cost regardless of payload size. Guarding one write costs roughly the same as guarding a thousand.
Modern CPUs use deep pipelines (15-20 stages on Intel/AMD) to achieve high instruction-level parallelism. A memory barrier disrupts this pipeline in several ways:
Retirement stall: Instructions after the barrier can't retire until the barrier completes
Issue stall: Some CPUs can't issue new instructions while waiting for the barrier
Speculation rollback: Any speculative execution past the barrier must be discarded
This pipeline disruption is why barriers cost dozens of cycles rather than single-digit cycles. The CPU has to drain and refill its pipeline, losing significant throughput.
By batching, we suffer this pipeline disruption once per batch instead of once per element. For a 64-element batch, that's a 64x reduction in pipeline disruptions.
```java
package com.techishthoughts.batch;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/**
 * Single-Producer Single-Consumer queue with per-element synchronization.
 *
 * This is the baseline implementation that we'll optimize.
 * Each offer/poll operation includes full memory barrier overhead.
 */
public class SingleOperationQueue<T> {

    // VarHandles for atomic operations
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            HEAD = lookup.findVarHandle(SingleOperationQueue.class, "head", long.class);
            TAIL = lookup.findVarHandle(SingleOperationQueue.class, "tail", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Cache line padding to prevent false sharing
    private long p01, p02, p03, p04, p05, p06, p07;

    /** Producer's write position */
    private volatile long head = 0;

    private long p11, p12, p13, p14, p15, p16, p17;

    /** Consumer's read position */
    private volatile long tail = 0;

    private long p21, p22, p23, p24, p25, p26, p27;

    /** Ring buffer storage */
    private final Object[] buffer;
    /** Capacity (power of 2) */
    private final int capacity;
    /** Mask for fast modulo: index = position & mask */
    private final int mask;

    /**
     * Creates a new queue with the specified capacity.
     * Capacity is rounded up to the next power of 2.
     */
    public SingleOperationQueue(int requestedCapacity) {
        this.capacity = roundUpToPowerOf2(requestedCapacity);
        this.mask = this.capacity - 1;
        this.buffer = new Object[this.capacity];
    }

    private static int roundUpToPowerOf2(int value) {
        int highBit = Integer.highestOneBit(value);
        return (highBit == value) ? value : highBit << 1;
    }

    /**
     * Adds an element to the queue.
     *
     * BARRIER COST: One volatile write (store barrier)
     *
     * @return true if successful, false if queue is full
     */
    public boolean offer(T element) {
        if (element == null) {
            throw new NullPointerException("Null elements not allowed");
        }

        long currentHead = head; // We're the only writer
        long currentTail = (long) TAIL.getAcquire(this); // Acquire semantics

        // Check if full (head would catch up to tail)
        if (currentHead - currentTail >= capacity) {
            return false;
        }

        // Write element to buffer
        int index = (int) (currentHead & mask);
        buffer[index] = element;

        // Publish: volatile write ensures element is visible before head advances
        // This is our MEMORY BARRIER - costs ~20-30ns
        HEAD.setRelease(this, currentHead + 1);
        return true;
    }

    /**
     * Removes and returns an element from the queue.
     *
     * BARRIER COST: One volatile read (acquire) + one volatile write (release)
     *
     * @return the next element, or null if queue is empty
     */
    @SuppressWarnings("unchecked")
    public T poll() {
        long currentTail = tail; // We're the only reader
        long currentHead = (long) HEAD.getAcquire(this); // Acquire semantics

        // Check if empty
        if (currentTail >= currentHead) {
            return null;
        }

        // Read element from buffer
        int index = (int) (currentTail & mask);
        T element = (T) buffer[index];
        // Help GC by clearing the reference
        buffer[index] = null;

        // Advance tail: volatile write ensures we don't re-read this slot
        TAIL.setRelease(this, currentTail + 1);
        return element;
    }

    /**
     * Returns approximate number of elements in queue.
     */
    public int size() {
        long h = (long) HEAD.getAcquire(this);
        long t = (long) TAIL.getAcquire(this);
        long size = h - t;
        return (int) Math.max(0, Math.min(size, capacity));
    }

    public boolean isEmpty() {
        return head == tail;
    }

    public int capacity() {
        return capacity;
    }
}
```
The key insight is that memory barriers order all preceding operations, not just the immediately previous one. If we batch multiple writes before issuing a barrier, all of them become visible simultaneously at a fraction of the per-element cost.
Our batched queue uses a two-phase commit pattern:
Phase 1 - Accumulate: Producer writes elements to the buffer without barriers
Phase 2 - Commit: A single barrier makes all accumulated elements visible
To implement this safely, we use a "shadow head" pattern. localHead is the producer's private write position with no synchronization overhead, while publishedHead is the position visible to the consumer, updated with volatile semantics.
The producer advances localHead with each write but only updates publishedHead when committing a batch.
```java
// Producer state
private long localHead = 0;              // Private, no barriers needed
private volatile long publishedHead = 0; // Visible to consumer

public void offerBatch(T[] elements, int count) {
    // Phase 1: Write all elements (no barriers)
    for (int i = 0; i < count; i++) {
        int index = (int) (localHead & mask);
        buffer[index] = elements[i];
        localHead++; // Plain increment, no barrier
    }
    // Phase 2: Single barrier publishes entire batch
    PUBLISHED_HEAD.setRelease(this, localHead);
}
```
```java
public int pollBatch(T[] output, int maxCount) {
    long currentTail = localTail;
    long availableHead = (long) PUBLISHED_HEAD.getAcquire(this);

    // Calculate how many elements are available
    long available = availableHead - currentTail;
    int count = (int) Math.min(available, maxCount);
    if (count == 0) {
        return 0;
    }

    // Phase 1: Read all elements (no barriers between reads)
    for (int i = 0; i < count; i++) {
        int index = (int) (currentTail & mask);
        output[i] = (T) buffer[index];
        buffer[index] = null; // Help GC
        currentTail++;
    }

    // Phase 2: Single barrier publishes consumption
    localTail = currentTail;
    PUBLISHED_TAIL.setRelease(this, currentTail);
    return count;
}
```
```java
package com.techishthoughts.batch;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.function.Consumer;

/**
 * Batched Single-Producer Single-Consumer Queue.
 *
 * Optimizes throughput by amortizing memory barrier costs across multiple elements.
 *
 * Key optimizations:
 * 1. Shadow head/tail pattern: separate local and published positions
 * 2. Batch commit: single barrier for multiple elements
 * 3. Lazy publication: accumulate before publishing
 * 4. Auto-commit: batches published automatically once batchSize elements accumulate
 *
 * Performance characteristics:
 * - Per-element cost: 5-10ns (vs 20-30ns for single-operation queue)
 * - Optimal batch size: 10-100 elements
 * - 2-4x throughput improvement over naive implementation
 *
 * @param <T> Element type
 */
public class BatchedSPSCQueue<T> {

    // ========== VarHandle Setup ==========

    private static final VarHandle PUBLISHED_HEAD;
    private static final VarHandle PUBLISHED_TAIL;

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            PUBLISHED_HEAD = lookup.findVarHandle(
                BatchedSPSCQueue.class, "publishedHead", long.class);
            PUBLISHED_TAIL = lookup.findVarHandle(
                BatchedSPSCQueue.class, "publishedTail", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // ========== Configuration ==========

    /** Default batch size for auto-commit */
    private static final int DEFAULT_BATCH_SIZE = 64;

    /** Maximum batch size to prevent unbounded latency */
    private static final int MAX_BATCH_SIZE = 1024;

    // ========== Producer Fields (Cache Line 1) ==========

    @SuppressWarnings("unused")
    private long p01, p02, p03, p04, p05, p06, p07;

    /** Producer's local write position (no synchronization needed) */
    private long localHead = 0;

    /** Last position published to consumer */
    private volatile long publishedHead = 0;

    /** Producer's cached view of consumer's tail (reduces volatile reads) */
    private long cachedTail = 0;

    /** Elements accumulated since last publish */
    private int pendingCount = 0;

    /** Configured batch size */
    private final int batchSize;

    @SuppressWarnings("unused")
    private long p08, p09, p10;

    // ========== Consumer Fields (Cache Line 2) ==========

    @SuppressWarnings("unused")
    private long p11, p12, p13, p14, p15, p16, p17;

    /** Consumer's local read position (no synchronization needed) */
    private long localTail = 0;

    /** Last position published to producer */
    private volatile long publishedTail = 0;

    /** Consumer's cached view of producer's head (reduces volatile reads) */
    private long cachedHead = 0;

    @SuppressWarnings("unused")
    private long p18, p19, p20;

    // ========== Shared State (Cache Line 3) ==========

    @SuppressWarnings("unused")
    private long p21, p22, p23, p24, p25, p26, p27;

    /** Ring buffer storage */
    private final Object[] buffer;
    /** Buffer capacity (power of 2) */
    private final int capacity;
    /** Bit mask for fast modulo: index = position & mask */
    private final int mask;

    // ========== Constructors ==========

    /**
     * Creates a batched queue with default batch size.
     */
    public BatchedSPSCQueue(int requestedCapacity) {
        this(requestedCapacity, DEFAULT_BATCH_SIZE);
    }

    /**
     * Creates a batched queue with specified batch size.
     *
     * @param requestedCapacity Queue capacity (rounded to power of 2)
     * @param batchSize Elements per batch (rounded to a power of 2,
     *                  clamped to 1-MAX_BATCH_SIZE and to capacity)
     */
    public BatchedSPSCQueue(int requestedCapacity, int batchSize) {
        this.capacity = roundUpToPowerOf2(Math.max(requestedCapacity, 2));
        this.mask = this.capacity - 1;
        this.buffer = new Object[this.capacity];
        // Round to a power of 2 (required by the lazy-publication mask in poll())
        // and clamp so a full batch always fits in the buffer
        int clamped = Math.max(1, Math.min(batchSize, MAX_BATCH_SIZE));
        this.batchSize = Math.min(roundUpToPowerOf2(clamped), this.capacity);
    }

    private static int roundUpToPowerOf2(int value) {
        int highBit = Integer.highestOneBit(value);
        return (highBit == value) ? value : highBit << 1;
    }

    // ========== Single Element API (with auto-batching) ==========

    /**
     * Adds an element to the queue.
     *
     * Elements are accumulated locally until batch size is reached,
     * then published with a single memory barrier.
     *
     * @return true if successful, false if queue is full
     */
    public boolean offer(T element) {
        if (element == null) {
            throw new NullPointerException("Null elements not allowed");
        }

        // Check if we have space (use cached tail to avoid volatile read)
        if (localHead - cachedTail >= capacity) {
            // Cached tail is stale - refresh it
            cachedTail = (long) PUBLISHED_TAIL.getAcquire(this);
            if (localHead - cachedTail >= capacity) {
                // Publish any pending elements before failing
                if (pendingCount > 0) {
                    commit();
                }
                return false; // Queue is genuinely full
            }
        }

        // Write element to buffer (no barrier)
        int index = (int) (localHead & mask);
        buffer[index] = element;
        localHead++;
        pendingCount++;

        // Auto-commit when batch size reached
        if (pendingCount >= batchSize) {
            commit();
        }
        return true;
    }

    /**
     * Forces publication of all pending elements.
     *
     * Call this when you need low latency and can't wait for batch to fill.
     * Also called automatically when batch size is reached.
     */
    public void commit() {
        if (pendingCount > 0) {
            // Single barrier publishes all pending elements
            PUBLISHED_HEAD.setRelease(this, localHead);
            pendingCount = 0;
        }
    }

    /**
     * Retrieves and removes an element from the queue.
     *
     * @return the next element, or null if queue is empty
     */
    @SuppressWarnings("unchecked")
    public T poll() {
        // Check if we have elements (use cached head to avoid volatile read)
        if (localTail >= cachedHead) {
            // Cached head is stale - refresh it
            cachedHead = (long) PUBLISHED_HEAD.getAcquire(this);
            if (localTail >= cachedHead) {
                return null; // Queue is genuinely empty
            }
        }

        // Read element from buffer
        int index = (int) (localTail & mask);
        T element = (T) buffer[index];
        buffer[index] = null; // Help GC
        localTail++;

        // Lazy publication: only update published tail periodically
        // This is safe because producer only needs to know we're not full
        // (relies on batchSize being a power of 2)
        if ((localTail & (batchSize - 1)) == 0) {
            PUBLISHED_TAIL.setRelease(this, localTail);
        }
        return element;
    }

    // ========== Batch API ==========

    /**
     * Adds multiple elements in a single batch.
     *
     * This is the most efficient way to use this queue when you have
     * multiple elements ready to enqueue.
     *
     * @param elements Array of elements to add
     * @param offset Starting index in the array
     * @param count Number of elements to add
     * @return Number of elements actually added
     */
    public int offerBatch(T[] elements, int offset, int count) {
        if (count <= 0) return 0;

        // Calculate available space
        if (localHead - cachedTail >= capacity) {
            cachedTail = (long) PUBLISHED_TAIL.getAcquire(this);
        }
        long available = capacity - (localHead - cachedTail);
        int toAdd = (int) Math.min(count, available);
        if (toAdd == 0) {
            return 0;
        }

        // Write all elements without barriers
        for (int i = 0; i < toAdd; i++) {
            T element = elements[offset + i];
            if (element == null) {
                throw new NullPointerException(
                    "Null element at index " + (offset + i));
            }
            int index = (int) (localHead & mask);
            buffer[index] = element;
            localHead++;
        }

        // Single barrier publishes entire batch
        PUBLISHED_HEAD.setRelease(this, localHead);
        pendingCount = 0;
        return toAdd;
    }

    /**
     * Retrieves multiple elements in a single batch.
     *
     * @param output Array to store retrieved elements
     * @param offset Starting index in output array
     * @param maxCount Maximum elements to retrieve
     * @return Number of elements retrieved
     */
    @SuppressWarnings("unchecked")
    public int pollBatch(T[] output, int offset, int maxCount) {
        if (maxCount <= 0) return 0;

        // Calculate available elements
        if (localTail >= cachedHead) {
            cachedHead = (long) PUBLISHED_HEAD.getAcquire(this);
        }
        long available = cachedHead - localTail;
        int toRead = (int) Math.min(maxCount, available);
        if (toRead == 0) {
            return 0;
        }

        // Read all elements
        for (int i = 0; i < toRead; i++) {
            int index = (int) (localTail & mask);
            output[offset + i] = (T) buffer[index];
            buffer[index] = null;
            localTail++;
        }

        // Single barrier publishes consumption
        PUBLISHED_TAIL.setRelease(this, localTail);
        return toRead;
    }

    /**
     * Drains all available elements to a consumer function.
     *
     * More efficient than repeated poll() calls.
     *
     * @param consumer Function to process each element
     * @return Number of elements processed
     */
    @SuppressWarnings("unchecked")
    public int drain(Consumer<T> consumer) {
        // Get all available elements
        cachedHead = (long) PUBLISHED_HEAD.getAcquire(this);
        long available = cachedHead - localTail;
        if (available <= 0) {
            return 0;
        }
        int count = (int) available;

        // Process all elements
        for (int i = 0; i < count; i++) {
            int index = (int) (localTail & mask);
            T element = (T) buffer[index];
            buffer[index] = null;
            localTail++;
            consumer.accept(element);
        }

        // Single barrier at the end
        PUBLISHED_TAIL.setRelease(this, localTail);
        return count;
    }

    /**
     * Fills the queue from a supplier function.
     *
     * Efficient for bulk loading.
     *
     * @param supplier Function that supplies elements (return null to stop)
     * @param maxCount Maximum elements to add
     * @return Number of elements added
     */
    public int fill(java.util.function.Supplier<T> supplier, int maxCount) {
        // Calculate available space
        if (localHead - cachedTail >= capacity) {
            cachedTail = (long) PUBLISHED_TAIL.getAcquire(this);
        }
        long available = capacity - (localHead - cachedTail);
        int canAdd = (int) Math.min(maxCount, available);

        int added = 0;
        for (int i = 0; i < canAdd; i++) {
            T element = supplier.get();
            if (element == null) {
                break; // Supplier exhausted
            }
            int index = (int) (localHead & mask);
            buffer[index] = element;
            localHead++;
            added++;
        }

        if (added > 0) {
            PUBLISHED_HEAD.setRelease(this, localHead);
            pendingCount = 0;
        }
        return added;
    }

    // ========== Query Operations ==========

    /**
     * Returns the number of pending (uncommitted) elements.
     * Only meaningful from producer thread.
     */
    public int pendingCount() {
        return pendingCount;
    }

    /**
     * Returns approximate size of the queue.
     */
    public int size() {
        long h = (long) PUBLISHED_HEAD.getAcquire(this);
        long t = (long) PUBLISHED_TAIL.getAcquire(this);
        long size = h - t;
        return (int) Math.max(0, Math.min(size, capacity));
    }

    public boolean isEmpty() {
        return publishedHead == publishedTail;
    }

    public int capacity() {
        return capacity;
    }

    /**
     * Returns configured batch size.
     */
    public int batchSize() {
        return batchSize;
    }
}
```
The producer's localHead can advance freely without any synchronization - it's only accessed by the producer thread. Only when we commit do we update publishedHead with a barrier. This separation is what enables batching.
Why cache the opposite position?
The producer needs to know the consumer's tail to check if the queue is full. But reading a volatile is expensive. By caching the tail locally and only refreshing when we think the queue might be full, we eliminate most volatile reads.
Why auto-commit at batch size?
Without auto-commit, elements could sit unpublished indefinitely, hurting latency. Auto-commit at batch size provides predictable latency while preserving throughput.
Why lazy tail publication?
The consumer doesn't need to publish its tail after every element - the producer only cares that the queue isn't full. Publishing every batchSize elements is sufficient and reduces barriers.
Full volatile has stronger semantics than we need:
```java
// A full volatile write includes:
//   1. StoreStore barrier before (our writes complete before this write)
//   2. StoreLoad barrier after (this write completes before any subsequent loads)
//
// Release semantics only include:
//   1. StoreStore barrier before (our writes complete before this write)
//
// No StoreLoad barrier - we don't need to order with subsequent loads
```
The StoreLoad barrier is the most expensive part of a volatile write. Release semantics give us what we need at lower cost.
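The difference is a one-line choice with `VarHandle`. A sketch of the pattern (field and class names here are illustrative, not from the queue above):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class ReleaseVsVolatile {
    private static final VarHandle HEAD;
    static {
        try {
            HEAD = MethodHandles.lookup()
                .findVarHandle(ReleaseVsVolatile.class, "head", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile long head;
    private long payload; // plain field, made visible by the release write

    void publish(long value) {
        payload = value;
        HEAD.setRelease(this, value);    // StoreStore only: payload visible before head
        // HEAD.setVolatile(this, value); // would add the expensive StoreLoad drain
    }

    long read() {
        long h = (long) HEAD.getAcquire(this); // pairs with setRelease
        return h == 0 ? -1 : payload;          // safe to read payload after the acquire
    }

    public static void main(String[] args) {
        ReleaseVsVolatile q = new ReleaseVsVolatile();
        q.publish(42);
        System.out.println(q.read()); // 42
    }
}
```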
A related hazard in lock-free structures is the ABA problem: a thread reads a value A, is suspended while another thread changes it to B and then back to A, and on resuming sees A and incorrectly assumes nothing has changed.
Our queue avoids this by using monotonically increasing positions:
Head only increases
Tail only increases
We use long (64-bit) positions - it would take 2^63 operations to wrap
Even with slot-based indexing, the monotonically increasing positions themselves act as sequence numbers: a position identifies both the slot and the cycle, so an old value can never be mistaken for a new one.
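A quick way to convince yourself that `position & mask` indexing stays continuous even at the (purely theoretical) signed-long overflow point:

```java
public class MaskWrapDemo {
    public static void main(String[] args) {
        final int capacity = 8;         // power of two
        final int mask = capacity - 1;
        long pos = Long.MAX_VALUE - 2;  // just before signed-long overflow
        StringBuilder indices = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            // pos + i silently wraps from Long.MAX_VALUE to Long.MIN_VALUE,
            // but the low bits - and therefore the slot index - stay continuous
            indices.append((pos + i) & mask).append(' ');
        }
        System.out.println(indices.toString().trim()); // 5 6 7 0 1
    }
}
```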
```
T(n) = total time to process n elements
B    = cost of one memory barrier (~25 ns)
W    = cost of one buffer write   (~3 ns)
S    = batch size

Without batching:
T(n) = n × (W + B) = n × 28 ns

With batching:
T(n) = n × W + (n/S) × B = n × (3 + 25/S) ns

For S = 10:  T(n) = n × 5.5 ns  (~5× faster than naive)
For S = 50:  T(n) = n × 3.5 ns  (~8× faster)
For S = 100: T(n) = n × 3.25 ns (diminishing returns)
```
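The arithmetic above is easy to check directly. A tiny sketch using the same assumed costs (W ≈ 3 ns per write, B ≈ 25 ns per barrier):

```java
public class AmortizedCost {
    // Per-element cost: one buffer write plus 1/S-th of a barrier
    static double perElementNs(double writeNs, double barrierNs, int batchSize) {
        return writeNs + barrierNs / batchSize;
    }

    public static void main(String[] args) {
        final double W = 3.0;  // assumed buffer-write cost (ns)
        final double B = 25.0; // assumed barrier cost (ns)
        System.out.println(perElementNs(W, B, 1));   // 28.0 (no batching)
        System.out.println(perElementNs(W, B, 10));  // 5.5
        System.out.println(perElementNs(W, B, 50));  // 3.5
        System.out.println(perElementNs(W, B, 100)); // 3.25
    }
}
```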
Why does batching improve cache behavior so dramatically? Several factors:
1. Spatial Locality
When we batch writes, consecutive buffer slots are accessed together. Modern CPUs prefetch cache lines speculatively - by accessing slots sequentially in a tight loop, we benefit from hardware prefetching.
```java
// Single operation: random access pattern from the CPU's perspective
for (each_operation) {
    // barrier overhead
    buffer[index] = element;
    // barrier overhead
}

// Batched: sequential access pattern
for (each_element_in_batch) {
    buffer[index++] = element; // No barrier between writes
}
// Single barrier at end
```
2. Reduced Cache Line Bouncing
Each volatile write can cause the cache line containing head to bounce between cores. With batching, this bouncing happens once per batch instead of once per element.
3. Store Buffer Utilization
The CPU's store buffer is finite (typically 42-56 entries on modern Intel). Without batching, the store buffer fills with barrier-waiting entries. With batching, the store buffer holds actual data writes and drains efficiently during the single barrier.
When the queue fills up, we want to apply backpressure rather than drop elements:
```java
import java.util.concurrent.locks.LockSupport;

/**
 * Batched queue with blocking backpressure.
 */
public class BackpressureBatchedQueue<T> extends BatchedSPSCQueue<T> {

    private volatile Thread waitingProducer;

    public BackpressureBatchedQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(T element) {
        while (!super.offer(element)) {
            // Queue full - apply backpressure
            waitingProducer = Thread.currentThread();
            LockSupport.parkNanos(1_000_000); // 1ms
            if (Thread.interrupted()) {
                return false;
            }
        }
        return true;
    }

    @Override
    public T poll() {
        T element = super.poll();
        // Wake up waiting producer if we made space
        Thread waiting = waitingProducer;
        if (waiting != null) {
            waitingProducer = null;
            LockSupport.unpark(waiting);
        }
        return element;
    }
}
```
Adding observability to understand batch behavior:
```java
import java.util.concurrent.atomic.LongAdder;

/**
 * Batched queue with metrics collection.
 */
public class MetricsBatchedQueue<T> extends BatchedSPSCQueue<T> {

    // Metrics counters
    private final LongAdder totalOffers = new LongAdder();
    private final LongAdder totalPolls = new LongAdder();
    private final LongAdder totalCommits = new LongAdder();
    private final LongAdder batchedElements = new LongAdder();

    // Histogram for batch size distribution (log2 buckets)
    private final long[] batchSizeHistogram = new long[17]; // 0-16 slots

    public MetricsBatchedQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(T element) {
        boolean result = super.offer(element);
        if (result) {
            totalOffers.increment();
        }
        return result;
    }

    @Override
    public void commit() {
        int pending = pendingCount();
        if (pending > 0) {
            totalCommits.increment();
            batchedElements.add(pending);
            // Record in histogram (log2 buckets)
            int bucket = Math.min(16, 32 - Integer.numberOfLeadingZeros(pending));
            batchSizeHistogram[bucket]++;
        }
        super.commit();
    }

    @Override
    public T poll() {
        T element = super.poll();
        if (element != null) {
            totalPolls.increment();
        }
        return element;
    }

    // Metrics accessors
    public long getTotalOffers()  { return totalOffers.sum(); }
    public long getTotalPolls()   { return totalPolls.sum(); }
    public long getTotalCommits() { return totalCommits.sum(); }

    public double getAverageBatchSize() {
        long commits = totalCommits.sum();
        return commits == 0 ? 0 : (double) batchedElements.sum() / commits;
    }

    public String getBatchSizeDistribution() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < batchSizeHistogram.length; i++) {
            if (batchSizeHistogram[i] > 0) {
                int low = i == 0 ? 0 : (1 << (i - 1));
                int high = (1 << i) - 1;
                sb.append(String.format("[%d-%d]: %d%n",
                    low, high, batchSizeHistogram[i]));
            }
        }
        return sb.toString();
    }
}
```
```java
import java.util.function.Function;

/**
 * Multi-stage pipeline with batched queues between stages.
 */
public class MultiStagePipeline<I, M, O> {

    private final BatchedEventPipeline<I> stage1;
    private final BatchedEventPipeline<M> stage2;
    private final Thread bridgeThread;
    private volatile boolean running = true;

    public MultiStagePipeline(
            int queueCapacity,
            Function<I, M> stage1Processor,
            Function<M, O> stage2Processor) {

        this.stage1 = new BatchedEventPipeline<>(queueCapacity, i -> {
            // Stage 1 processes and produces intermediate type
            return stage1Processor.apply(i);
        });

        // Bridge connects stage1 output to stage2 input
        this.bridgeThread = new Thread(() -> {
            Object[] batch = new Object[64];
            while (running) {
                @SuppressWarnings("unchecked")
                int count = stage1.retrieveBatch((I[]) batch, 64);
                if (count > 0) {
                    // Forward to stage 2
                    // (In real code, stage2 would accept M type)
                }
                Thread.onSpinWait();
            }
        }, "pipeline-bridge");
        bridgeThread.setDaemon(true);
        bridgeThread.start();

        // Stage 2 would be similar
        this.stage2 = null; // Simplified for example
    }

    public void submit(I input) {
        stage1.submit(input);
    }

    public void shutdown() throws InterruptedException {
        running = false;
        stage1.shutdown();
        bridgeThread.join(5000);
    }
}
```
Ensuring buffer elements are cache-line aligned can improve performance:
```java
/**
 * Queue with cache-aligned buffer slots.
 * Each slot occupies a full cache line to prevent false sharing.
 */
public class CacheAlignedBatchedQueue<T> {

    // Each slot is padded to 64 bytes (cache line size)
    private static final int SLOT_PADDING = 64 / 8; // 8 longs per slot

    private final long[] paddedBuffer;     // Use longs for padding
    private final Object[] actualElements;

    public CacheAlignedBatchedQueue(int capacity) {
        // Actual elements stored separately
        this.actualElements = new Object[capacity];
        // Padded buffer for index tracking (optional, for demonstration)
        this.paddedBuffer = new long[capacity * SLOT_PADDING];
    }
}
```
```java
/**
 * NUMA-aware queue that keeps producer and consumer data
 * on their respective NUMA nodes.
 */
public class NumaAwareBatchedQueue<T> extends BatchedSPSCQueue<T> {

    public NumaAwareBatchedQueue(int capacity) {
        super(capacity);
    }

    // Hint: Use JVM flags to control thread/memory placement:
    //   -XX:+UseNUMA
    //   -XX:+UseNUMAInterleaving
    // Or use libraries like OpenHFT's affinity for explicit control

    public static void pinProducerToNode(int numaNode) {
        // Implementation depends on affinity library
        // AffinityLock.acquireLock(numaNode);
    }

    public static void pinConsumerToNode(int numaNode) {
        // AffinityLock.acquireLock(numaNode);
    }
}
```
Recommended JVM flags for batched queue performance:
```shell
# Fixed heap to avoid resizing:    -Xms8g -Xmx8g
# Or ZGC for ultra-low latency:    -XX:+UseG1GC
# Aggressive GC pause target:      -XX:MaxGCPauseMillis=10
# Touch pages at startup:          -XX:+AlwaysPreTouch
# Disable biased locking:          -XX:-UseBiasedLocking
# NUMA awareness:                  -XX:+UseNUMA
# Disable perf shared memory:      -XX:+PerfDisableSharedMem
# Larger Integer autobox cache:    -Djava.lang.Integer.IntegerCache.high=10000
java -Xms8g -Xmx8g \
     -XX:+UseG1GC -XX:MaxGCPauseMillis=10 \
     -XX:+AlwaysPreTouch -XX:-UseBiasedLocking \
     -XX:+UseNUMA -XX:+PerfDisableSharedMem \
     -Djava.lang.Integer.IntegerCache.high=10000 \
     -jar application.jar
```
Why disable biased locking?
Biased locking assumes low contention and adds overhead when revoking biases. In high-throughput systems, this revocation cost can spike latency, so disabling it provides more consistent performance. (On JDK 15 and later, biased locking is already disabled by default and has since been removed, so the flag only matters on older JVMs.)
Why AlwaysPreTouch?
Without pre-touching, the JVM lazily allocates physical memory pages. First access to a new page triggers a page fault, adding latency jitter. Pre-touching at startup makes memory access times predictable.
For maximum performance, pin threads to specific CPU cores:
```java
/**
 * Thread affinity helper using OpenHFT Affinity library.
 */
public class AffinityHelper {

    /**
     * Pin the CURRENT thread to a specific CPU core.
     */
    public static void pinToCore(int coreId) {
        try {
            // Using OpenHFT Affinity:
            // AffinityLock lock = AffinityLock.acquireLock(coreId);
            // Or using JNA to call sched_setaffinity
            System.out.println("Pinned thread " + Thread.currentThread().getName()
                + " to core " + coreId);
        } catch (Exception e) {
            System.err.println("Failed to pin thread: " + e.getMessage());
        }
    }

    /**
     * Pin producer and consumer to different cores.
     *
     * NOTE: pinToCore affects the calling thread, so each worker
     * should call it for itself as its first action after starting.
     */
    public static void setupAffinityForSPSC(Thread producer, Thread consumer) {
        // Ideally, same socket but different physical cores.
        // Avoid hyperthreads of the same core.
        // Example: Producer on core 0, Consumer on core 2
        // (assuming cores 0,1 are hyperthreads and 2,3 are hyperthreads)
        producer.start(); // Producer pins itself to core 0 on startup
        consumer.start(); // Consumer pins itself to core 2 on startup
    }
}
```
The journey from 25ns to 5ns per element taught me something fundamental about performance optimization: the overhead often isn't where you expect it.
When I first profiled our queue, I assumed the bottleneck was data movement - copying references, updating indices, and checking bounds. In fact, the useful work took just 2-3 nanoseconds; the remaining ~22 nanoseconds came from the memory fences that make each thread's writes visible to the other.
1. Memory Barriers Have Fixed Costs
The fence cost stays roughly flat even as the protected batch grows. By batching our operations - accumulating multiple elements before a single publication - we amortized that fixed cost across many elements.
A release/acquire barrier costs roughly 20-30 nanoseconds regardless of how much data it orders. Design your synchronization around this.
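To make the amortization concrete, here is the back-of-envelope arithmetic, using the 3 ns copy and 22 ns fence figures from the profile above (the class and constant names are illustrative):

```java
/** Back-of-envelope model of amortized per-element cost in a batched queue. */
public class AmortizationModel {
    static final double COPY_NS = 3.0;   // useful work per element (from profiling)
    static final double FENCE_NS = 22.0; // fixed cost of one release/acquire publication

    /** Per-element cost when one fence is shared by batchSize elements. */
    static double perElementNs(int batchSize) {
        return COPY_NS + FENCE_NS / batchSize;
    }

    public static void main(String[] args) {
        System.out.printf("batch=1:  %.2f ns%n", perElementNs(1));  // 25.00
        System.out.printf("batch=8:  %.2f ns%n", perElementNs(8));  // 5.75
        System.out.printf("batch=64: %.2f ns%n", perElementNs(64)); // 3.34
    }
}
```

Even a batch of 8 pushes the fence's share below the cost of the copy itself, which is why the measured numbers land in the 5-10 ns range.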
2. Batch Size Matters
Too small: you don't amortize barriers effectively. Too large: you add latency waiting for batches to fill. The sweet spot is typically 50-100 elements for throughput, 10-30 for latency-sensitive applications.
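One way to cap the latency side of this trade-off is a deadline-based flush: publish when the batch fills *or* when the oldest pending element has waited too long. A minimal sketch of that policy (class and method names are illustrative, not from our production code):

```java
/** Tracks when a partially filled batch should be force-published. */
public class BatchDeadline {
    private final int batchSize;
    private final long maxWaitNanos;
    private int pending = 0;
    private long firstEnqueueNanos;

    public BatchDeadline(int batchSize, long maxWaitNanos) {
        this.batchSize = batchSize;
        this.maxWaitNanos = maxWaitNanos;
    }

    /** Record an enqueue; pass System.nanoTime(). */
    public void onEnqueue(long nowNanos) {
        if (pending == 0) firstEnqueueNanos = nowNanos;
        pending++;
    }

    /** Publish if the batch is full or the oldest element has waited too long. */
    public boolean shouldFlush(long nowNanos) {
        return pending >= batchSize
            || (pending > 0 && nowNanos - firstEnqueueNanos >= maxWaitNanos);
    }

    /** Reset after the batch has been published. */
    public void onFlush() {
        pending = 0;
    }
}
```

The producer checks `shouldFlush` on each enqueue (or on an idle tick), which bounds worst-case added latency to `maxWaitNanos` no matter how slowly elements arrive.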
3. Shadow State Enables Batching
Separating local state (private to one thread) from published state (visible to other threads) is the key pattern. Advance local state freely; update published state only when committing.
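A minimal sketch of the shadow-state pattern, assuming a power-of-two capacity; names are illustrative and only the producer side batches its publication here (our production queue has more machinery):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/** SPSC queue where the producer advances a private index and publishes per batch. */
public class ShadowStateQueue<T> {
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            HEAD = l.findVarHandle(ShadowStateQueue.class, "publishedHead", long.class);
            TAIL = l.findVarHandle(ShadowStateQueue.class, "publishedTail", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private final Object[] buffer;
    private final int mask;
    private final int batchSize;

    private long localHead;          // producer-private shadow state: no fences
    private long localTail;          // consumer-private shadow state
    private volatile long publishedHead; // shared state: release/acquire only
    private volatile long publishedTail;

    public ShadowStateQueue(int capacity, int batchSize) { // capacity: power of two
        this.buffer = new Object[capacity];
        this.mask = capacity - 1;
        this.batchSize = batchSize;
    }

    /** Producer: write the element, but pay for a fence only once per batch. */
    public boolean offer(T e) {
        long head = localHead;
        long tail = (long) TAIL.getAcquire(this);
        if (head - tail >= buffer.length) return false; // full
        buffer[(int) (head & mask)] = e;                // plain write, no fence
        localHead = head + 1;
        if (localHead - (long) HEAD.getOpaque(this) >= batchSize) {
            HEAD.setRelease(this, localHead);           // one fence per batch
        }
        return true;
    }

    /** Producer: publish any partial batch (e.g. when the input stream goes idle). */
    public void flush() {
        HEAD.setRelease(this, localHead);
    }

    @SuppressWarnings("unchecked")
    public T poll() {
        long tail = localTail;
        if (tail >= (long) HEAD.getAcquire(this)) return null; // empty
        T e = (T) buffer[(int) (tail & mask)];
        buffer[(int) (tail & mask)] = null;
        localTail = tail + 1;
        TAIL.setRelease(this, localTail);
        return e;
    }
}
```

Note the asymmetry: `localHead` is advanced with ordinary stores, and only the `setRelease` of `publishedHead` makes the whole batch visible. A `flush()` path is essential so a partial batch is never stranded.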
4. Profile Before Optimizing
We only discovered memory barriers as the bottleneck through profiling. The fix is specific to this bottleneck - it wouldn't help if the issue were elsewhere.
5. Correctness Through Memory Ordering
Our batched queue maintains the same correctness guarantees as single-operation queues. Release-acquire semantics ensure that when the consumer sees the updated position, it also sees all the data that was written.
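That guarantee can be demonstrated directly with a simplified two-thread sketch (illustrative, not our production code): the consumer spins on an acquire load, and once it observes the new position, every plain write that preceded the release store is visible.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/** Demonstrates release/acquire visibility: one fence publishes many plain writes. */
public class ReleaseAcquireDemo {
    static final VarHandle POS;
    static {
        try {
            POS = MethodHandles.lookup()
                    .findVarHandle(ReleaseAcquireDemo.class, "publishedPos", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    final long[] data = new long[1024];
    volatile long publishedPos; // accessed only via POS with release/acquire

    static long runOnce() throws InterruptedException {
        ReleaseAcquireDemo d = new ReleaseAcquireDemo();
        long[] result = new long[1];

        Thread producer = new Thread(() -> {
            for (int i = 0; i < d.data.length; i++) d.data[i] = i + 1; // plain writes
            POS.setRelease(d, (long) d.data.length);                   // single publication
        });

        Thread consumer = new Thread(() -> {
            long pos;
            while ((pos = (long) POS.getAcquire(d)) == 0) Thread.onSpinWait();
            // The acquire load saw the release store, so all 1024 writes are visible.
            long sum = 0;
            for (int i = 0; i < pos; i++) sum += d.data[i];
            result[0] = sum;
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return result[0]; // 1 + 2 + ... + 1024 = 524800
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sum=" + runOnce());
    }
}
```

If the release store were replaced by a plain store, the consumer could legally observe the new position while still seeing stale zeros in `data` - the same failure mode as the race condition discussed below.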
The batched SPSC queue is now a core component of our trading infrastructure. It handles market data feeds where every nanosecond counts and log aggregation pipelines where throughput is king. The same fundamental optimization - amortizing barrier costs - applies in both cases.
But the real lesson isn't about batching specifically. It's about understanding where time actually goes. Modern hardware is incredibly fast at moving data but surprisingly slow at synchronization. When you find yourself constrained by synchronization overhead rather than actual work, batching is often the answer.
The next time you're optimizing a concurrent data structure, don't just focus on the algorithm. Look at the synchronization. Count the barriers. Measure their cost. And consider whether you can batch your way to better performance.
Sometimes the biggest wins come from doing less, not more - fewer barriers, fewer cache line transfers, fewer synchronization events. In the world of lock-free programming, less is often more.
If elements arrive corrupted or stale, the cause is almost always a race condition - a bug in the implementation's memory ordering. The fix: verify the ordering, and check for a missing acquire or release.
```java
// Verify these patterns:
PUBLISHED_HEAD.setRelease(this, localHead);         // After all writes
long head = (long) PUBLISHED_HEAD.getAcquire(this); // Before any reads
```
Batching provides the best improvement for SPSC scenarios because it directly addresses the dominant cost (memory barriers) rather than secondary factors.
For maximum performance, combine batching with other techniques:
```java
/**
 * Fully optimized batched queue combining all techniques.
 */
@Contended // Prevent false sharing at object level
public class FullyOptimizedQueue<T> extends BatchedSPSCQueue<T> {
    // Thread affinity set on construction
    private final int producerCore;
    private final int consumerCore;

    public FullyOptimizedQueue(int capacity, int batchSize,
                               int producerCore, int consumerCore) {
        super(capacity, batchSize);
        this.producerCore = producerCore;
        this.consumerCore = consumerCore;
    }

    /** Call from the producer thread to pin its affinity. */
    public void initProducer() {
        AffinityHelper.pinToCore(producerCore);
    }

    /** Call from the consumer thread to pin its affinity. */
    public void initConsumer() {
        AffinityHelper.pinToCore(consumerCore);
    }
}
```
Batching alone: When memory barriers are the bottleneck and you have bursty or continuous workloads.
Batching + affinity: When you need the lowest possible latency and can dedicate CPU cores.
Batching + NUMA: On multi-socket systems where memory locality matters.
All combined: For trading systems, network packet processing, and other ultra-low-latency applications.
The key is to measure first and apply optimizations that address your actual bottlenecks. Batching is often the highest-impact optimization for SPSC queues, but your mileage may vary based on workload characteristics.