The Coalescing Ring Buffer

The Coalescing Ring Buffer is the first component of the LMAX Collections Library we are open-sourcing today. It is a component that we have written in Java to efficiently buffer messages between a producer and a consumer thread where only the latest value for a given topic is of interest. All other messages can be discarded immediately.

The Problem of Market Data

Let’s imagine we are trying to write an automated trading system that listens to all stock price updates on an exchange in order to find under-valued stocks. The following diagram shows an exchange sending price updates for three stock symbols, which are queued for processing by our trading system.

[Diagram: an exchange sending price updates for three stock symbols into a queue ahead of the trading system]

Deciding whether the latest price for a stock is a good investment takes a certain amount of time, so it is quite possible that a price will change before the old price has been processed.

[Diagram: a new Red Hat price of 56 arrives while the old price of 55 is still queued]

Since we are only interested in the latest prices, considering Red Hat at 55 is a waste of memory and time. It would be better to update the already queued price of 55 to 56.

[Diagram: the queued Red Hat price of 55 is updated in place to 56]

This is the problem that the Coalescing Ring Buffer solves. It is a buffer that holds incoming price updates and, when a new update arrives for a key that is already queued, overwrites the existing value in place instead of growing the buffer before the consumer is ready to process it.
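
To make this concrete, here is a minimal sketch of the behaviour using the API introduced later in this post (the StockPrice class is defined in the usage section below): two updates for the same symbol collapse into a single queued value.

CoalescingBuffer<String, StockPrice> buffer = new CoalescingRingBuffer<String, StockPrice>(8);

buffer.offer("RHT", new StockPrice("RHT", 55.00)); // queued as a new entry
buffer.offer("RHT", new StockPrice("RHT", 56.00)); // overwrites the queued 55.00 in place

Collection<StockPrice> prices = new ArrayList<StockPrice>();
buffer.poll(prices); // prices now holds a single entry: RHT at $56.00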

Great Engineers Steal

The key insight that makes the LMAX Disruptor so fast is an understanding of how modern CPUs are designed. Martin Thompson explains all this very well on his blog Mechanical Sympathy. I would strongly recommend that anyone with an interest in designing high-performance software read every article on his blog from the beginning.

The Coalescing Ring Buffer borrows the following design principles from the Disruptor:

  • using arrays as data structures because of their memory locality
  • using lock-free concurrency as it avoids kernel arbitration
  • using the single-writer principle to avoid cache line contention (see the sketch after this list)
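
As an illustration of the last two principles, here is a minimal, hypothetical sketch (not code from the library) of a single-writer sequence counter. Because exactly one thread ever writes it, the counter can be published with AtomicLong.lazySet instead of a full volatile store, a trick that also comes up in the comments below.

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-writer counter, for illustration only.
public class SingleWriterSequence {
    private final AtomicLong sequence = new AtomicLong();

    // Called only by the single writer thread. lazySet still guarantees the
    // update eventually becomes visible to readers, but avoids the expensive
    // store-load barrier that a plain volatile write (set) would incur.
    public void publish(long next) {
        sequence.lazySet(next);
    }

    // May be called from any reader thread.
    public long get() {
        return sequence.get();
    }
}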

I really just want to introduce the Coalescing Ring Buffer here; I will go into much more detail about its design and performance in future posts.

Ok Great, How Do I Use It?

First, download the Coalescing Ring Buffer jar and the source zip.

Then create a data class to represent the values we want the buffer to hold. In our example we will simply have a stock symbol and price:

// An immutable value class: the symbol will act as the coalescing key.
public class StockPrice {
    public final String symbol;
    public final double price;

    public StockPrice(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }

    @Override
    public String toString() {
        return String.format("%s =\t$%.2f", symbol, price);
    }
}

Next create the buffer. The buffer takes two type arguments: the key and value types. Offering a value to the buffer with a key that equals another key already in the buffer causes the buffer to overwrite the existing value instead of growing larger.

CoalescingBuffer<String, StockPrice> buffer = new CoalescingRingBuffer<String, StockPrice>(8);

Please note that for performance reasons, the size of the buffer must be a power of 2 (more on this in future posts).
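
The power-of-two requirement is a common ring-buffer trick (the Disruptor uses it too): with a power-of-two capacity, the slot for a given sequence number can be found with a cheap bit mask instead of a modulo operation. A sketch of the idea, presumably close to what the buffer does internally:

// With capacity = 8 (binary 1000), the mask is 7 (binary 0111).
int capacity = 8;
int mask = capacity - 1;

long sequence = 42;
int slowIndex = (int) (sequence % capacity); // involves a division
int fastIndex = (int) (sequence & mask);     // a single AND; identical result for powers of 2

assert slowIndex == fastIndex;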

On the producer thread we can now offer the values as follows:

String symbol = "RHT";
StockPrice stockPrice = new StockPrice(symbol, 123.45);

boolean success = buffer.offer(symbol, stockPrice);

if (!success) {
    throw new AssertionError("offer of " + stockPrice + " failed");
}

Since the buffer is non-blocking, it signals an overflow by returning false. The correct way to handle the overflow condition depends on your application; here we simply throw an AssertionError.
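
In a real producer you might prefer to retry instead, on the assumption that the consumer will soon drain the buffer. A hypothetical sketch, not part of the library:

// Hypothetical back-off loop: spin until the consumer frees a slot.
while (!buffer.offer(symbol, stockPrice)) {
    Thread.yield(); // or LockSupport.parkNanos(...) for a stronger back-off
}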

On the consumer thread we collect values from the buffer in batches as follows:

Collection<StockPrice> prices = new ArrayList<StockPrice>();

while (!stop) {
    buffer.poll(prices);

    for (StockPrice price : prices) {
        System.out.println(price);
    }

    prices.clear();
}

The call to poll will transfer all StockPrices currently in the buffer to the prices collection and remove them from the buffer. If the buffer is empty, poll will return immediately without adding anything to the prices collection.

You can optionally specify the maximum number of items that should be transferred to the collection on each poll. Please note that the buffer only adds to the collection; it is the client’s responsibility to clear the collection before subsequent calls to poll.
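
For example, assuming an overload along the lines of poll(bucket, maxItems), a consumer could cap each batch at 100 prices:

Collection<StockPrice> batch = new ArrayList<StockPrice>();

buffer.poll(batch, 100); // transfer at most 100 prices into this batch

for (StockPrice price : batch) {
    System.out.println(price);
}

batch.clear(); // the buffer never clears the collection for us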

The full code for this example can be found here.

Current Limitations

Version 1.0 of the Coalescing Ring Buffer has the following limitations:

  • it supports only one producer and one consumer
  • it has no wait-strategy for the consumer thread (a client-side workaround is sketched after this list)
  • there is a small but non-zero probability that the consumer will see some duplicated values
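
On the second point: the consumer loop shown above spins even when the buffer is empty. Until the library gains a wait-strategy, one client-side workaround (a sketch, not a library feature) is to back off whenever a poll transfers nothing:

Collection<StockPrice> prices = new ArrayList<StockPrice>();

while (!stop) {
    buffer.poll(prices);

    if (prices.isEmpty()) {
        // java.util.concurrent.locks.LockSupport: pause briefly instead of burning a core
        LockSupport.parkNanos(1000);
        continue;
    }

    for (StockPrice price : prices) {
        System.out.println(price);
    }

    prices.clear();
}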

If you get a chance to try it, please let me know what you think.

Great Engineers Definitely Steal Images

A special thanks to DesignContest, Deleket, Rob Sanders, fi3ur, and Fast Icon for allowing me to use their awesome icons!


29 thoughts on “The Coalescing Ring Buffer”

      • Great work! I’ve posted the link on twitter. Let me know when you are happy enough with the code and I’ll write a proper blog post with a link to the project.

        I’ve also committed a change this morning to make the lastCleaned index be non-volatile.

      • Thanks Nick, I pulled your change. We’ll see about those perf tests now…

    • Thanks! It can be used to pass messages directly without any coalescing but it does so more slowly than the disruptor. This is because it can’t use the lazy set trick.

  1. BTW, there was a request for something like this on Concurrency-Interest 2 years ago – “Concurrent indexed queue” (http://markmail.org/thread/wyrm44rzbfccx3cq#query:+page:1+mid:dbjf22a2qagdkbbl+state:results and http://markmail.org/thread/hzawjqfafdvsbo3v).

    I also noticed that the offer(xx) method does a linear scan of all the unread items to see if the key already exists. Isn’t that a little simplistic – meaning it would work (fast) only if the unread size is fairly small.

    • You are correct, a linear scan is only the fastest search technique up to a certain number of topics. In my experience, however, most messaging systems are dominated by only a few topics. These topics are most likely to be found early in the scan, as they were the most likely to have arrived first.

      • It is an interesting approach. We use a similar component in our application, but built it using OA (open-addressing) hash maps, which to me seems the natural first idea. Did you try such an approach before choosing the current implementation as the default?

      • I benchmarked it against using a concurrent map, but open addressing is an interesting idea that is definitely worth a try. I suspect that it could be faster than a linear scan if the mean scan distance is above some threshold. You would also have to repeat the same seeks (or clear the map) when nulling out consumed keys and values on the producer thread, if you did not want to violate the single-writer principle.

        It would also require the users to provide a good hashing algorithm for their keys. Have you published the source of your data structure?

  2. I will open-source a reworked (ready-for-open-source) version of the OA hash map soon.

    Anyway it is quite interesting – like a solar flare – that makes a third different approach to the same problem within the last week.

  3. Good stuff, it’s great to see more open source from LMAX.
    Couple of questions:
    1. Given it’s a single producer/consumer why can you not use lazySet everywhere instead of volatile set?
    2. Is there no chance of false-sharing between consumer and producer fields?
    3. It seems to me that rejectionCount and lastCleaned are only used from the producer and require no visibility, am I missing something?

    • Thanks for your feedback Nitsan:

      1. The problem is with updating existing values: you need to check that the consumer has not read the value, update it, and then recheck that the consumer has still not read it. This check-act-recheck sequence can’t use the lazy set (and is also the source of the possible duplicates). However, the part of the code that grows the buffer could use a lazy set. Thanks for the idea!

      2. I’ve tried to order the fields such that the risk of false-sharing is minimized. I am aware that Java 7 can re-order fields however. I’ve run the performance test using Martin Thompson’s PaddedAtomicLong instead but got no performance increase on Java 7. Perhaps I’ve missed something so feel free to try it yourself.

      3. The rejectionCount is there so that the buffer can be monitored via JMX and hence needs to be thread safe. This could easily be a lazy set though. There is no good reason why the lastCleaned count is volatile so thanks for pointing that out. I will fix it.

      • 1. I will have a closer look at that, I’d like to believe there is a possible implementation that side steps volatile access all together for a single producer/consumer.
        2. The way the compiler lays out your fields may mean this issue will be rare, but the following code:
        @Test
        public void test() throws NoSuchFieldException, SecurityException {
            long nextWriteOffset = UnsafeAccess.unsafe.objectFieldOffset(
                    CoalescingRingBuffer.class.getDeclaredField("nextWrite"));
            long lastReadOffset = UnsafeAccess.unsafe.objectFieldOffset(
                    CoalescingRingBuffer.class.getDeclaredField("lastRead"));
            System.out.println("delta: " + (lastReadOffset - nextWriteOffset));
        }
        prints 32 on my machine, which suggests the fields are close enough together to suffer false sharing.
        My understanding is that the compiler will undo your intended layout to minimize the padding of fields required to achieve correct type alignment. Fields are sorted by type; see this blog post: http://www.codeinstructions.com/2008/12/java-objects-memory-structure.html for details, though it may be out of date by now.
        3. Since you have no ordering requirements on the rejectionCount field you can probably get away with doing a plain write to it as long as the read is volatile. The memory barriers made for other fields will ensure a fairly up-to-date value is visible, which should satisfy your JMX monitoring requirement.

      • Thanks again for your feedback Nitsan. Yes let me know if you can solve problem 1. Thanks also for the code to check memory layout and the link. I will have another go to see if I can increase performance by reducing any false-sharing.

  4. I have a similar use case except instead of replacing the tick with the latest, I need to merge the latest into the existing tick. Basically I need to retain the best bid and ask. Do you think it’s possible to support this?

    • I’ve been thinking about that. What you basically want is a cache but it could be built on similar principles as the Coalescing Ring Buffer. Roughly how many different instruments are you talking about and would you expect some instruments to update more frequently than others?

      • Sorry I wasn’t clear enough. I meant to say can we enhance the Coalescing Ring Buffer so that we can keep the best bid and ask instead of simply replace with the last tick.
        So in CoalescingRingBuffer.java line 93, instead of
        values.set(index, value);
        we can pass in a “translator” function like in the Disruptor and the code can read
        values.set(index, translator(oldValue, value));

      • Ah I understand now. Yes that would be useful, so create another constructor that takes a translator interface to allow merging. The contract would have to be that you must not modify the original value in the translator but instead create a new value. I’d be happy to consider a pull request if you’d like to test and implement that?

    • Triple buffering deals with a situation where the producer is slower than the consumer and must not be slowed further by waiting for buffers to become available. The coalescing ring buffer deals with a situation where the producer is faster than the consumer and hence buffered values can be discarded before they are consumed because they are already out of date.

      • No, triple buffering can deal just as easily with the producer outpacing the consumer – it completely decouples the two. The producer simply alternates writing to the 2 buffers it has available while the consumer is busy with the 3rd. Obviously the consumer loses any updates overwritten by the producer, just like with your ring buffer. Best of all, triple buffering can be implemented as a CAS state machine in a couple dozen lines of code (in C at least).

      • I see, clearly my understanding of triple buffering was wrong :-) How would it work when you have more than one key then? Would it scan the current write buffer just like the Coalescing Ring Buffer does to find the correct value to update?

      • I haven’t really given this any thought, but off the top of my head you might replace each buffer by a “sparse set” (http://research.swtch.com/sparse). That would give the producer O(1) buffer initialization and lookup, and the consumer insertion-order iteration. If you’re not dealing with keys which can be cleanly mapped to integers then this would be a bit more complicated and you might want to use some fast hash table (likely open-addressed). Deduping across buffers is not really an option, so the consumer would see more dups than in your approach.

  5. size() may return negative values, as the tail (nextWrite) is read before the head (lastRead), which can arbitrarily increase between the two loads. The code should be: `return (int) (-lastRead.get() + nextWrite) - 1; // -1 left as integer`
