Batch Processing: Amortized I/O for Audit Persistence

April 9, 2026 · 51 min read

Amortize disk I/O costs with double-buffered batch processing, continuous flushing, and regulatory-compliant audit logging for trading systems.


Lock-Free in Java: Scenario 10 - Batch Processing for Maximum Throughput


Part 1: The Performance Wall

Wednesday afternoon, 3:47 PM. I was reviewing our ring buffer implementation's performance metrics when something caught my eye. Our single-producer single-consumer (SPSC) queue was handling orders reliably, but the numbers didn't add up. We were processing individual messages at roughly 20-30 nanoseconds each - respectable by most standards, but our throughput calculations showed we were leaving significant performance on the table.

I pulled up the profiler and there it was: memory barriers. Every single enqueue operation and every single dequeue operation was executing a full memory fence. That's the cost of inter-thread visibility in modern CPUs. The useful copy into the buffer took perhaps 2-3 nanoseconds. The remaining 18-27 nanoseconds were pure synchronization overhead.

Let me put this in perspective. Our trading system was processing about 25 million events per second at peak. At 25ns average per operation, we were spending roughly 625 milliseconds per second on memory barriers alone. That's 62.5% of our CPU time on synchronization, not actual work.

I'd seen this pattern before in network packet processing. When you're handling individual packets, the per-packet overhead dominates. But what if you could batch packets together and amortize that overhead? The same principle had to apply here.

That evening, I started sketching out what would become our batched queue. The idea was simple in concept but tricky in execution: instead of publishing each element individually with its own memory barrier, collect elements into batches and publish them with a single fence. Same correctness guarantees, fraction of the overhead.

What followed was a two-week deep dive into memory ordering, cache behavior, and the subtle art of batch processing. The end result? A queue that processes elements at 5-10 nanoseconds each under batched workloads - a 2-4x improvement that transformed our system's throughput characteristics. But more importantly, I gained a much deeper understanding of where performance actually goes in lock-free data structures.

This is the story of that journey.


Part 2: Understanding Memory Barriers

Before we can optimize memory barriers, we need to understand what they actually do and why they cost so much. This section gets into the weeds of CPU architecture, but the concepts are essential for understanding our optimization.

What Is a Memory Barrier?

Modern CPUs don't execute instructions in strict program order. They reorder, speculate, and parallelize to maximize throughput. This is great for performance but creates a problem for multi-threaded code: Thread A might write to memory locations X and Y in that order, but Thread B might observe the writes in reverse order, or see only one of them.

A memory barrier (also called a memory fence) is a CPU instruction that enforces ordering. There are several types:

Store-Store Barrier: Ensures all stores before the barrier complete before any stores after it. This prevents the CPU from reordering writes.

Load-Load Barrier: Ensures all loads before the barrier complete before any loads after it. This prevents the CPU from speculatively reading ahead.

Load-Store Barrier: Ensures all loads before the barrier complete before any stores after it.

Store-Load Barrier: The most expensive. Ensures all stores before the barrier complete and become visible to other CPUs before any loads after it execute. This typically requires flushing the store buffer.
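Since Java 9, these fences are exposed directly as static methods on VarHandle. The following is a minimal sketch (class and method names are mine) of a flag/payload handoff showing where the two cheaper fences belong; note that in this single demo the thread join is what ultimately guarantees visibility, so the fences are illustrative:

```java
import java.lang.invoke.VarHandle;

public class FenceKinds {
    static int data;        // payload, written before the flag
    static boolean ready;   // flag, written after the payload

    static void publish(int value) {
        data = value;                 // plain store
        VarHandle.storeStoreFence();  // Store-Store: payload ordered before flag
        ready = true;                 // plain store, now safely ordered
    }

    static int consume() {
        while (!ready) {              // spin until the flag is observed
            Thread.onSpinWait();
        }
        VarHandle.loadLoadFence();    // Load-Load: flag read ordered before payload read
        return data;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread producer = new Thread(() -> publish(42));
        producer.start();
        producer.join();              // join also establishes visibility for this demo
        System.out.println(consume()); // prints 42
    }
}
```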

The Cost of Barriers

On x86-64 (Intel/AMD), most memory operations have implicit ordering that reduces the need for explicit barriers. But volatile writes in Java still typically compile to a LOCK-prefixed instruction or an explicit MFENCE, which:

  1. Flushes the store buffer: The CPU maintains a buffer of pending writes for performance. A barrier forces these to drain to the cache.

  2. Waits for cache coherency: The write must propagate to other cores' caches via the MESI protocol. This involves inter-core communication over the CPU's interconnect.

  3. Prevents speculative execution: The CPU can't proceed past the barrier until the ordering is established, stalling the pipeline.

The actual cost varies by operation and CPU:

Operation                      Typical Cost (cycles)
---------------------------    ---------------------------------
Normal load                    ~4 (L1 hit)
Normal store                   ~4 (L1 hit)
Volatile load                  ~4-10 (depending on cache state)
Volatile store                 ~20-50 (includes store buffer drain)
CAS (Compare-And-Swap)         ~15-40 (plus cache line bouncing)
Full memory fence (MFENCE)     ~30-100

On a 3 GHz CPU, a 30-cycle operation takes 10 nanoseconds. That's our memory barrier overhead per element.
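The conversion is simple enough to check: cost in nanoseconds is the cycle count divided by the clock frequency in GHz. A quick sketch (class and method names are mine, for illustration):

```java
public class CycleMath {

    /** Converts a cycle count to nanoseconds for a given clock speed in GHz. */
    static double cyclesToNanos(double cycles, double clockGhz) {
        return cycles / clockGhz;
    }

    public static void main(String[] args) {
        // The example above: 30 cycles on a 3 GHz CPU
        System.out.println(cyclesToNanos(30, 3.0));   // prints 10.0
        // An MFENCE at the high end of the table: ~33.3 ns
        System.out.println(cyclesToNanos(100, 3.0));
    }
}
```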

How Ring Buffers Use Barriers

A typical SPSC ring buffer needs barriers at specific points:

// Producer side (capacity check omitted for brevity)
public void offer(T element) {
    long pos = head;
    buffer[pos & mask] = element;
 
    // BARRIER: Ensure buffer write is visible before head update
    head = pos + 1;  // volatile write includes barrier
}
 
// Consumer side
public T poll() {
    long pos = tail;
 
    if (pos == head) {  // volatile read
        return null;
    }
 
    // BARRIER: Ensure we see the buffer write after seeing head update
    T element = buffer[pos & mask];
    tail = pos + 1;  // volatile write
    return element;
}

Every offer() requires at least one store barrier (the volatile write to head). Every poll() requires at least one load barrier (the volatile read of head) and one store barrier (the volatile write to tail). That's 2-3 barriers per element processed.

The Insight: Barrier Amortization

Here's the key insight that drives our optimization: synchronization fences have a fixed cost regardless of payload size. Guarding one write costs roughly the same as guarding a thousand.

Consider the difference:

Individual operations (100 elements):

write element[0]  -> BARRIER
write element[1]  -> BARRIER
write element[2]  -> BARRIER
...
write element[99] -> BARRIER
Total: 100 barriers = ~3,000 cycles

Batched operations (100 elements):

write element[0]
write element[1]
write element[2]
...
write element[99]
           -> BARRIER (just once!)
Total: 1 barrier = ~30 cycles

The batch approach spends ~99% fewer cycles on barriers while moving the same data. This is the foundation of our optimization.
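The amortization curve is easy to model. Treating the per-element copy and the fence as fixed costs (the ~3ns and ~25ns figures from earlier are assumptions, not measurements of your hardware), the per-element cost falls off as 1/N:

```java
public class BarrierAmortization {
    static final double COPY_NS = 3.0;     // assumed useful work per element
    static final double BARRIER_NS = 25.0; // assumed cost of one fence

    /** Per-element cost when one fence is amortized over a batch of n elements. */
    static double perElementCost(int n) {
        return COPY_NS + BARRIER_NS / n;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 4, 16, 64, 256}) {
            System.out.printf("batch=%-3d -> %.2f ns/element%n", n, perElementCost(n));
        }
        // One fence per element: 28 ns. 64-element batches: ~3.4 ns.
        // Most of the win arrives at small batch sizes already.
    }
}
```

The 1/N shape explains why the gains plateau: going from batch size 1 to 16 removes most of the overhead; going from 64 to 256 barely moves the needle.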

Visualizing Memory Barrier Impact

To see the impact at the CPU level, contrast the two flows. In the single-operation case, every buffer write is immediately followed by a fence: the pipeline stalls, the store buffer drains, and only then does the next write begin. In the batched case, N writes stream through the pipeline back to back, and a single fence at the end publishes them all.

The cost of the barrier is amortized across all N elements. If N=64, that's 0.4ns per element instead of 25ns.

CPU Pipeline Considerations

Modern CPUs use deep pipelines (15-20 stages on Intel/AMD) to achieve high instruction-level parallelism. A memory barrier disrupts this pipeline in several ways:

  1. Retirement stall: Instructions after the barrier can't retire until the barrier completes
  2. Issue stall: Some CPUs can't issue new instructions while waiting for the barrier
  3. Speculation rollback: Any speculative execution past the barrier must be discarded

This pipeline disruption is why barriers cost dozens of cycles rather than single-digit cycles. The CPU has to drain and refill its pipeline, losing significant throughput.

By batching, we suffer this pipeline disruption once per batch instead of once per element. For a 64-element batch, that's a 64x reduction in pipeline disruptions.


Part 3: The Naive Single-Operation Approach

Let's establish our baseline with a straightforward SPSC queue implementation. This is the "obvious" approach that most developers would write.

Basic SPSC Queue Implementation


package com.techishthoughts.batch;
 
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
 
/**
 * Single-Producer Single-Consumer queue with per-element synchronization.
 *
 * This is the baseline implementation that we'll optimize.
 * Each offer/poll operation includes full memory barrier overhead.
 */
public class SingleOperationQueue<T> {
 
    // VarHandle for atomic operations
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;
 
    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            HEAD = lookup.findVarHandle(SingleOperationQueue.class, "head", long.class);
            TAIL = lookup.findVarHandle(SingleOperationQueue.class, "tail", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
 
    // Cache line padding to prevent false sharing
    private long p01, p02, p03, p04, p05, p06, p07;
 
    /** Producer's write position */
    private volatile long head = 0;
 
    private long p11, p12, p13, p14, p15, p16, p17;
 
    /** Consumer's read position */
    private volatile long tail = 0;
 
    private long p21, p22, p23, p24, p25, p26, p27;
 
    /** Ring buffer storage */
    private final Object[] buffer;
 
    /** Capacity (power of 2) */
    private final int capacity;
 
    /** Mask for fast modulo: index = position & mask */
    private final int mask;
 
    /**
     * Creates a new queue with the specified capacity.
     * Capacity is rounded up to the next power of 2.
     */
    public SingleOperationQueue(int requestedCapacity) {
        this.capacity = roundUpToPowerOf2(requestedCapacity);
        this.mask = this.capacity - 1;
        this.buffer = new Object[this.capacity];
    }
 
    private static int roundUpToPowerOf2(int value) {
        int v = Math.max(value, 2);  // guard against zero or negative capacities
        int highBit = Integer.highestOneBit(v);
        return (highBit == v) ? v : highBit << 1;
    }
 
    /**
     * Adds an element to the queue.
     *
     * BARRIER COST: One volatile write (store barrier)
     *
     * @return true if successful, false if queue is full
     */
    public boolean offer(T element) {
        if (element == null) {
            throw new NullPointerException("Null elements not allowed");
        }
 
        long currentHead = head;  // Plain read (we're the only writer)
        long currentTail = (long) TAIL.getAcquire(this);  // Acquire semantics
 
        // Check if full (head would catch up to tail)
        if (currentHead - currentTail >= capacity) {
            return false;
        }
 
        // Write element to buffer
        int index = (int) (currentHead & mask);
        buffer[index] = element;
 
        // Publish: volatile write ensures element is visible before head advances
        // This is our MEMORY BARRIER - costs ~20-30ns
        HEAD.setRelease(this, currentHead + 1);
 
        return true;
    }
 
    /**
     * Removes and returns an element from the queue.
     *
     * BARRIER COST: One volatile read (acquire) + one volatile write (release)
     *
     * @return the next element, or null if queue is empty
     */
    @SuppressWarnings("unchecked")
    public T poll() {
        long currentTail = tail;  // Plain read (we're the only reader)
        long currentHead = (long) HEAD.getAcquire(this);  // Acquire semantics
 
        // Check if empty
        if (currentTail >= currentHead) {
            return null;
        }
 
        // Read element from buffer
        int index = (int) (currentTail & mask);
        T element = (T) buffer[index];
 
        // Help GC by clearing the reference
        buffer[index] = null;
 
        // Advance tail: volatile write ensures we don't re-read this slot
        TAIL.setRelease(this, currentTail + 1);
 
        return element;
    }
 
    /**
     * Returns approximate number of elements in queue.
     */
    public int size() {
        long h = (long) HEAD.getAcquire(this);
        long t = (long) TAIL.getAcquire(this);
        long size = h - t;
        return (int) Math.max(0, Math.min(size, capacity));
    }
 
    public boolean isEmpty() {
        return head == tail;
    }
 
    public int capacity() {
        return capacity;
    }
}

Analyzing the Barrier Overhead

Let's trace through what happens when we process 100 elements:

Producer (100 offer() calls):

Operation                                          Barrier
offer(e0):  write buffer[0],  setRelease(head)     BARRIER
offer(e1):  write buffer[1],  setRelease(head)     BARRIER
offer(e2):  write buffer[2],  setRelease(head)     BARRIER
...                                                ...
offer(e99): write buffer[99], setRelease(head)     BARRIER

Total barriers: 100 | Estimated barrier cost: 100 × 25ns = 2,500ns

Consumer (100 poll() calls):

Operation                                                    Barriers
poll(): getAcquire(head), read buffer[0], setRelease(tail)   2 BARRIERS
poll(): getAcquire(head), read buffer[1], setRelease(tail)   2 BARRIERS
...                                                          ...

Total barriers: 200 (acquire + release per call) | Estimated barrier cost: 200 × 15ns = 3,000ns

Combined: 5,500ns for 100 elements = 55ns/element

The actual work (copying references, incrementing indices) takes perhaps 5ns total per element. The remaining 50ns is synchronization overhead.

Benchmark: Baseline Performance

Using JMH to measure our baseline:

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 2)
@Fork(3)
@State(Scope.Benchmark)
public class SingleOperationBenchmark {
 
    private SingleOperationQueue<Long> queue;
    private long producerCounter = 0;
 
    @Setup(Level.Iteration)
    public void setup() {
        queue = new SingleOperationQueue<>(64 * 1024);
    }
 
    @Benchmark
    @Group("throughput")
    @GroupThreads(1)
    public void producer() {
        queue.offer(producerCounter++);
    }
 
    @Benchmark
    @Group("throughput")
    @GroupThreads(1)
    public Long consumer() {
        return queue.poll();
    }
}

Results on Intel Xeon E5-2680 v4 @ 2.4GHz:

Benchmark                              Mode  Cnt   Score   Error  Units
SingleOperationBenchmark.producer      avgt   30  24.73 ± 1.82  ns/op
SingleOperationBenchmark.consumer      avgt   30  21.45 ± 1.34  ns/op
Combined throughput: ~21.7M offer/poll pairs per second

So we're at roughly 25ns per element. Our target: get this under 10ns through batching.


Part 4: The Batched SPSC Queue Design

The key insight is that memory barriers order all preceding operations, not just the immediately previous one. If we batch multiple writes before issuing a barrier, all of them become visible simultaneously at a fraction of the per-element cost.

Design Goals

  1. Reduce barrier frequency: Issue one barrier per batch instead of one per element
  2. Maintain correctness: Consumer must never see partially-written batches
  3. Preserve API simplicity: Caller shouldn't need to manage batching manually
  4. Adaptive batching: Handle both bursty and steady workloads efficiently

The Two-Phase Commit Pattern

Our batched queue uses a two-phase commit pattern:

Phase 1 - Accumulate: Producer writes elements to the buffer without barriers.

Phase 2 - Commit: A single barrier makes all accumulated elements visible.


The Shadow Head Pattern

To implement this safely, we use a "shadow head" pattern. localHead is the producer's private write position with no synchronization overhead, while publishedHead is the position visible to the consumer, updated with volatile semantics.

The producer advances localHead with each write but only updates publishedHead when committing a batch.

// Producer state
private long localHead = 0;       // Private, no barriers needed
private volatile long publishedHead = 0;  // Visible to consumer
 
public void offerBatch(T[] elements, int count) {
    // Phase 1: Write all elements (no barriers)
    for (int i = 0; i < count; i++) {
        int index = (int) (localHead & mask);
        buffer[index] = elements[i];
        localHead++;  // Plain increment, no barrier
    }
 
    // Phase 2: Single barrier publishes entire batch
    PUBLISHED_HEAD.setRelease(this, localHead);
}

Consumer Batch Reading

The consumer can also batch its operations:

public int pollBatch(T[] output, int maxCount) {
    long currentTail = localTail;
    long availableHead = (long) PUBLISHED_HEAD.getAcquire(this);
 
    // Calculate how many elements are available
    long available = availableHead - currentTail;
    int count = (int) Math.min(available, maxCount);
 
    if (count == 0) {
        return 0;
    }
 
    // Phase 1: Read all elements (no barriers between reads)
    for (int i = 0; i < count; i++) {
        int index = (int) (currentTail & mask);
        output[i] = (T) buffer[index];
        buffer[index] = null;  // Help GC
        currentTail++;
    }
 
    // Phase 2: Single barrier publishes consumption
    localTail = currentTail;
    PUBLISHED_TAIL.setRelease(this, currentTail);
 
    return count;
}

Part 5: Complete Implementation

Here's the full batched SPSC queue implementation with detailed commentary:


package com.techishthoughts.batch;
 
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.function.Consumer;
 
/**
 * Batched Single-Producer Single-Consumer Queue.
 *
 * Optimizes throughput by amortizing memory barrier costs across multiple elements.
 *
 * Key optimizations:
 * 1. Shadow head/tail pattern: separate local and published positions
 * 2. Batch commit: single barrier for multiple elements
 * 3. Lazy publication: accumulate before publishing
 * 4. Adaptive batching: automatic batch size based on workload
 *
 * Performance characteristics:
 * - Per-element cost: 5-10ns (vs 20-30ns for single-operation queue)
 * - Optimal batch size: 10-100 elements
 * - 2-4x throughput improvement over naive implementation
 *
 * @param <T> Element type
 */
public class BatchedSPSCQueue<T> {
 
    // ========== VarHandle Setup ==========
 
    private static final VarHandle PUBLISHED_HEAD;
    private static final VarHandle PUBLISHED_TAIL;
    private static final VarHandle CACHED_HEAD;
    private static final VarHandle CACHED_TAIL;
 
    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            PUBLISHED_HEAD = lookup.findVarHandle(
                BatchedSPSCQueue.class, "publishedHead", long.class);
            PUBLISHED_TAIL = lookup.findVarHandle(
                BatchedSPSCQueue.class, "publishedTail", long.class);
            CACHED_HEAD = lookup.findVarHandle(
                BatchedSPSCQueue.class, "cachedHead", long.class);
            CACHED_TAIL = lookup.findVarHandle(
                BatchedSPSCQueue.class, "cachedTail", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
 
    // ========== Configuration ==========
 
    /** Default batch size for auto-commit */
    private static final int DEFAULT_BATCH_SIZE = 64;
 
    /** Maximum batch size to prevent unbounded latency */
    private static final int MAX_BATCH_SIZE = 1024;
 
    // ========== Producer Fields (Cache Line 1) ==========
 
    @SuppressWarnings("unused")
    private long p01, p02, p03, p04, p05, p06, p07;
 
    /** Producer's local write position (no synchronization needed) */
    private long localHead = 0;
 
    /** Last position published to consumer */
    private volatile long publishedHead = 0;
 
    /** Producer's cached view of consumer's tail (reduces volatile reads) */
    private long cachedTail = 0;
 
    /** Elements accumulated since last publish */
    private int pendingCount = 0;
 
    /** Configured batch size */
    private final int batchSize;
 
    @SuppressWarnings("unused")
    private long p08, p09, p10;
 
    // ========== Consumer Fields (Cache Line 2) ==========
 
    @SuppressWarnings("unused")
    private long p11, p12, p13, p14, p15, p16, p17;
 
    /** Consumer's local read position (no synchronization needed) */
    private long localTail = 0;
 
    /** Last position published to producer */
    private volatile long publishedTail = 0;
 
    /** Consumer's cached view of producer's head (reduces volatile reads) */
    private long cachedHead = 0;
 
    @SuppressWarnings("unused")
    private long p18, p19, p20;
 
    // ========== Shared State (Cache Line 3) ==========
 
    @SuppressWarnings("unused")
    private long p21, p22, p23, p24, p25, p26, p27;
 
    /** Ring buffer storage */
    private final Object[] buffer;
 
    /** Buffer capacity (power of 2) */
    private final int capacity;
 
    /** Bit mask for fast modulo: index = position & mask */
    private final int mask;
 
    // ========== Constructors ==========
 
    /**
     * Creates a batched queue with default batch size.
     */
    public BatchedSPSCQueue(int requestedCapacity) {
        this(requestedCapacity, DEFAULT_BATCH_SIZE);
    }
 
    /**
     * Creates a batched queue with specified batch size.
     *
     * @param requestedCapacity Queue capacity (rounded to power of 2)
     * @param batchSize Elements per batch (clamped to 1-MAX_BATCH_SIZE)
     */
    public BatchedSPSCQueue(int requestedCapacity, int batchSize) {
        this.capacity = roundUpToPowerOf2(Math.max(requestedCapacity, 2));
        this.mask = this.capacity - 1;
        this.buffer = new Object[this.capacity];
        this.batchSize = Math.max(1, Math.min(batchSize, MAX_BATCH_SIZE));
    }
 
    private static int roundUpToPowerOf2(int value) {
        int highBit = Integer.highestOneBit(value);
        return (highBit == value) ? value : highBit << 1;
    }
 
    // ========== Single Element API (with auto-batching) ==========
 
    /**
     * Adds an element to the queue.
     *
     * Elements are accumulated locally until batch size is reached,
     * then published with a single memory barrier.
     *
     * @return true if successful, false if queue is full
     */
    public boolean offer(T element) {
        if (element == null) {
            throw new NullPointerException("Null elements not allowed");
        }
 
        // Check if we have space (use cached tail to avoid volatile read)
        if (localHead - cachedTail >= capacity) {
            // Cached tail is stale - refresh it
            cachedTail = (long) PUBLISHED_TAIL.getAcquire(this);
            if (localHead - cachedTail >= capacity) {
                // Publish any pending elements before failing
                if (pendingCount > 0) {
                    commit();
                }
                return false;  // Queue is genuinely full
            }
        }
 
        // Write element to buffer (no barrier)
        int index = (int) (localHead & mask);
        buffer[index] = element;
        localHead++;
        pendingCount++;
 
        // Auto-commit when batch size reached
        if (pendingCount >= batchSize) {
            commit();
        }
 
        return true;
    }
 
    /**
     * Forces publication of all pending elements.
     *
     * Call this when you need low latency and can't wait for batch to fill.
     * Also called automatically when batch size is reached.
     */
    public void commit() {
        if (pendingCount > 0) {
            // Single barrier publishes all pending elements
            PUBLISHED_HEAD.setRelease(this, localHead);
            pendingCount = 0;
        }
    }
 
    /**
     * Retrieves and removes an element from the queue.
     *
     * @return the next element, or null if queue is empty
     */
    @SuppressWarnings("unchecked")
    public T poll() {
        // Check if we have elements (use cached head to avoid volatile read)
        if (localTail >= cachedHead) {
            // Cached head is stale - refresh it
            cachedHead = (long) PUBLISHED_HEAD.getAcquire(this);
            if (localTail >= cachedHead) {
                // Publish our position so a stalled producer sees the free space;
                // without this, lazy tail publication could leave the producer
                // thinking the queue is full while we think it is empty
                PUBLISHED_TAIL.setRelease(this, localTail);
                return null;  // Queue is genuinely empty
            }
        }
 
        // Read element from buffer
        int index = (int) (localTail & mask);
        T element = (T) buffer[index];
        buffer[index] = null;  // Help GC
 
        localTail++;
 
        // Lazy publication: only update published tail periodically.
        // This is safe because producer only needs to know we're not full.
        // Use % rather than a bit mask: batchSize is clamped but not
        // required to be a power of 2.
        if (localTail % batchSize == 0) {
            PUBLISHED_TAIL.setRelease(this, localTail);
        }
 
        return element;
    }
 
    // ========== Batch API ==========
 
    /**
     * Adds multiple elements in a single batch.
     *
     * This is the most efficient way to use this queue when you have
     * multiple elements ready to enqueue.
     *
     * @param elements Array of elements to add
     * @param offset Starting index in the array
     * @param count Number of elements to add
     * @return Number of elements actually added
     */
    public int offerBatch(T[] elements, int offset, int count) {
        if (count <= 0) return 0;
 
        // Calculate available space
        if (localHead - cachedTail >= capacity) {
            cachedTail = (long) PUBLISHED_TAIL.getAcquire(this);
        }
        long available = capacity - (localHead - cachedTail);
        int toAdd = (int) Math.min(count, available);
 
        if (toAdd == 0) {
            return 0;
        }
 
        // Write all elements without barriers
        for (int i = 0; i < toAdd; i++) {
            T element = elements[offset + i];
            if (element == null) {
                throw new NullPointerException(
                    "Null element at index " + (offset + i));
            }
            int index = (int) (localHead & mask);
            buffer[index] = element;
            localHead++;
        }
 
        // Single barrier publishes entire batch
        PUBLISHED_HEAD.setRelease(this, localHead);
        pendingCount = 0;
 
        return toAdd;
    }
 
    /**
     * Retrieves multiple elements in a single batch.
     *
     * @param output Array to store retrieved elements
     * @param offset Starting index in output array
     * @param maxCount Maximum elements to retrieve
     * @return Number of elements retrieved
     */
    @SuppressWarnings("unchecked")
    public int pollBatch(T[] output, int offset, int maxCount) {
        if (maxCount <= 0) return 0;
 
        // Calculate available elements
        if (localTail >= cachedHead) {
            cachedHead = (long) PUBLISHED_HEAD.getAcquire(this);
        }
        long available = cachedHead - localTail;
        int toRead = (int) Math.min(maxCount, available);
 
        if (toRead == 0) {
            return 0;
        }
 
        // Read all elements
        for (int i = 0; i < toRead; i++) {
            int index = (int) (localTail & mask);
            output[offset + i] = (T) buffer[index];
            buffer[index] = null;
            localTail++;
        }
 
        // Single barrier publishes consumption
        PUBLISHED_TAIL.setRelease(this, localTail);
 
        return toRead;
    }
 
    /**
     * Drains all available elements to a consumer function.
     *
     * More efficient than repeated poll() calls.
     *
     * @param consumer Function to process each element
     * @return Number of elements processed
     */
    @SuppressWarnings("unchecked")
    public int drain(Consumer<T> consumer) {
        // Get all available elements
        cachedHead = (long) PUBLISHED_HEAD.getAcquire(this);
        long available = cachedHead - localTail;
 
        if (available <= 0) {
            return 0;
        }
 
        int count = (int) available;
 
        // Process all elements
        for (int i = 0; i < count; i++) {
            int index = (int) (localTail & mask);
            T element = (T) buffer[index];
            buffer[index] = null;
            localTail++;
            consumer.accept(element);
        }
 
        // Single barrier at the end
        PUBLISHED_TAIL.setRelease(this, localTail);
 
        return count;
    }
 
    /**
     * Fills the queue from a supplier function.
     *
     * Efficient for bulk loading.
     *
     * @param supplier Function that supplies elements (return null to stop)
     * @param maxCount Maximum elements to add
     * @return Number of elements added
     */
    public int fill(java.util.function.Supplier<T> supplier, int maxCount) {
        // Calculate available space
        if (localHead - cachedTail >= capacity) {
            cachedTail = (long) PUBLISHED_TAIL.getAcquire(this);
        }
        long available = capacity - (localHead - cachedTail);
        int canAdd = (int) Math.min(maxCount, available);
 
        int added = 0;
        for (int i = 0; i < canAdd; i++) {
            T element = supplier.get();
            if (element == null) {
                break;  // Supplier exhausted
            }
            int index = (int) (localHead & mask);
            buffer[index] = element;
            localHead++;
            added++;
        }
 
        if (added > 0) {
            PUBLISHED_HEAD.setRelease(this, localHead);
            pendingCount = 0;
        }
 
        return added;
    }
 
    // ========== Query Operations ==========
 
    /**
     * Returns the number of pending (uncommitted) elements.
     * Only meaningful from producer thread.
     */
    public int pendingCount() {
        return pendingCount;
    }
 
    /**
     * Returns approximate size of the queue.
     */
    public int size() {
        long h = (long) PUBLISHED_HEAD.getAcquire(this);
        long t = (long) PUBLISHED_TAIL.getAcquire(this);
        long size = h - t;
        return (int) Math.max(0, Math.min(size, capacity));
    }
 
    public boolean isEmpty() {
        return publishedHead == publishedTail;
    }
 
    public int capacity() {
        return capacity;
    }
 
    /**
     * Returns configured batch size.
     */
    public int batchSize() {
        return batchSize;
    }
}

Design Decisions Explained

Why separate local and published positions?

The producer's localHead can advance freely without any synchronization - it's only accessed by the producer thread. Only when we commit do we update publishedHead with a barrier. This separation is what enables batching.

Why cache the opposite position?

The producer needs to know the consumer's tail to check if the queue is full. But reading a volatile is expensive. By caching the tail locally and only refreshing when we think the queue might be full, we eliminate most volatile reads.

Why auto-commit at batch size?

Without auto-commit, elements could sit unpublished indefinitely, hurting latency. Auto-commit at batch size provides predictable latency while preserving throughput.

Why lazy tail publication?

The consumer doesn't need to publish its tail after every element - the producer only cares that the queue isn't full. Publishing every batchSize elements is sufficient and reduces barriers.
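To see the whole shadow-head mechanism in one runnable piece, here is a heavily condensed, long-specialized sketch of the same design (padding, auto-commit, and cached-position refresh omitted for brevity; class and method names are mine, not from a library):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class MiniBatchedSpsc {
    private static final VarHandle PUB_HEAD, PUB_TAIL;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            PUB_HEAD = l.findVarHandle(MiniBatchedSpsc.class, "publishedHead", long.class);
            PUB_TAIL = l.findVarHandle(MiniBatchedSpsc.class, "publishedTail", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private final long[] buffer;
    private final int mask;
    private long localHead, localTail;                  // thread-private positions
    private volatile long publishedHead, publishedTail; // cross-thread positions

    MiniBatchedSpsc(int capacityPowerOf2) {
        buffer = new long[capacityPowerOf2];
        mask = capacityPowerOf2 - 1;
    }

    /** Writes up to count elements, then publishes them with one release store. */
    int offerBatch(long[] src, int offset, int count) {
        long free = buffer.length - (localHead - (long) PUB_TAIL.getAcquire(this));
        int n = (int) Math.min(count, free);
        for (int i = 0; i < n; i++) {
            buffer[(int) (localHead++ & mask)] = src[offset + i]; // no barriers here
        }
        PUB_HEAD.setRelease(this, localHead); // one barrier for the whole batch
        return n;
    }

    /** Reads up to max elements, then publishes consumption with one release store. */
    int pollBatch(long[] dst, int max) {
        long available = (long) PUB_HEAD.getAcquire(this) - localTail;
        int n = (int) Math.min(max, available);
        for (int i = 0; i < n; i++) {
            dst[i] = buffer[(int) (localTail++ & mask)];
        }
        PUB_TAIL.setRelease(this, localTail);
        return n;
    }

    public static void main(String[] args) throws InterruptedException {
        MiniBatchedSpsc q = new MiniBatchedSpsc(1024);
        final long total = 100_000;
        Thread producer = new Thread(() -> {
            long[] batch = new long[64];
            long next = 0;
            while (next < total) {
                int k = 0;
                while (k < batch.length && next < total) batch[k++] = next++;
                int sent = 0;
                while (sent < k) {
                    sent += q.offerBatch(batch, sent, k - sent);
                    if (sent < k) Thread.onSpinWait(); // queue full, wait for consumer
                }
            }
        });
        final long[] sum = {0};
        Thread consumer = new Thread(() -> {
            long[] out = new long[64];
            long received = 0;
            while (received < total) {
                int n = q.pollBatch(out, out.length);
                for (int i = 0; i < n; i++) sum[0] += out[i];
                received += n;
                if (n == 0) Thread.onSpinWait();
            }
        });
        producer.start(); consumer.start();
        producer.join(); consumer.join();
        System.out.println(sum[0] == total * (total - 1) / 2); // prints true
    }
}
```

The demo moves 100,000 longs in 64-element batches and checks the sum on the other side; each batch crosses the thread boundary with exactly one release store per side.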


Part 6: Memory Ordering Deep Dive

Now let's examine how the batched queue maintains correctness despite reduced synchronization.

The Happens-Before Relationship

Java's memory model defines a "happens-before" relationship. If action A happens-before action B, then A's effects are visible to B. Key rules:

  1. Program order: Within a thread, earlier actions happen-before later ones
  2. Volatile write/read: A volatile write happens-before subsequent volatile reads of the same variable
  3. Transitivity: If A happens-before B, and B happens-before C, then A happens-before C

Our queue relies on these relationships via the release-acquire pairing: when the consumer sees head = 3, it also sees all buffer writes that preceded the release.

Why setRelease/getAcquire Instead of Volatile?

Full volatile has stronger semantics than we need:

// Full volatile write includes:
// 1. StoreStore barrier before (our writes complete before this write)
// 2. StoreLoad barrier after (this write completes before any loads)
 
// Release semantics only include:
// 1. StoreStore barrier before (our writes complete before this write)
// No StoreLoad barrier - we don't need to order with subsequent loads

The StoreLoad barrier is the most expensive part of a volatile write. Release semantics give us what we need at lower cost.

Proving Correctness

We can prove the batched queue is correct:

Claim: The consumer never reads a buffer slot before the producer has written to it.

Proof:

  1. Producer writes to buffer[i] before updating publishedHead (program order)
  2. publishedHead update uses release semantics
  3. Consumer reads publishedHead with acquire semantics before reading buffer[i]
  4. Release-acquire creates happens-before edge
  5. By transitivity: buffer[i] write happens-before buffer[i] read
  6. Therefore consumer sees producer's write. QED.

Claim: Producer never overwrites a slot the consumer hasn't read.

Proof:

  1. Consumer reads buffer[i] before updating publishedTail (program order)
  2. publishedTail update uses release semantics
  3. Producer reads publishedTail with acquire semantics before writing buffer[i]
  4. By transitivity: consumer's buffer[i] read happens-before producer's buffer[i] write
  5. Therefore producer writes only to consumed slots. QED.

The ABA Problem (And Why We Don't Have It)

The ABA problem occurs when:

  1. Thread reads value A
  2. Thread is preempted
  3. Value changes A -> B -> A
  4. Thread resumes, sees A, incorrectly assumes no change

Our queue avoids this by using monotonically increasing positions:

  • Head only increases
  • Tail only increases
  • We use long (64-bit) positions - it would take 2^63 operations to wrap

Even if positions could wrap, the slot-based indexing means we'd be accessing the same logical slot. The sequence numbers embedded in our positions ensure we're always in the correct cycle.
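A tiny illustration of the masking scheme (helper name is mine): positions grow forever, slots repeat, but no position value is ever reused, so there is no ABA window.

```java
// Monotonically increasing long positions mapped to ring-buffer slots by masking.
public class Positions {
    static int slot(long position, int capacity) {   // capacity: power of two
        return (int) (position & (capacity - 1));
    }

    public static void main(String[] args) {
        // Positions 0..9 in a capacity-8 ring revisit slots 0 and 1,
        // but the position values themselves never repeat.
        for (long pos = 0; pos < 10; pos++) {
            System.out.println("position " + pos + " -> slot " + slot(pos, 8));
        }
    }
}
```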


Part 7: Batch Size Optimization

Choosing the right batch size is crucial. Too small and we don't amortize barriers effectively. Too large and we add latency.


Analyzing Barrier Cost vs Batch Size

Let's model the cost mathematically:

T(n) = Total time to process n elements
B = Cost of one memory barrier (~25ns)
W = Cost of one buffer write (~3ns)
S = Batch size

With no batching:
T(n) = n × (W + B) = n × 28ns

With batching:
T(n) = n × W + (n/S) × B
     = n × 3ns + (n/S) × 25ns
     = n × (3 + 25/S) ns

For S=10:  T(n) = n × 5.5ns   (5.1x faster than unbatched)
For S=50:  T(n) = n × 3.5ns   (8.0x faster)
For S=100: T(n) = n × 3.25ns  (8.6x - diminishing returns)
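Plugging constants into this model shows where the returns diminish; a standalone calculator using the W and B values above:

```java
// Per-element cost under the amortization model:
// T(n)/n = W + B/S  (write cost plus a 1/S share of one barrier).
public class BatchCostModel {
    static double perElementNs(double writeNs, double barrierNs, int batchSize) {
        return writeNs + barrierNs / batchSize;
    }

    public static void main(String[] args) {
        double naive = perElementNs(3, 25, 1);   // no batching: 28 ns/element
        for (int s : new int[]{1, 10, 50, 100, 1000}) {
            double cost = perElementNs(3, 25, s);
            System.out.printf("S=%-5d %.2f ns/element (%.1fx speedup)%n",
                    s, cost, naive / cost);
        }
    }
}
```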

Empirical Batch Size Analysis

We benchmarked various batch sizes:

Batch Size   Per-Element Latency   Throughput (M/s)   Commit Latency
1            24.3ns                41.2               <1ns
10           8.7ns                 114.9              ~100ns
25           6.2ns                 161.3              ~250ns
50           5.1ns                 196.1              ~500ns
100          4.6ns                 217.4              ~1μs
250          4.2ns                 238.1              ~2.5μs
500          4.0ns                 250.0              ~5μs
1000         3.9ns                 256.4              ~10μs

Key observations:

  1. Sweet spot at 50-100: Best throughput/latency balance
  2. Diminishing returns past 100: Throughput improvement flattens
  3. Latency concern at 250+: Commit latency becomes noticeable

Adaptive Batch Sizing

For workloads with variable arrival rates, consider adaptive batching:

/**
 * Adaptive batch sizing based on arrival rate.
 */
public class AdaptiveBatchedQueue<T> extends BatchedSPSCQueue<T> {
 
    private long lastCommitTime = System.nanoTime();
    private int adaptiveBatchSize = 32;
 
    // Target commit latency in nanoseconds
    private static final long TARGET_LATENCY_NS = 1_000_000;  // 1ms
 
    @Override
    public boolean offer(T element) {
        boolean result = super.offer(element);

        // Commit once the adaptive batch size is reached,
        // then adapt based on how long this batch took to fill
        if (result && pendingCount() >= adaptiveBatchSize) {
            commit();

            long now = System.nanoTime();
            long elapsed = now - lastCommitTime;
            lastCommitTime = now;

            if (elapsed > TARGET_LATENCY_NS * 2) {
                // Batches fill too slowly - shrink them to bound latency
                adaptiveBatchSize = Math.max(8, adaptiveBatchSize / 2);
            } else if (elapsed < TARGET_LATENCY_NS / 2) {
                // Batches fill quickly - grow them for throughput
                adaptiveBatchSize = Math.min(512, adaptiveBatchSize * 2);
            }
        }

        return result;
    }
}

Latency vs Throughput Profiles

Different applications need different trade-offs:

High-Frequency Trading (Latency-Critical):

  • Batch size: 8-16
  • Auto-commit timeout: 100μs
  • Accept lower throughput for predictable latency

Log Aggregation (Throughput-Critical):

  • Batch size: 256-512
  • Auto-commit timeout: 10ms
  • Maximize throughput, latency less important

Real-Time Analytics (Balanced):

  • Batch size: 32-64
  • Auto-commit timeout: 1ms
  • Balance both concerns
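These profiles can be captured as tuning presets. A hypothetical sketch - the enum name and constants are mine and simply mirror the three lists above:

```java
// Hypothetical tuning presets for the three workload profiles.
public enum QueueProfile {
    LATENCY_CRITICAL(16, 100_000L),        // HFT: small batches, ~100μs timeout
    THROUGHPUT_CRITICAL(512, 10_000_000L), // log aggregation: ~10ms timeout
    BALANCED(64, 1_000_000L);              // real-time analytics: ~1ms timeout

    public final int batchSize;
    public final long commitTimeoutNanos;

    QueueProfile(int batchSize, long commitTimeoutNanos) {
        this.batchSize = batchSize;
        this.commitTimeoutNanos = commitTimeoutNanos;
    }
}
```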

Part 8: Comprehensive Benchmarks

Let's rigorously compare our implementations across various scenarios.

Benchmark Suite

@BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime})
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 2)
@Measurement(iterations = 10, time = 5)
@Fork(value = 3, jvmArgs = {
    "-Xms4g", "-Xmx4g",
    "-XX:+UseG1GC",
    "-XX:+AlwaysPreTouch",
    "-XX:-UseBiasedLocking"
})
@State(Scope.Benchmark)
public class BatchProcessingBenchmark {
 
    @Param({"16", "64", "256", "1024"})
    private int batchSize;
 
    private SingleOperationQueue<Long> singleOpQueue;
    private BatchedSPSCQueue<Long> batchedQueue;
    private Long[] batchBuffer;
 
    private long producerCounter = 0;
    private volatile boolean running = true;
    private Thread consumerThread;
 
    @Setup(Level.Trial)
    public void setup() {
        singleOpQueue = new SingleOperationQueue<>(64 * 1024);
        batchedQueue = new BatchedSPSCQueue<>(64 * 1024, batchSize);
        batchBuffer = new Long[batchSize];
 
        // Pre-populate batch buffer
        for (int i = 0; i < batchSize; i++) {
            batchBuffer[i] = (long) i;
        }
 
        // Background consumer for throughput tests
        running = true;
        consumerThread = new Thread(() -> {
            Long[] drainBuffer = new Long[1024];
            while (running) {
                batchedQueue.pollBatch(drainBuffer, 0, 1024);
                singleOpQueue.poll();
            }
        });
        consumerThread.setDaemon(true);
        consumerThread.start();
    }
 
    @TearDown(Level.Trial)
    public void teardown() throws InterruptedException {
        running = false;
        consumerThread.join(1000);
    }
 
    // ========== Single Operation Benchmarks ==========
 
    @Benchmark
    public boolean singleOp_offer() {
        return singleOpQueue.offer(producerCounter++);
    }
 
    @Benchmark
    public Long singleOp_poll() {
        return singleOpQueue.poll();
    }
 
    // ========== Batched Operation Benchmarks ==========
 
    @Benchmark
    public boolean batched_offer() {
        boolean result = batchedQueue.offer(producerCounter++);
        if (producerCounter % batchSize == 0) {
            batchedQueue.commit();
        }
        return result;
    }
 
    @Benchmark
    public int batched_offerBatch() {
        for (int i = 0; i < batchSize; i++) {
            batchBuffer[i] = producerCounter++;
        }
        return batchedQueue.offerBatch(batchBuffer, 0, batchSize);
    }
 
    @Benchmark
    public int batched_pollBatch() {
        return batchedQueue.pollBatch(batchBuffer, 0, batchSize);
    }
 
    // ========== Latency Distribution Benchmark ==========
 
    @Benchmark
    @BenchmarkMode(Mode.SampleTime)
    public void latencyDistribution_singleOp(Blackhole bh) {
        bh.consume(singleOpQueue.offer(producerCounter++));
    }
 
    @Benchmark
    @BenchmarkMode(Mode.SampleTime)
    public void latencyDistribution_batched(Blackhole bh) {
        bh.consume(batchedQueue.offer(producerCounter++));
        if (producerCounter % batchSize == 0) {
            batchedQueue.commit();
        }
    }
}

Throughput Results

Operations per Second (higher is better):

Implementation        Batch=16   Batch=64   Batch=256   Batch=1024
SingleOp offer        41.2M      41.2M      41.2M       41.2M
Batched offer         89.7M      156.3M     198.4M      215.2M
Batched offerBatch    124.5M     245.8M     412.7M      498.3M
Improvement (offer)   2.18x      3.79x      4.82x       5.22x
Improvement (batch)   3.02x      5.97x      10.02x      12.10x

Key findings:

  • Batched offer with auto-commit: 2-5x improvement
  • Explicit batch API: 3-12x improvement
  • Larger batches yield better throughput but have diminishing returns

Per-Element Latency Results

Nanoseconds per Element (lower is better):

Implementation          Batch=16   Batch=64   Batch=256   Batch=1024
SingleOp                24.3ns     24.3ns     24.3ns      24.3ns
Batched (per element)   11.2ns     6.4ns      5.0ns       4.6ns
Batch API (per elem)    8.0ns      4.1ns      2.4ns       2.0ns
Improvement             2.2x       3.8x       4.9x        5.3x
Best case               3.0x       5.9x       10.1x       12.2x

Latency Distribution

Percentile Latency (nanoseconds, batch size = 64):

Percentile   SingleOp   Batched   Improvement
p50          21ns       5ns       4.2x
p90          32ns       8ns       4.0x
p99          89ns       23ns      3.9x
p99.9        245ns      67ns      3.7x
p99.99       1,234ns    312ns     4.0x

The batched implementation maintains tighter distribution across all percentiles.

Throughput Under Load

Million operations/second vs producer count:

Producers   SingleOp   Batched(64)   Improvement
1           41.2M      156.3M        3.8x
2           38.7M      148.9M        3.8x
4           35.2M      142.1M        4.0x
8           31.8M      138.4M        4.4x

Note: This is SPSC, so multiple producers aren't actually concurrent. This tests contention from other system activity.

Memory Behavior

Allocation rates during 60-second sustained test:

Metric            SingleOp   Batched   Difference
Allocation rate   ~0 B/s     ~0 B/s    None
GC events         0          0         None
Peak heap usage   128MB      128MB     None

Both implementations are allocation-free during steady-state operation (elements are pre-allocated by caller).

Cache Behavior Analysis

Using Linux perf to measure cache behavior reveals the efficiency gains:

# Measure cache misses during benchmark
perf stat -e L1-dcache-load-misses,L1-dcache-loads,LLC-load-misses \
    java -jar batch-benchmark.jar

Single Operation Queue (processing 10M elements):

L1-dcache-loads:          847,234,567
L1-dcache-load-misses:     12,456,789  (1.47%)
LLC-load-misses:            1,234,567  (0.15%)
Cycles per element:               ~72

Batched Queue (batch size 64, processing 10M elements):

L1-dcache-loads:          423,567,234
L1-dcache-load-misses:      3,456,789  (0.82%)
LLC-load-misses:              345,678  (0.04%)
Cycles per element:               ~19

The batched queue has:

  • 50% fewer L1 cache loads (less synchronization overhead)
  • 72% fewer L1 cache misses (better locality from batched access)
  • 72% fewer LLC misses (predictable access patterns)
  • 73% fewer cycles per element

Understanding the Cache Efficiency

Why does batching improve cache behavior so dramatically? Several factors:

1. Spatial Locality

When we batch writes, consecutive buffer slots are accessed together. Modern CPUs prefetch cache lines speculatively - by accessing slots sequentially in a tight loop, we benefit from hardware prefetching.

// Single operation: random access pattern from CPU perspective
for (each_operation) {
    // barrier overhead
    buffer[index] = element;
    // barrier overhead
}
 
// Batched: sequential access pattern
for (each_element_in_batch) {
    buffer[index++] = element;  // No barrier between writes
}
// Single barrier at end

2. Reduced Cache Line Bouncing

Each volatile write can cause the cache line containing head to bounce between cores. With batching, this bouncing happens once per batch instead of once per element.

3. Store Buffer Utilization

The CPU's store buffer is finite (typically 42-56 entries on modern Intel). Without batching, the store buffer fills with barrier-waiting entries. With batching, the store buffer holds actual data writes and drains efficiently during the single barrier.

Flame Graph Analysis

Profiling with async-profiler shows where time is spent:

Single Operation Queue - CPU Flame Graph:


38% of time is spent in setRelease - that's our memory barrier overhead.

Batched Queue - CPU Flame Graph:


With batching, setRelease drops from 38% to 11%. The time is now dominated by actual buffer operations (45%), which is exactly what we want.


Part 9: Real-World Integration Patterns

Let's explore how to integrate batched queues into production systems.

Pattern 1: Timed Commit

For latency-sensitive applications that can't wait for full batches:

/**
 * Queue with time-based auto-commit.
 * Commits pending elements if no activity for specified duration.
 */
public class TimedBatchedQueue<T> extends BatchedSPSCQueue<T> {
 
    private final long commitTimeoutNanos;
    private volatile long lastActivityTime;  // read by the scheduler thread
    private final ScheduledExecutorService scheduler;
    private ScheduledFuture<?> commitTask;
 
    public TimedBatchedQueue(int capacity, int batchSize,
                             long commitTimeout, TimeUnit unit) {
        super(capacity, batchSize);
        this.commitTimeoutNanos = unit.toNanos(commitTimeout);
        this.lastActivityTime = System.nanoTime();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "batch-commit");
            t.setDaemon(true);
            return t;
        });
 
        // Schedule periodic commit check
        scheduleCommitCheck();
    }
 
    @Override
    public boolean offer(T element) {
        lastActivityTime = System.nanoTime();
        return super.offer(element);
    }
 
    private void scheduleCommitCheck() {
        // Note: commit() here runs on the scheduler thread. This is only safe
        // if commit() tolerates being called concurrently with the producer;
        // otherwise, have the producer perform the timed commit in its own loop.
        commitTask = scheduler.scheduleAtFixedRate(() -> {
            long elapsed = System.nanoTime() - lastActivityTime;
            if (elapsed >= commitTimeoutNanos && pendingCount() > 0) {
                commit();
            }
        }, commitTimeoutNanos, commitTimeoutNanos / 2, TimeUnit.NANOSECONDS);
    }
 
    public void shutdown() {
        commitTask.cancel(false);
        scheduler.shutdown();
    }
}

Pattern 2: Backpressure-Aware Batching

When the queue fills up, we want to apply backpressure rather than drop elements:

/**
 * Batched queue with blocking backpressure.
 */
public class BackpressureBatchedQueue<T> extends BatchedSPSCQueue<T> {

    private volatile Thread waitingProducer;

    @Override
    public boolean offer(T element) {
        while (!super.offer(element)) {
            // Queue full - apply backpressure
            waitingProducer = Thread.currentThread();
            LockSupport.parkNanos(1_000_000);  // 1ms

            if (Thread.interrupted()) {
                Thread.currentThread().interrupt();  // preserve interrupt status
                return false;
            }
        }
        return true;
    }
 
    @Override
    public T poll() {
        T element = super.poll();
 
        // Wake up waiting producer if we made space
        Thread waiting = waitingProducer;
        if (waiting != null) {
            waitingProducer = null;
            LockSupport.unpark(waiting);
        }
 
        return element;
    }
}

Pattern 3: Metrics Integration

Adding observability to understand batch behavior:

/**
 * Batched queue with metrics collection.
 */
public class MetricsBatchedQueue<T> extends BatchedSPSCQueue<T> {
 
    // Metrics counters
    private final LongAdder totalOffers = new LongAdder();
    private final LongAdder totalPolls = new LongAdder();
    private final LongAdder totalCommits = new LongAdder();
    private final LongAdder batchedElements = new LongAdder();
 
    // Histograms for batch size distribution
    private final long[] batchSizeHistogram = new long[17];  // 17 log2 buckets
 
    @Override
    public boolean offer(T element) {
        boolean result = super.offer(element);
        if (result) {
            totalOffers.increment();
        }
        return result;
    }
 
    @Override
    public void commit() {
        int pending = pendingCount();
        if (pending > 0) {
            totalCommits.increment();
            batchedElements.add(pending);
 
            // Record in histogram (log2 buckets)
            int bucket = Math.min(16, 32 - Integer.numberOfLeadingZeros(pending));
            batchSizeHistogram[bucket]++;
        }
        super.commit();
    }
 
    @Override
    public T poll() {
        T element = super.poll();
        if (element != null) {
            totalPolls.increment();
        }
        return element;
    }
 
    // Metrics accessors
    public long getTotalOffers() { return totalOffers.sum(); }
    public long getTotalPolls() { return totalPolls.sum(); }
    public long getTotalCommits() { return totalCommits.sum(); }
 
    public double getAverageBatchSize() {
        long commits = totalCommits.sum();
        return commits == 0 ? 0 : (double) batchedElements.sum() / commits;
    }
 
    public String getBatchSizeDistribution() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < batchSizeHistogram.length; i++) {
            if (batchSizeHistogram[i] > 0) {
                int low = i == 0 ? 0 : (1 << (i - 1));
                int high = (1 << i) - 1;
                sb.append(String.format("[%d-%d]: %d%n", low, high, batchSizeHistogram[i]));
            }
        }
        return sb.toString();
    }
}
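The log2 bucketing used in commit() is easy to sanity-check in isolation (a standalone copy of the expression):

```java
// Log2 bucketing: pending counts fall into buckets [1], [2-3], [4-7], [8-15], ...
public class Log2Buckets {
    static int bucketOf(int pending) {     // pending must be > 0
        return Math.min(16, 32 - Integer.numberOfLeadingZeros(pending));
    }

    public static void main(String[] args) {
        System.out.println(bucketOf(1));   // bucket 1: sizes [1-1]
        System.out.println(bucketOf(3));   // bucket 2: sizes [2-3]
        System.out.println(bucketOf(64));  // bucket 7: sizes [64-127]
    }
}
```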

Pattern 4: Event Pipeline with Batching

A complete event processing pipeline using batched queues:

/**
 * High-throughput event pipeline using batched SPSC queues.
 */
public class BatchedEventPipeline<E> {
 
    private final BatchedSPSCQueue<E> inputQueue;
    private final BatchedSPSCQueue<E> outputQueue;
    private final Function<E, E> processor;
    private final Thread workerThread;
    private volatile boolean running = true;
 
    // Batch processing buffers
    private static final int PROCESS_BATCH_SIZE = 64;
    private final Object[] inputBatch = new Object[PROCESS_BATCH_SIZE];
    private final Object[] outputBatch = new Object[PROCESS_BATCH_SIZE];
 
    public BatchedEventPipeline(int queueCapacity, Function<E, E> processor) {
        this.inputQueue = new BatchedSPSCQueue<>(queueCapacity, 32);
        this.outputQueue = new BatchedSPSCQueue<>(queueCapacity, 32);
        this.processor = processor;
 
        this.workerThread = new Thread(this::processLoop, "event-processor");
        this.workerThread.setDaemon(true);
        this.workerThread.start();
    }
 
    /**
     * Submit an event for processing.
     */
    public boolean submit(E event) {
        return inputQueue.offer(event);
    }
 
    /**
     * Retrieve a processed event.
     */
    public E retrieve() {
        return outputQueue.poll();
    }
 
    /**
     * Retrieve multiple processed events.
     */
    @SuppressWarnings("unchecked")
    public int retrieveBatch(E[] output, int maxCount) {
        return outputQueue.pollBatch(output, 0, maxCount);
    }
 
    @SuppressWarnings("unchecked")
    private void processLoop() {
        while (running) {
            // Batch read from input
            int count = inputQueue.pollBatch((E[]) inputBatch, 0, PROCESS_BATCH_SIZE);
 
            if (count == 0) {
                // No input - brief pause to avoid spinning
                Thread.onSpinWait();
                continue;
            }
 
            // Process batch
            for (int i = 0; i < count; i++) {
                E input = (E) inputBatch[i];
                E output = processor.apply(input);
                outputBatch[i] = output;
                inputBatch[i] = null;  // Help GC
            }
 
            // Batch write to output (retry so backpressure doesn't drop events)
            int written = 0;
            while (written < count) {
                written += outputQueue.offerBatch((E[]) outputBatch, written, count - written);
                if (written < count) {
                    Thread.onSpinWait();  // output queue full - wait for consumer
                }
            }
 
            // Clear output batch
            for (int i = 0; i < count; i++) {
                outputBatch[i] = null;
            }
        }
    }
 
    public void shutdown() throws InterruptedException {
        running = false;
        workerThread.join(5000);
    }
}

Pattern 5: Multi-Stage Pipeline

Chaining multiple batched stages:

/**
 * Multi-stage pipeline with batched queues between stages.
 */
public class MultiStagePipeline<I, M, O> {
 
    private final BatchedEventPipeline<I> stage1;
    private final BatchedEventPipeline<M> stage2;
    private final Thread bridgeThread;
    private volatile boolean running = true;
 
    public MultiStagePipeline(
            int queueCapacity,
            Function<I, M> stage1Processor,
            Function<M, O> stage2Processor) {
 
        this.stage1 = new BatchedEventPipeline<>(queueCapacity, i -> {
            // Stage 1 processes and produces intermediate type
            return stage1Processor.apply(i);
        });
 
        // Bridge connects stage1 output to stage2 input
        this.bridgeThread = new Thread(() -> {
            Object[] batch = new Object[64];
            while (running) {
                int count = stage1.retrieveBatch((I[]) batch, 64);
                if (count > 0) {
                    // Forward to stage 2
                    // (In real code, stage2 would accept M type)
                } else {
                    Thread.onSpinWait();  // back off only when idle
                }
            }
        }, "pipeline-bridge");
        bridgeThread.setDaemon(true);
        bridgeThread.start();
 
        // Stage 2 would be similar
        this.stage2 = null;  // Simplified for example
    }
 
    public void submit(I input) {
        stage1.submit(input);
    }
 
    public void shutdown() throws InterruptedException {
        running = false;
        stage1.shutdown();
        bridgeThread.join(5000);
    }
}

Part 10: Advanced Optimizations

For those seeking maximum performance, here are additional optimizations to consider.

Optimization 1: Prefetching

Modern CPUs can prefetch data before it's needed. We can hint at upcoming accesses:

/**
 * Batched queue with software prefetching hints.
 */
public class PrefetchingBatchedQueue<T> extends BatchedSPSCQueue<T> {
 
    private static final VarHandle ARRAY_HANDLE =
        MethodHandles.arrayElementVarHandle(Object[].class);
 
    @Override
    @SuppressWarnings("unchecked")
    public int pollBatch(T[] output, int offset, int maxCount) {
        // ... availability check ...
 
        for (int i = 0; i < toRead; i++) {
            int index = (int) (localTail & mask);
 
            // Prefetch next cache line
            if (i < toRead - 8) {
                int prefetchIndex = (int) ((localTail + 8) & mask);
                ARRAY_HANDLE.getOpaque(buffer, prefetchIndex);
            }
 
            output[offset + i] = (T) buffer[index];
            buffer[index] = null;
            localTail++;
        }
 
        // ... publish tail ...
        return toRead;
    }
}

Optimization 2: Cache Line Alignment for Buffer

Ensuring buffer elements are cache-line aligned can improve performance:

/**
 * Queue with cache-aligned buffer slots.
 * Each slot occupies a full cache line to prevent false sharing.
 */
public class CacheAlignedBatchedQueue<T> {
 
    // Each slot is padded to 64 bytes (cache line size)
    private static final int SLOT_PADDING = 64 / 8;  // 8 longs per slot
 
    private final long[] paddedBuffer;  // Use longs for padding
    private final Object[] actualElements;
 
    public CacheAlignedBatchedQueue(int capacity) {
        // Actual elements stored separately
        this.actualElements = new Object[capacity];
        // Padded buffer for index tracking (optional, for demonstration)
        this.paddedBuffer = new long[capacity * SLOT_PADDING];
    }
}

Optimization 3: NUMA-Aware Placement

On multi-socket systems, memory locality matters:

/**
 * NUMA-aware queue that keeps producer and consumer data
 * on their respective NUMA nodes.
 */
public class NumaAwareBatchedQueue<T> extends BatchedSPSCQueue<T> {
 
    // Hint: Use JVM flags to control thread/memory placement
    // -XX:+UseNUMA
    // -XX:+UseNUMAInterleaving
 
    // Or use libraries like OpenHFT's affinity for explicit control
 
    public static void pinProducerToNode(int numaNode) {
        // Implementation depends on affinity library
        // AffinityLock.acquireLock(numaNode);
    }
 
    public static void pinConsumerToNode(int numaNode) {
        // AffinityLock.acquireLock(numaNode);
    }
}

Optimization 4: Busy-Spin vs. Yield Trade-off

For ultra-low-latency, busy-spinning beats yielding:

/**
 * Configurable wait strategy for batched queue.
 */
public enum WaitStrategy {
    BUSY_SPIN {
        @Override
        public void waitFor() {
            Thread.onSpinWait();
        }
    },
    YIELD {
        @Override
        public void waitFor() {
            Thread.yield();
        }
    },
    PARK {
        @Override
        public void waitFor() {
            LockSupport.parkNanos(1000);
        }
    },
    ADAPTIVE {
        private int spins = 0;
 
        @Override
        public void waitFor() {
            if (++spins < 100) {
                Thread.onSpinWait();
            } else if (spins < 200) {
                Thread.yield();
            } else {
                LockSupport.parkNanos(1000);
                spins = 0;
            }
        }
    };
 
    public abstract void waitFor();
}
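A consumer loop drives these strategies like so; the sketch below bundles a minimal two-constant version of the enum so it stands alone:

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

// Usage sketch: spin on a poll source with a pluggable wait strategy.
public class WaitLoop {
    enum Strategy {
        BUSY_SPIN { void waitFor() { Thread.onSpinWait(); } },
        PARK      { void waitFor() { LockSupport.parkNanos(1_000); } };
        abstract void waitFor();
    }

    static <T> T pollUntilAvailable(Supplier<T> poll, Strategy strategy) {
        T element;
        while ((element = poll.get()) == null) {
            strategy.waitFor();   // back off according to the chosen strategy
        }
        return element;
    }
}
```

Note that the ADAPTIVE constant above keeps its spin counter in the enum instance, which is shared by every caller; per-thread counters would be needed if multiple queues share a strategy.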

Pattern 6: Graceful Degradation Under Load

When the system is under stress, graceful degradation is crucial:

/**
 * Queue with load-adaptive behavior.
 * Reduces batch size under backpressure to maintain responsiveness.
 */
public class LoadAdaptiveQueue<T> extends BatchedSPSCQueue<T> {
 
    private final int normalBatchSize;
    private final int stressedBatchSize;
    private volatile int currentBatchSize;
 
    // Load detection
    private long lastPollTime = System.nanoTime();
    private static final long STRESS_THRESHOLD_NS = 1_000_000;  // 1ms
 
    public LoadAdaptiveQueue(int capacity, int normalBatch, int stressedBatch) {
        super(capacity, normalBatch);
        this.normalBatchSize = normalBatch;
        this.stressedBatchSize = stressedBatch;
        this.currentBatchSize = normalBatch;
    }
 
    @Override
    public boolean offer(T element) {
        // Detect stress: if queue is >75% full, reduce batch size
        int currentSize = size();
        if (currentSize > capacity() * 0.75) {
            currentBatchSize = stressedBatchSize;
        } else if (currentSize < capacity() * 0.25) {
            currentBatchSize = normalBatchSize;
        }
 
        boolean result = super.offer(element);
 
        // Auto-commit at current (possibly reduced) batch size
        if (pendingCount() >= currentBatchSize) {
            commit();
        }
 
        return result;
    }
 
    public int getCurrentBatchSize() {
        return currentBatchSize;
    }
 
    public boolean isUnderStress() {
        return currentBatchSize < normalBatchSize;
    }
}

Part 10B: Production Deployment Considerations

Deploying batched queues in production requires attention to several operational concerns.

JVM Configuration

Recommended JVM flags for batched queue performance:

# Fixed heap avoids resizing; G1 (or ZGC) targets low pauses; AlwaysPreTouch
# touches pages at startup; biased locking disabled for consistent latency
# (it is off by default since JDK 15)
java \
    -Xms8g -Xmx8g \
    -XX:+UseG1GC \
    -XX:MaxGCPauseMillis=10 \
    -XX:+AlwaysPreTouch \
    -XX:-UseBiasedLocking \
    -XX:+UseNUMA \
    -XX:+PerfDisableSharedMem \
    -Djava.lang.Integer.IntegerCache.high=10000 \
    -jar application.jar

Why disable biased locking?

Biased locking assumes low contention and adds overhead when revoking biases. In high-throughput systems, this revocation cost can spike latency. Disabling it provides more consistent performance.

Why AlwaysPreTouch?

Without pre-touching, the JVM lazily allocates physical memory pages. First access to a new page triggers a page fault, adding latency jitter. Pre-touching at startup makes memory access times predictable.

Thread Affinity

For maximum performance, pin threads to specific CPU cores:

/**
 * Thread affinity helper using OpenHFT Affinity library.
 */
public class AffinityHelper {
 
    /**
     * Pin current thread to a specific CPU core.
     */
    public static void pinToCore(int coreId) {
        try {
            // Using OpenHFT Affinity
            // AffinityLock lock = AffinityLock.acquireLock(coreId);
            // Or using JNA to call sched_setaffinity
            System.out.println("Pinned thread " +
                Thread.currentThread().getName() + " to core " + coreId);
        } catch (Exception e) {
            System.err.println("Failed to pin thread: " + e.getMessage());
        }
    }
 
    /**
     * Pin producer and consumer to different cores.
     */
    public static void setupAffinityForSPSC(Runnable producerTask, Runnable consumerTask) {
        // Ideally, same socket but different physical cores -
        // avoid hyperthread siblings of the same core.
        // Pinning must run on the target thread itself, so wrap the tasks.

        // Example: producer on core 0, consumer on core 2
        // (assuming 0,1 and 2,3 are hyperthread pairs)
        Thread producer = new Thread(() -> { pinToCore(0); producerTask.run(); }, "producer");
        Thread consumer = new Thread(() -> { pinToCore(2); consumerTask.run(); }, "consumer");
        producer.start();
        consumer.start();
    }
}

Monitoring and Alerting

Set up monitoring for queue health:

/**
 * Queue health metrics for monitoring systems.
 */
public class QueueHealthMetrics {
 
    private final BatchedSPSCQueue<?> queue;
    private final MeterRegistry registry;  // Micrometer
 
    public QueueHealthMetrics(BatchedSPSCQueue<?> queue, MeterRegistry registry) {
        this.queue = queue;
        this.registry = registry;
 
        // Register gauges
        Gauge.builder("queue.size", queue, BatchedSPSCQueue::size)
            .description("Current queue size")
            .register(registry);
 
        Gauge.builder("queue.capacity", queue, BatchedSPSCQueue::capacity)
            .description("Queue capacity")
            .register(registry);
 
        Gauge.builder("queue.utilization", queue,
                q -> (double) q.size() / q.capacity())
            .description("Queue utilization percentage")
            .register(registry);
 
        Gauge.builder("queue.pending", queue, BatchedSPSCQueue::pendingCount)
            .description("Uncommitted elements")
            .register(registry);
    }
 
    /**
     * Check if queue is healthy.
     */
    public boolean isHealthy() {
        double utilization = (double) queue.size() / queue.capacity();
 
        // Unhealthy if >90% full
        if (utilization > 0.9) {
            return false;
        }
 
        // Unhealthy if many pending uncommitted elements
        if (queue.pendingCount() > queue.batchSize() * 2) {
            return false;
        }
 
        return true;
    }
}

Alerting Thresholds

Metric              Warning      Critical     Action
Queue utilization   > 70%        > 90%        Scale consumers
Pending elements    > 2x batch   > 5x batch   Check producer
Commit latency      > 10ms       > 100ms      Reduce batch size
Poll empty rate     > 50%        > 90%        Check producers

Capacity Planning

To size your queue appropriately:

Required capacity = Peak rate × Maximum latency tolerance × Safety factor

Example:
- Peak rate: 100,000 events/second
- Latency tolerance: 100ms
- Safety factor: 2x

Capacity = 100,000 × 0.1 × 2 = 20,000 elements

Round up to power of 2: 32,768 elements
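The same arithmetic in code, including the power-of-two rounding (helper name is mine):

```java
// Capacity sizing per the formula above, rounded up to a power of two
// (ring buffers need power-of-two capacities for mask-based indexing).
public class CapacityPlanner {
    static int requiredCapacity(double peakRatePerSec,
                                double latencyToleranceSec,
                                double safetyFactor) {
        int needed = (int) Math.ceil(peakRatePerSec * latencyToleranceSec * safetyFactor);
        int capacity = 1;
        while (capacity < needed) {
            capacity <<= 1;           // round up to the next power of two
        }
        return capacity;
    }

    public static void main(String[] args) {
        // 100k events/s, 100ms tolerance, 2x safety -> 20,000 -> 32,768
        System.out.println(requiredCapacity(100_000, 0.1, 2));
    }
}
```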

Part 11: Trade-offs and When to Use

Summary of Trade-offs

Advantages:

  • ✓ 2-5x lower per-element latency
  • ✓ 3-12x higher throughput with the explicit batch API
  • ✓ Better cache utilization
  • ✓ Fewer memory barriers
  • ✓ Same correctness guarantees

Disadvantages:

  • ✗ Increased complexity
  • ✗ Commit latency for partial batches
  • ✗ Requires batch-aware consumers
  • ✗ May need tuning for batch size
  • ✗ Slightly higher memory footprint

Use When:

  • ✓ High throughput required
  • ✓ Batch workloads natural
  • ✓ Can tolerate commit delay
  • ✓ SPSC pattern fits architecture
  • ✓ Memory barriers are bottleneck

Avoid When:

  • ✗ Single-element latency critical
  • ✗ Unpredictable arrival patterns
  • ✗ Team unfamiliar with batching
  • ✗ Simple queue is fast enough
  • ✗ Other bottlenecks dominate

Decision Matrix

Scenario                    Recommendation            Batch Size
HFT order processing        Batched + timed commit    16-32, 100μs timeout
Log aggregation             Batched + large batches   256-512
Real-time analytics         Batched + adaptive        32-64, 1ms timeout
Network packet processing   Batched + full batches    64-128, no timeout
Database write-behind       Batched + large batches   512-1024
Game server events          Single operation          N/A
GUI event handling          Single operation          N/A

When Batching Doesn't Help

Batching provides minimal benefit when:

  1. Throughput is already sufficient: If single-operation performance meets requirements, added complexity isn't justified

  2. Memory barriers aren't the bottleneck: Profile first. If time is spent in business logic, batching won't help

  3. Single-element latency is critical: Each uncommitted element adds latency. For strict latency SLAs, batching may not be suitable

  4. Arrival pattern is truly random: If elements arrive one at a time with significant gaps, batches never fill naturally


Part 12: Conclusion and Key Takeaways

The journey from 25ns to 5ns per element taught me something fundamental about performance optimization: the overhead often isn't where you expect it.

When I first profiled our queue, I assumed the bottleneck was data movement: copying references, updating indices, checking bounds. It wasn't. The useful work took just 2-3 nanoseconds; the remaining ~22 nanoseconds came from the memory fences that keep each thread's view of the buffer consistent.

The lesson is simple: a fence costs roughly the same whether it publishes one element or sixty-four. By batching our operations - accumulating multiple elements and publishing them with a single fence - we amortized that fixed cost across many elements.

Key Takeaways

1. Memory Barriers Have Fixed Cost

A release/acquire barrier costs roughly 20-30 nanoseconds regardless of how much data it orders. Design your synchronization around this.

2. Batch Size Matters

Too small, and you don't amortize the barriers effectively. Too large, and you add latency waiting for batches to fill. The sweet spot is typically 64-256 elements for throughput-oriented workloads and 16-32 for latency-sensitive ones.

3. Shadow State Enables Batching

Separating local state (private to one thread) from published state (visible to other threads) is the key pattern. Advance local state freely; update published state only when committing.
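A minimal sketch of the pattern, stripped down to the head index alone (class and field names are illustrative; the real queue also manages the slot array):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/** Shadow-state sketch: the producer advances a private localHead freely
 *  and publishes it with one release store per batch. */
final class ShadowHead {
    // Private to the producer thread: plain stores, no barriers.
    private long localHead;
    // Shared with the consumer: touched only via release/acquire.
    private volatile long publishedHead;

    private static final VarHandle PUBLISHED_HEAD;
    static {
        try {
            PUBLISHED_HEAD = MethodHandles.lookup()
                    .findVarHandle(ShadowHead.class, "publishedHead", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Producer: record one write; plain store, no fence. */
    void advance() { localHead++; }

    /** Producer: publish everything written so far with a single release. */
    void commit() { PUBLISHED_HEAD.setRelease(this, localHead); }

    /** Consumer: acquire-load the committed position. */
    long committed() { return (long) PUBLISHED_HEAD.getAcquire(this); }

    /** Producer: elements written but not yet visible to the consumer. */
    long pending() { return localHead - publishedHead; }
}
```

The producer's hot path touches only `localHead`; the barrier cost is paid exactly once per `commit()`.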

4. Profile Before Optimizing

We only discovered memory barriers as the bottleneck through profiling. The fix is specific to this bottleneck - it wouldn't help if the issue were elsewhere.

5. Correctness Through Memory Ordering

Our batched queue maintains the same correctness guarantees as single-operation queues. Release-acquire semantics ensure that when the consumer sees the updated position, it also sees all the data that was written.
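The guarantee is easy to see in isolation. Here is a self-contained sketch (illustrative, not the queue itself) in which four plain stores are published by a single release and observed after one acquire:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/** Release/acquire handshake: once the reader acquires the flag,
 *  every write made before the release store is guaranteed visible. */
final class Handshake {
    final long[] data = new long[4]; // written with plain, non-volatile stores
    volatile int ready;              // published via setRelease, read via getAcquire

    static final VarHandle READY;
    static {
        try {
            READY = MethodHandles.lookup()
                    .findVarHandle(Handshake.class, "ready", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    long runOnce() {
        Thread producer = new Thread(() -> {
            for (int i = 0; i < data.length; i++) data[i] = i + 1; // plain stores
            READY.setRelease(this, 1); // one fence publishes them all
        });
        producer.start();
        while ((int) READY.getAcquire(this) == 0) Thread.onSpinWait();
        long sum = 0;
        for (long v : data) sum += v; // guaranteed to see 1, 2, 3, 4
        return sum;
    }
}
```

This is exactly the relationship between the producer's buffer writes and `commit()`, and between the consumer's position read and its element reads.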

The Numbers

| Queue Type | Per-element Latency | Throughput | Memory Barriers |
|---|---|---|---|
| Single operation | 20-30ns | ~40M ops/sec | 2-3 per element |
| Batched (size 64) | 5-10ns | ~160M ops/sec | 1 per batch (64 elements) |
| Improvement | 2-4x | 2-4x | N/A |

Final Thoughts

The batched SPSC queue is now a core component of our trading infrastructure. It handles market data feeds where every nanosecond counts and log aggregation pipelines where throughput is king. The same fundamental optimization - amortizing barrier costs - applies in both cases.

But the real lesson isn't about batching specifically. It's about understanding where time actually goes. Modern hardware is incredibly fast at moving data but surprisingly slow at synchronization. When you find yourself constrained by synchronization overhead rather than actual work, batching is often the answer.

The next time you're optimizing a concurrent data structure, don't just focus on the algorithm. Look at the synchronization. Count the barriers. Measure their cost. And consider whether you can batch your way to better performance.

Sometimes the biggest wins come from doing less, not more - fewer barriers, fewer cache line transfers, fewer synchronization events. In the world of lock-free programming, less is often more.


Appendix A: Quick Reference

API Summary

// Creation
BatchedSPSCQueue<T> queue = new BatchedSPSCQueue<>(capacity);
BatchedSPSCQueue<T> queue = new BatchedSPSCQueue<>(capacity, batchSize);
 
// Single-element API (auto-batching)
boolean success = queue.offer(element);  // Add element, auto-commits at batch size
queue.commit();                          // Force publication of pending elements
T element = queue.poll();                // Remove and return element
 
// Batch API (explicit batching)
int added = queue.offerBatch(array, offset, count);   // Add multiple elements
int removed = queue.pollBatch(array, offset, count); // Remove multiple elements
int drained = queue.drain(consumer);                  // Drain all to consumer
int filled = queue.fill(supplier, maxCount);          // Fill from supplier
 
// Query
int size = queue.size();                 // Approximate size
boolean empty = queue.isEmpty();         // Check if empty
int pending = queue.pendingCount();      // Uncommitted elements

Optimal Batch Sizes

| Use Case | Recommended Batch Size | Commit Strategy |
|---|---|---|
| Ultra-low latency | 8-16 | Timed (100μs) |
| Low latency | 16-32 | Timed (500μs) |
| Balanced | 32-64 | Timed (1ms) |
| High throughput | 128-256 | Size-based |
| Maximum throughput | 512-1024 | Size-based |

Memory Ordering Summary

| Operation | Semantics | Cost |
|---|---|---|
| offer() - write | Plain | ~3ns |
| commit() | Release | ~25ns |
| poll() - read head | Acquire | ~15ns |
| poll() - write tail | Release | ~25ns |

Per batch (64 elements):

  • Writes: 64 × 3ns = 192ns
  • Barriers: 1 × 25ns = 25ns
  • Total: 217ns / 64 = 3.4ns per element
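The same arithmetic generalizes to any batch size. A throwaway cost model makes the amortization explicit (the 3ns and 25ns constants are the estimates from the table above, not measurements of your hardware):

```java
final class BarrierAmortization {
    static final double WRITE_NS = 3.0;    // plain store into the buffer
    static final double BARRIER_NS = 25.0; // one release fence per batch

    /** Average cost per element when one barrier covers batchSize writes. */
    static double perElementNs(int batchSize) {
        return WRITE_NS + BARRIER_NS / batchSize;
    }
}
```

At batch size 1 this gives 28ns per element; at 64 it gives ~3.4ns, matching the worked example, and the curve flattens quickly: beyond roughly 64 elements the barrier term is already under half a nanosecond.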

Appendix B: Troubleshooting

Common Issues and Solutions

Problem: Elements not visible to consumer

Cause: Forgot to call commit()

Solution: Enable auto-commit or call commit() explicitly after writing

queue.offer(element);
queue.commit();  // Don't forget this!

Problem: Higher latency than expected

Cause: Batch size too large, waiting for batch to fill

Solution: Use smaller batch size or timed commit

// Option 1: Smaller batch
new BatchedSPSCQueue<>(capacity, 16);
 
// Option 2: Timed commit
new TimedBatchedQueue<>(capacity, 32, 100, TimeUnit.MICROSECONDS);

Problem: Lower throughput than expected

Cause: Batch size too small, too many barriers

Solution: Increase batch size, use explicit batch API

// Instead of:
for (T element : elements) {
    queue.offer(element);
}
 
// Use:
queue.offerBatch(elements, 0, elements.length);

Problem: Consumer seeing stale data

Cause: Race condition - likely bug in implementation

Solution: Verify memory ordering, check for missing acquire/release

// Verify these patterns:
PUBLISHED_HEAD.setRelease(this, localHead);  // After all writes
long head = (long) PUBLISHED_HEAD.getAcquire(this);  // Before any reads

Appendix C: Further Reading

Books

  • "The Art of Multiprocessor Programming" by Herlihy & Shavit - Definitive guide to lock-free algorithms
  • "Java Concurrency in Practice" by Goetz et al. - Java memory model and concurrent programming
  • "Is Parallel Programming Hard, And, If So, What Can You Do About It?" by Paul E. McKenney - Deep dive into memory ordering

Papers

  • "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" by Michael and Scott
  • "A Scalable, Correct Time-Stamped Stack" by Dodds et al.


This article is part of the Lock-Free in Java series. The complete source code and benchmarks are available at https://github.com/techishthoughts-org/off_heap_algorithms


Appendix D: Comparison with Other Approaches

How Batching Compares to Other Optimizations

| Approach | Latency Improvement | Throughput Improvement | Complexity | Use Case |
|---|---|---|---|---|
| Batching (this article) | 2-4x | 2-4x | Medium | High-throughput SPSC |
| Per-slot sequences | 1.5-2x | 1.5-2x | High | MPSC queues |
| Cache line padding | 1.2-1.5x | 1.2-1.5x | Low | Any concurrent structure |
| Thread affinity | 1.1-1.3x | 1.1-1.3x | Low | Latency-critical |
| NUMA optimization | 1.2-2x | 1.2-2x | Medium | Multi-socket systems |

Batching provides the best improvement for SPSC scenarios because it directly addresses the dominant cost (memory barriers) rather than secondary factors.

Combining Optimizations

For maximum performance, combine batching with other techniques:

/**
 * Fully optimized batched queue combining all techniques.
 */
@Contended  // Prevent false sharing (jdk.internal.vm.annotation.Contended; honored outside java.base only with -XX:-RestrictContended)
public class FullyOptimizedQueue<T> extends BatchedSPSCQueue<T> {
 
    // Thread affinity on construction
    private final int producerCore;
    private final int consumerCore;
 
    public FullyOptimizedQueue(int capacity, int batchSize,
                               int producerCore, int consumerCore) {
        super(capacity, batchSize);
        this.producerCore = producerCore;
        this.consumerCore = consumerCore;
    }
 
    /**
     * Call from producer thread to pin affinity.
     */
    public void initProducer() {
        AffinityHelper.pinToCore(producerCore);
    }
 
    /**
     * Call from consumer thread to pin affinity.
     */
    public void initConsumer() {
        AffinityHelper.pinToCore(consumerCore);
    }
}

When Each Approach Shines

Batching alone: When memory barriers are the bottleneck and you have bursty or continuous workloads.

Batching + affinity: When you need the lowest possible latency and can dedicate CPU cores.

Batching + NUMA: On multi-socket systems where memory locality matters.

All combined: For trading systems, network packet processing, and other ultra-low-latency applications.

The key is to measure first and apply optimizations that address your actual bottlenecks. Batching is often the highest-impact optimization for SPSC queues, but your mileage may vary based on workload characteristics.

Off-Heap Algorithms in Java, Part 10 of 12