Monday 29 September 2014

JCTools-based queues

Improving further

Following on from my previous post, in which I investigated the performance benefits of writing a custom executor compared to the out-of-the-box (OOTB) services, comments from both Nitsan and Rudiger suggested I should try the JCTools SPMC (single producer, multiple consumer) queue implementation.

It's as simple as importing the JCTools JAR from Maven and changing a couple of lines to swap the Queue implementation, and away we go:

    private final Queue<Runnable> queue = new SpmcArrayQueue<Runnable>((int) Math.pow(2, 18));
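One wrinkle worth noting: SpmcArrayQueue is a bounded, plain java.util.Queue rather than a BlockingQueue, so the executor can no longer block in put() and take(). A minimal sketch of the change, assuming the Worker structure from the executor described in the post below, is to spin on offer() and poll() with a short park when the queue is full or empty:

    // producer side: offer() returns false when the bounded queue is full, so back off and retry
    public void execute(Runnable runnable) {
        while (!queue.offer(runnable)) {
            LockSupport.parkNanos(1);
        }
    }

    // consumer side: poll() returns null when the queue is empty, so back off rather than spin hot
    private class Worker implements Runnable {
        public void run() {
            while (running) {
                Runnable runnable = queue.poll();
                if (runnable == null) {
                    LockSupport.parkNanos(1);
                    continue;
                }
                try {
                    runnable.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }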

Without much further ado, here are the results. I've re-executed all of the runs, so the numbers may differ slightly from the previous post.



(Updated with the correct 256K element count - the most notable difference is that JCTools does not suffer as much from contention at ~8 threads.)

LinkedTransferQueue is still an excellent candidate

I did notice that the LinkedTransferQueue is still looking very respectable, so as an OOTB JVM queue it's definitely worth considering. Based on observing the system during the run, it does have a relatively high garbage cost, but if you have memory to burn it is a very good candidate to test. In fact, it seems like the LTQ would perform much better if it didn't cause so much GC work, which is impressive given it is MPMC capable.

Actually, what I do notice when running on the LTQ is that the GC pause times recorded by the JVM are significantly longer (up to 1 second). This is interesting, and I suspect it's a result of the unbounded queue. A few combinations of capped queue lengths showed significantly reduced GC times in the logs (under 10 ms compared to 1 second), but no overall difference in runtime. I suspect there is more lurking in the dark here, for another day.
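For what it's worth, one simple way to experiment with a capped queue length (a sketch only - not necessarily how the runs above were configured) is to swap the unbounded LinkedTransferQueue for a bounded BlockingQueue, so the producer blocks once the cap is hit:

    // hypothetical cap of 64K outstanding tasks; put() blocks once the queue is full
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(1 << 16);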

And custom is still better

The JCTools SPMC queue does in fact perform significantly faster (4-16%) than the other queues tested, and as contention increased it actually performed much better. Looks like Nitsan's work has paid off. See his video here (though I'm yet to get a chance to watch it).

Perhaps it is a little like cheating, and we should pass Runnables to the Disruptor instead, but the Disruptor as written is still roughly 2x as fast as pushing the same work through an Executor.


Sunday 21 September 2014

Writing a non-blocking Executor

One of the blogs I follow is Rudiger's - in particular this article with a light benchmark that I find interesting, as it shows the clear performance advantage of the Disruptor, and also just how slow the out-of-the-box services can be for particular use cases.

In the article, Rudiger tests a number of different implementations of a pi calculator and quickly finds that 1 million small tasks is a nice way to put enough pressure on the concurrency frameworks to separate out a winner. The out-of-the-box Java Executor service is actually relatively slow, coming in last.

One of the items I have on my to-do list is to build a fast ExecutorService implementation, and fortunately, after a busy few months, I've had a chance to sit down and write it. For those new to concurrency in Java, the ExecutorService interface and the Executors factory methods provide an easy way to get into launching threads and handing off composable tasks to other threads for asynchronous work.
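As a quick refresher (a minimal example rather than benchmark code; computeSlice() is just a stand-in task), the usual out-of-the-box entry point looks like this:

    // out-of-the-box usage (all from java.util.concurrent): fixed pool, submit, wait for the result
    ExecutorService pool = Executors.newFixedThreadPool(4);
    Future<Double> result = pool.submit(() -> computeSlice());   // computeSlice() is a placeholder task
    double value = result.get();                                 // blocks until the task completes
    pool.shutdown();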

The work below borrows heavily from Rudiger's work but also the work at 1024cores.net, and of course I can't neglect to mention Nitsan Wakart's blog and Martin Thompson's numerous talks on the topic.

The target: Get a working ExecutorService that can be used in the Pi calculator and get a result that is better than out-of-the-box.

Getting a baseline

Of course, since my target is to beat the out-of-the-box frameworks, I spent some time downloading the gists and hooking up the libs in Maven. The only thing is that I didn't hook up the Abstraktor tests, as I am not too familiar with the project (happy to take a pull request).

Interestingly, the out-of-the-box services perform pretty much as described in Rudiger's post, with the caveat that the Disruptor services show a much sharper degradation in performance on this run. Worth mentioning is that this machine is only a 2-socket Xeon E5410 (8 cores total, the old Harpertown series), so not nearly as new as Rudiger's kit.

First cut

I've heard good things about the LinkedTransferQueue but had not had a need to test it out. As a first cut, I decided to write up a simple Executor implementation that pushes each task onto a LinkedTransferQueue as part of the submit() call, with a pool of worker threads taking from it.

    private static class LtqExecutor<T> implements ExecutorService {

        private volatile boolean running;
        private volatile boolean stopped;
        private final int threadCount;
        private final BlockingQueue<Runnable> queue;
        private final Thread[] threads;

        public LtqExecutor(int threadCount) {
            this.threadCount = threadCount;
            this.queue = new LinkedTransferQueue<Runnable>();
            running = true;
            stopped = false;
            threads = new Thread[threadCount];
            for(int i = 0; i < threadCount; i++) {
                threads[i] = new Thread(new Worker());
                threads[i].start();
            }
        }

        public void execute(Runnable runnable) {
            try {
                queue.put(runnable);
            } catch (InterruptedException ie) {
                throw new RuntimeException(ie);
            }
        }

        private class Worker implements Runnable {
            public void run() {
                while(running) {
                    Runnable runnable = null;
                    try {
                        runnable = queue.take();
                    } catch (InterruptedException ie) {
                        // interrupted - just go around the loop again;
                        // if running is now false, the loop will exit
                    }
                    try {
                        if (runnable != null) { 
                            runnable.run();
                        }
                    } catch (Exception e) {
                        System.out.println("failed because of: " + e.toString());
                        e.printStackTrace();
                    }
                }
            }
        }

        // remaining ExecutorService methods (submit, shutdown, awaitTermination, etc.) omitted for brevity
    }

I also removed the test for getQueue().size(). Actually, performance of this was rather impressive on its own, achieving results in a third of the time of the OOTB services, while of course coming nowhere near the Disruptor.

threads   disruptor2   threadpi   custom / LTQ
1          954         1455       1111
2          498         1473       1318
3          327         2108        802
4          251         2135        762
5          203         2195        643
6          170         2167        581
7         7502         2259        653
8         9469         2444       5238

Trying some things

ArrayBlockingQueue and SynchronousQueue

I also tried substituting ArrayBlockingQueue (ABQ) and SynchronousQueue (SQ) in place of the LinkedTransferQueue, since my understanding is that LTQ has some additional, unnecessary overheads. Perhaps ABQ would be a little more cache friendly, since it is possibly better laid out in memory, and I've used SQ in implementations before.
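The substitution is a one-line change in the constructor - a sketch (the 1000 is simply the capacity I used for the ABQ run):

    // drop-in replacements for the LinkedTransferQueue in the constructor
    this.queue = new ArrayBlockingQueue<Runnable>(1000);
    // or
    this.queue = new SynchronousQueue<Runnable>();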

threads   disruptor2   threadpi   custom / LTQ   custom / SQ   custom / ABQ 1000
1          954         1455       1111           3738          2570
2          498         1473       1318           2638          2570
3          327         2108        802           3925          1875
4          251         2135        762           4035          1792
5          203         2195        643           4740          1692
6          170         2167        581           4635          1684
7         7502         2259        653           4362          1668
8         9469         2444       5238           4566          1677

Ouch. No need to mention these again.

Striping the LTQ

Clearly Disruptor's approach of limiting contended writes has a benefit. Would the LinkedTransferQueue get a little faster by striping the input queues - one per thread?

In this model, instead of having one queue into the Executor, each worker thread has its own queue, and the put() method round-robins across them.
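A sketch of the striped producer side (field names are illustrative, not the exact code): one LinkedTransferQueue per worker, with the submitting thread dealing tasks out round-robin.

    // illustrative sketch: one queue per worker; only the submitting thread touches nextQueue
    private final BlockingQueue<Runnable>[] queues;   // one LinkedTransferQueue per worker thread
    private int nextQueue = 0;

    public void execute(Runnable runnable) {
        try {
            queues[nextQueue].put(runnable);
            nextQueue = (nextQueue + 1) % queues.length;
        } catch (InterruptedException ie) {
            throw new RuntimeException(ie);
        }
    }
    // each Worker then take()s only from its own queue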

Actually, this was a bit better for some workloads but with much faster degradation.

threads   disruptor2   threadpi   custom / LTQ   custom / striped LTQ
1          954         1455       1111           1237
2          498         1473       1318           1127
3          327         2108        802            713
4          251         2135        762            570
5          203         2195        643            498
6          170         2167        581            386
7         7502         2259        653           5264
8         9469         2444       5238           5262

Customising a queue

A ringbuffer based queue

With a baseline set, I knew from reading Nitsan's many posts and the 1024cores.net site that there were some optimisations I could get out of the problem space. What about using a queue with a ring buffer?
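The state the following snippets work against is roughly this (reconstructed from the code below, so treat the exact declarations as an assumption):

    // assumed fields for the ring-buffer queue; capacity and buffer are set in the constructor
    private final int capacity;                                      // ring size
    private final Runnable[] buffer;                                 // the ring itself
    private final AtomicLong nextSlotToWrite = new AtomicLong(0);    // producer cursor: next free slot
    private final AtomicLong lastConsumed = new AtomicLong(-1);      // last slot claimed by a consumer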

I introduced this custom lock-free queue and substituted it into my Executor logic. The put() looks like this:

            public void put(Runnable r) throws InterruptedException {
                while (true) {
                    long toWrite = nextSlotToWrite.get();
                    if (toWrite < (lastConsumed.get() + capacity)) {
                        buffer[(int) (toWrite % capacity)] = r;
                        nextSlotToWrite.incrementAndGet();
                        break;
                    } else {
                        // no space in queue, sleep
                        LockSupport.parkNanos(1);
                    }
                }
            }

And the take() looks like this:

            public Runnable take() throws InterruptedException {
                while(!Thread.interrupted()) {
                    long lastFilled = nextSlotToWrite.get() - 1;
                    long lastConsumedCache = lastConsumed.get();
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) % capacity);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1); 
                        if (casWon) {
                            return r;
                        } else {
                            // CAS failed - another consumer claimed this slot, so retry without sleeping
                        }
                    } else {
                        // no values available to take yet - spin and re-check
                    }
                }
                throw new InterruptedException();
            }

Can we go faster with SPMC?

In particular, for our case there is only ever a single writer thread issuing put()s to the queue. Can we take advantage of this by making the queue an SPMC queue (single producer, many consumers)?
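With only one producer, the write cursor no longer needs to be advanced with an atomic increment. The sketch below assumes a plain long that only the submitting thread mutates, published to consumers via lazySet() on the AtomicLong (again, my reconstruction of the fields the code uses):

    // single-writer cursor: only the producer updates toWrite, then publishes it via lazySet()
    private long toWrite = 0;
    private final int REMAINDER_MASK;   // capacity - 1 (set in the constructor), with capacity a power of two

The single-producer put() then becomes: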

            public void put(Runnable r) throws InterruptedException {
                while (true) {
                    if (toWrite < (lastConsumed.get() + capacity)) {
                        buffer[(int) (toWrite & REMAINDER_MASK)] = r;
                        toWrite += 1;
                        nextSlotToWrite.lazySet(toWrite);
                        break;
                    } else {
                        // no space in queue, sleep
                        LockSupport.parkNanos(1);
                    }
                }
            }

Note that I use a lazySet() to publish the write, but this doesn't help much; I'm speculating that it's really the .get()s in the take() below that are causing the issues.

            public Runnable take() throws InterruptedException {
                while(!Thread.interrupted()) {
                    long lastFilled = nextSlotToWrite.get() - 1;
                    long lastConsumedCache = lastConsumed.get();
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1); 
                        if (casWon) {
                            return r;
                        } else {
                            // if cas failed, then we are not the owner so try again
                        }
                    } else {
                        // no values available to take, wait a while
                        LockSupport.parkNanos(1);
                    }
                }
                throw new InterruptedException();
            }

By this point I've also started to use power-of-two sized queues (following the Disruptor guidance on fast modulo). Actually, this didn't make a lot of difference, but I've left it in.
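For reference, the fast modulo only works because the capacity is a power of two, in which case masking with capacity - 1 gives the same slot as the modulo:

    // with a power-of-two capacity, the bitwise AND and the modulo agree
    int capacity = 1 << 18;
    int mask = capacity - 1;                              // this is the REMAINDER_MASK
    long sequence = 262_145L;
    assert (sequence & mask) == (sequence % capacity);    // both evaluate to 1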

Caching the get()s

How can I reduce contention when things are busy? Having a look at the take() method, we can see that there are two .get() calls to atomic fields on every pass, each causing a volatile read. On a CAS failure it's not always the case that we need to re-read both variables, so what happens if we move those re-reads so they only happen on the "things are too busy" paths?

            public Runnable take() throws InterruptedException {
                long lastFilled = nextSlotToWrite.get() - 1;
                long lastConsumedCache = lastConsumed.get();
                while(!Thread.interrupted()) {
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1);
                        if (casWon) {
                            return r;
                        } else {
                            // if cas failed, then we are not the owner so try again
                            // LockSupport.parkNanos(1);

                            // note that the lastFilled does not need to change just yet
                            // but probably the lastConsumed did
                            lastConsumedCache = lastConsumed.get();
                        }
                    } else {
                        // no values available to take, wait a while
                        LockSupport.parkNanos(1);
                        lastFilled = nextSlotToWrite.get() - 1;
                        // lastConsumedCache = lastConsumed.get();
                    }
                }
                throw new InterruptedException();
            }
        }

threads   akkapi   disruptor   disruptor2   threadpi   custom / LTQ   Custom Lockfree Pp Tp Padded   Custom with cached gets
1         2389     1300         954         1455       1111           1245                           1257
2         1477      713         498         1473       1318            992                           1007
3         1266      492         327         2108        802            676                            695
4         1629      423         251         2135        762            548                            559
5         1528      400         203         2195        643            538                            515
6         1541      398         170         2167        581            545                            510
7         1482     1085        7502         2259        653            603                            562
8         1397     1297        9469         2444       5238            925                            883

Not a lot of difference, but it does help our under-contention numbers, which is exactly what we'd expect.

The results

Actually, the results were good; I was pretty happy with them. They're a small fraction faster than the LinkedTransferQueue, with what seems like better worst-case performance. Nowhere near as fast as the Disruptor, but frankly that's not surprising given the energy and talent working on it.


Of course, this is not production ready and YMMV, but I would welcome comments and questions, particularly if there are any issues with correctness.

Code samples are on GitHub.