
[289] MergingSortedSpliterator

Author: Dr Heinz M. Kabutz | Date: 2021-04-03 | Category: Tips and Tricks | Java Version: 11+


Abstract:
If we have a List<Stream<T>>, each stream with sorted elements, how can we generate a sorted Stream<T>, taking one at a time from each stream? In this newsletter we show how to do that with the Stream API and by writing our own MergingSortedSpliterator.




Welcome to the 289th edition of The Java(tm) Specialists' Newsletter. I know, I know, 36 hours ago I sent you a newsletter after two months of radio silence, and here is another one? Well, when I see an interesting puzzle, I find myself unable to resist trying to solve it. And that's what happened this morning. So here we are - I hope you enjoy it :-)

We have three upcoming LIVE virtual classes in April and May 2021:

  1. Refactoring to Streams and Lambdas for US$ 497 on April 6-7 2021 @ 9am-1pm Frankfurt Time.
  2. Extreme Java - Advanced Topics Java 17 Edition for EUR 1299 on April 19-20 2021 @ 9am-5pm Frankfurt Time. (almost sold out)
  3. Design Patterns Deep Dive for US$ 497 on May 11-12 2021 @ 7-11am Los Angeles Time.

My favourite course at the moment is the Refactoring to Streams and Lambdas course. We spend 8 hours ripping apart a 330k LOC application and replacing bits with more modern code. Too much fun! We still have a few places available for next week. The Advanced Topics Course is also very interesting. It is almost sold out though, so please grab a seat soon if you would like to join.


MergingSortedSpliterator


As I rubbed the sleep out of my eyes this morning, I vaguely made out the shape of a tweet with my name on it. Antoine DESSAIGNE (@adessaigne) had tweeted "Hello @java folks , If you have a List<Stream<T>>, each stream with sorted elements, how can you generate a sorted Stream<T>, taking one at a time from each stream? Managed to do that using iterators but is there a clever way to do it with Stream API? Thanks "

None of the Java greats had proposed an easy or performant solution, so I grabbed my laptop and started typing. Antoine had apparently created his own solution using Iterators, and I had in mind a solution using Spliterators. A Spliterator can easily be converted to a Stream using StreamSupport.stream(). Having not seen Antoine's solution, I imagine that he did something along the same lines. Obviously we could just do:

```java
list.stream()
    .flatMap(Function.identity())
    .sorted();
```

In that case we would sort twice: once when each of the original streams was sorted, and again after flattening them. I wanted to avoid that. However, the second sort would not be as expensive as the first. Timsort is efficient at sorting partially sorted lists, and the flattened list consists of long runs that are already in order. In addition, if we also parallelize the second sort, then internally it will divide and conquer, and there is a good chance that we would end up with chunks of items that are already mostly sorted.
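For comparison, here is a minimal sketch of the flatten-and-resort approach as a complete program. It assumes the caller knows the order in which the input streams were sorted and passes that same Comparator to sorted(); here the inputs are assumed to be in descending order. The class FlatMapResort and the method mergeByResorting are hypothetical names, not part of the original code.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapResort {
    // Flattens the already-sorted streams and sorts the combined result again.
    // The comparator must match the order of the inputs, otherwise the
    // result is sorted differently from the original streams.
    static <T> Stream<T> mergeByResorting(List<Stream<T>> streams,
                                          Comparator<? super T> order) {
        return streams.stream()
                .flatMap(Function.identity())
                .sorted(order);
    }

    public static void main(String... args) {
        // Each input is assumed to be sorted in descending order
        List<Stream<Integer>> streams = List.of(
                Stream.of(9, 5, 1),
                Stream.of(8, 6, 2),
                Stream.of(7, 3));
        List<Integer> merged = mergeByResorting(streams,
                Comparator.<Integer>reverseOrder())
                .collect(Collectors.toList());
        System.out.println(merged); // [9, 8, 7, 6, 5, 3, 2, 1]
    }
}
```

The price of this simplicity is that nothing checks whether the comparator really matches the one the inputs were sorted with.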

But it is more fun to write our own MergingSortedSpliterator. It has some constraints. The streams that are passed in must have the SORTED characteristic, which also means that their Spliterators report the Comparator used for sorting through getComparator() (null for natural order). All the streams must have used the same Comparator. The Comparator does not have to be the same instance, but the comparators must be equal according to equals(). Another constraint is that this Spliterator does not support parallel streams. Its characteristics are a bitwise AND of the characteristics of all the contained spliterators, and its estimateSize() is the sum of their estimates, using Long.MAX_VALUE if the sum overflows.
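To see what the SORTED check and the comparator check will encounter, the following sketch inspects a few spliterators. The results in the comments reflect OpenJDK behaviour as I understand it: a stream sorted with sorted() reports SORTED with a null (natural-order) comparator, a stream sorted with sorted(Comparator) does not report SORTED at all, and a TreeSet built with a comparator reports SORTED and returns that comparator. The class SortedCharacteristicDemo and the method describe() are hypothetical helpers for illustration only.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;
import java.util.stream.Stream;

public class SortedCharacteristicDemo {
    // Reports what a merging spliterator's constructor would see
    static String describe(Spliterator<?> spliterator) {
        if (!spliterator.hasCharacteristics(Spliterator.SORTED))
            return "not SORTED";
        // getComparator() returns null when the order is natural
        Comparator<?> comparator = spliterator.getComparator();
        return comparator == null
                ? "SORTED, natural order"
                : "SORTED, custom comparator";
    }

    public static void main(String... args) {
        // sorted() keeps the SORTED flag, with a null comparator
        System.out.println(describe(Stream.of(3, 1, 2).sorted().spliterator()));

        // sorted(Comparator) does not report SORTED on the resulting stream
        System.out.println(describe(Stream.of(3, 1, 2)
                .sorted(Comparator.reverseOrder()).spliterator()));

        // A TreeSet with a comparator reports SORTED and returns its comparator
        TreeSet<Integer> set = new TreeSet<>(Comparator.reverseOrder());
        set.addAll(List.of(3, 1, 2));
        System.out.println(describe(set.stream().spliterator()));
    }
}
```

In practice this means the MergingSortedSpliterator accepts naturally sorted streams and sorted collections such as a TreeSet, but rejects a stream that was sorted with sorted(Comparator).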

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class MergingSortedSpliterator<T> implements Spliterator<T> {
    private final List<Spliterator<T>> spliterators;
    private final List<Iterator<T>> iterators;
    private final int characteristics;
    // current head of each input, or END_OF_STREAM once that input is exhausted
    private final Object[] nextItem;
    private final static Object END_OF_STREAM = new Object();
    // comparator reported by the inputs; null means natural order
    private final Comparator<? super T> sourceComparator;
    // comparator used for merging; never null
    private final Comparator<? super T> comparator;

    @SuppressWarnings("unchecked")
    public MergingSortedSpliterator(Collection<Stream<T>> streams) {
        this.spliterators = streams.stream()
            .map(Stream::spliterator)
            .collect(Collectors.toList());
        if (!spliterators.stream().allMatch(
            spliterator -> spliterator.hasCharacteristics(SORTED)))
            throw new IllegalArgumentException("Streams must be sorted");
        // All comparators must be equal (null means natural order). Comparing
        // against the first one avoids a reduce() with a null identity, which
        // would reject every non-null comparator.
        Comparator<? super T> comparator = spliterators.isEmpty() ?
            null : spliterators.get(0).getComparator();
        for (Spliterator<T> spliterator : spliterators) {
            if (!Objects.equals(comparator, spliterator.getComparator()))
                throw new IllegalArgumentException("Mismatching comparators " +
                    comparator + " and " + spliterator.getComparator());
        }
        this.sourceComparator = comparator;
        this.comparator = comparator != null ? comparator :
            (Comparator<? super T>) Comparator.naturalOrder();
        this.characteristics = spliterators.stream()
            .mapToInt(Spliterator::characteristics)
            .reduce((ch1, ch2) -> ch1 & ch2)
            .orElse(0);
        // setting up iterators and buffering the first element of each input
        this.iterators = spliterators.stream()
            .map(Spliterators::iterator)
            .collect(Collectors.toList());
        nextItem = new Object[streams.size()];
        int index = 0;
        for (Iterator<T> iterator : iterators) {
            nextItem[index++] = fetchNext(iterator);
        }
    }

    private Object fetchNext(Iterator<T> iterator) {
        return iterator.hasNext() ? iterator.next() : END_OF_STREAM;
    }

    @Override
    @SuppressWarnings("unchecked")
    public boolean tryAdvance(Consumer<? super T> action) {
        Objects.requireNonNull(action, "action==null");
        if (nextItem.length == 0) return false;
        T smallest = null;
        int smallestIndex = -1;
        // linear scan over the buffered head of each input
        for (int i = 0; i < nextItem.length; i++) {
            Object o = nextItem[i];
            if (o != END_OF_STREAM) {
                T t = (T) o;
                if (smallest == null || comparator.compare(t, smallest) < 0) {
                    smallest = t;
                    smallestIndex = i;
                }
            }
        }
        // test smallestIndex, since smallest might be null if the stream contains nulls
        if (smallestIndex == -1) return false;
        nextItem[smallestIndex] = fetchNext(iterators.get(smallestIndex));
        action.accept(smallest);
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        // never split - parallel not supported
        return null;
    }

    @Override
    public long estimateSize() {
        // elements already buffered in nextItem are no longer counted by the
        // underlying spliterators, so add them back in
        long buffered = Arrays.stream(nextItem)
            .filter(item -> item != END_OF_STREAM)
            .count();
        return spliterators.stream()
            .mapToLong(Spliterator::estimateSize)
            .reduce(buffered, (ch1, ch2) -> {
                long result;
                if ((result = ch1 + ch2) < 0) result = Long.MAX_VALUE;
                return result;
            });
    }

    @Override
    public int characteristics() {
        return characteristics;
    }

    @Override
    public Comparator<? super T> getComparator() {
        // per the Spliterator contract, null signals natural ordering
        return sourceComparator;
    }
}
```
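The tryAdvance() method above compares the buffered head of every input for each element it emits, so each step costs O(k) for k input streams. That is perfectly reasonable for a handful of streams. For many streams, a common alternative (not the approach used in this newsletter) is to keep the heads in a PriorityQueue, so each step costs O(log k). The sketch below works on Iterators rather than Spliterators to stay short, and it assumes the caller passes the comparator the inputs are sorted by; HeapMerge and Head are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

public class HeapMerge<T> implements Iterator<T> {
    // Current head of one input, together with the iterator it came from
    private static final class Head<E> {
        final E value;
        final Iterator<E> source;

        Head(E value, Iterator<E> source) {
            this.value = value;
            this.source = source;
        }
    }

    private final PriorityQueue<Head<T>> heads;

    public HeapMerge(List<Iterator<T>> sources, Comparator<? super T> comparator) {
        // Heads are ordered by their current value; PriorityQueue needs capacity >= 1
        this.heads = new PriorityQueue<>(Math.max(1, sources.size()),
                (a, b) -> comparator.compare(a.value, b.value));
        for (Iterator<T> source : sources) {
            if (source.hasNext()) heads.add(new Head<>(source.next(), source));
        }
    }

    @Override
    public boolean hasNext() {
        return !heads.isEmpty();
    }

    @Override
    public T next() {
        Head<T> smallest = heads.poll(); // O(log k)
        if (smallest == null) throw new NoSuchElementException();
        // refill from the input that just supplied the smallest element
        if (smallest.source.hasNext()) {
            heads.add(new Head<>(smallest.source.next(), smallest.source));
        }
        return smallest.value;
    }

    public static void main(String... args) {
        List<Iterator<Integer>> sources = List.of(
                List.of(1, 4, 7).iterator(),
                List.of(2, 5, 8).iterator(),
                List.of(0, 3, 6, 9).iterator());
        HeapMerge<Integer> merge =
                new HeapMerge<>(sources, Comparator.<Integer>naturalOrder());
        List<Integer> out = new ArrayList<>();
        merge.forEachRemaining(out::add);
        System.out.println(out); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

One design difference: the linear scan always takes the lowest-indexed input when elements compare equal, whereas PriorityQueue gives no such guarantee for ties.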

We can then create a Stream from the Spliterator using StreamSupport.stream(new MergingSortedSpliterator<>(streams), false). The false means that the stream will be sequential and not parallel. As mentioned above, parallel is not supported.
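StreamSupport.stream() accepts any Spliterator, not only a hand-written one. As a small illustration that is not part of the original article, an Iterator (for example the result of an Iterator-based merge) can be wrapped with Spliterators.spliteratorUnknownSize() and turned into a sequential stream. IteratorToStream and sequentialStream are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorToStream {
    // Wraps an Iterator in a Spliterator of unknown size and builds a stream.
    // The second argument to StreamSupport.stream() selects sequential (false)
    // or parallel (true).
    static <T> Stream<T> sequentialStream(Iterator<T> iterator) {
        Spliterator<T> spliterator =
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        return StreamSupport.stream(spliterator, false);
    }

    public static void main(String... args) {
        Stream<String> stream = sequentialStream(List.of("a", "b", "c").iterator());
        System.out.println(stream.isParallel()); // false
        System.out.println(stream.collect(Collectors.joining(","))); // a,b,c
    }
}
```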

Here is a demo of the MergingSortedSpliterator at work:

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class SortedStreamOfSortedStreams {
    private static final int SIZE = 5;

    public static void main(String... args) {
        List<Stream<Integer>> streams = List.of(
            generateSortedRandom(SIZE),
            generateSortedRandom(SIZE),
            generateSortedRandom(SIZE),
            generateSortedRandom(SIZE)
        );
        Stream<Integer> numbers = StreamSupport.stream(
            new MergingSortedSpliterator<>(streams), false
        );
        numbers.forEach(System.out::println);
    }

    private static Stream<Integer> generateSortedRandom(int size) {
        return ThreadLocalRandom.current().ints(size, 0, size * 4)
            .parallel()
            .sorted()
            .boxed();
    }
}
```

For example, we might see output like this:

```
0
0
2
4
4
5
6
6
7
10
10
11
12
15
16
17
18
18
19
19
```

I ran some performance tests and, as I expected, my MergingSortedSpliterator is faster than a sequential sorted flatMap. However, at least on my machine, a parallel sorted flatMap is even faster. Here is a basic performance test:

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class PerformanceTest {
    private static final int SIZE = 10_000_000;
    private static final List<Function<List<Stream<Integer>>, Stream<Integer>>>
        MERGERS = List.of(
            s -> s.stream()
                .flatMap(Function.identity())
                .sorted(),
            s -> s.stream()
                .flatMap(Function.identity())
                .parallel()
                .sorted(),
            s -> StreamSupport.stream(
                new MergingSortedSpliterator<>(s), false
            ));

    public static void main(String... args) {
        for (int i = 0; i < 10; i++) {
            test();
            System.out.println();
        }
    }

    private static void test() {
        MERGERS.forEach(merger -> {
            List<Stream<Integer>> streams = makeStreams();
            long time = System.nanoTime();
            try {
                Stream<Integer> numbers = merger.apply(streams);
                numbers.forEach(i -> { });
            } finally {
                time = System.nanoTime() - time;
                System.out.printf("time = %dms%n", (time / 1_000_000));
            }
        });
    }

    private static List<Stream<Integer>> makeStreams() {
        return Stream.generate(() -> generateSorted(SIZE))
            .limit(10).collect(Collectors.toList());
    }

    private static Stream<Integer> generateSorted(int size) {
        return IntStream.range(0, size).boxed();
    }
}
```

On my 1-8-2 MacBook Pro (1 socket, 8 cores, 2 hyperthreads per core), the best results were:

```
Sorted flatMap sequential: 7.8 seconds
Sorted flatMap parallel:   2.1 seconds
MergingSortedSpliterator:  5.2 seconds
```

Thus the MergingSortedSpliterator is faster than the sequential version and should require fewer CPU cycles than the parallel version. Unless I was working with very large datasets, I would probably still favour the simpler sequential sorted flatMap. The only catch is that we must make sure to sort with the same comparator as the original streams. My MergingSortedSpliterator takes care of that for us.

Thanks Antoine for the challenge. I enjoy solving puzzles like this, especially if they have some practical application and are not just interview questions or homework assignments :-)

Kind regards

Heinz


Cretesoft Limited 77 Strovolos Ave Strovolos, Lefkosia 2018 Cyprus