Down the Collector Rabbit Hole

At JCrete 2019, José Paumard gave a quiz intended to stump the Java experts. One of the questions that I got wrong had to do with java.util.stream collectors. I knew how to use them, but I had never bothered to look at what makes them tick. Read along and find out.

Collector Basics

When you use Java streams, you need a terminal operation to harvest the stream data. If you want elements and not just a summary result (such as count, sum, max), you call collect:

Stream<String> words = ...
var result = words
   .filter(...)
   .map(...)
   .collect(Collectors.toList());

The argument of collect is an instance of java.util.stream.Collector. The Collectors class has static factories for many collectors, such as toList, toSet, and joining.
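As a quick illustration, here is a small sketch that runs the three factory collectors named above on a made-up word list. The class name and the sample data are assumptions for this example only.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CollectorBasicsDemo {
    // Hypothetical sample data, not from the original post
    static Stream<String> words() {
        return Stream.of("stream", "collector", "map", "stream");
    }

    // toList keeps encounter order and duplicates
    static List<String> longWordsAsList() {
        return words().filter(w -> w.length() > 3).collect(Collectors.toList());
    }

    // toSet drops duplicates and makes no promise about order
    static Set<String> longWordsAsSet() {
        return words().filter(w -> w.length() > 3).collect(Collectors.toSet());
    }

    // joining concatenates the elements with a separator
    static String allWordsJoined() {
        return words().collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        System.out.println(longWordsAsList());       // [stream, collector, stream]
        System.out.println(longWordsAsSet().size()); // 2
        System.out.println(allWordsJoined());        // stream, collector, map, stream
    }
}
```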

A powerful collector is produced by Collectors.toMap. You provide functions that produce the map key and value for an element. Here we make a map for looking up Person objects by ID:

Map<Integer, Person> idToPerson = people.collect(
   Collectors.toMap(Person::getId, Function.identity()));
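One detail worth knowing: the two-argument toMap throws an IllegalStateException when two elements produce the same key. The sketch below shows that behavior, along with the three-argument overload that takes a merge function. The Person class here is a minimal stand-in with assumed fields, not the class from the example above.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class ToMapDemo {
    // Minimal stand-in for Person; the fields are assumptions
    static class Person {
        private final int id;
        private final String firstName;
        Person(int id, String firstName) { this.id = id; this.firstName = firstName; }
        int getId() { return id; }
        String getFirstName() { return firstName; }
    }

    static final List<Person> PEOPLE = List.of(
        new Person(1, "Ada"), new Person(2, "Grace"), new Person(3, "Ada"));

    // IDs are unique, so the two-argument toMap is enough
    static Map<Integer, Person> byId() {
        return PEOPLE.stream()
            .collect(Collectors.toMap(Person::getId, Function.identity()));
    }

    // First names repeat; without a merge function, toMap throws
    static boolean duplicateKeyThrows() {
        try {
            PEOPLE.stream()
                .collect(Collectors.toMap(Person::getFirstName, Function.identity()));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // The merge function decides which of two colliding values wins; here, the first
    static Map<String, Person> firstByName() {
        return PEOPLE.stream()
            .collect(Collectors.toMap(Person::getFirstName, Function.identity(),
                (first, second) -> first));
    }

    public static void main(String[] args) {
        System.out.println(byId().size());                     // 3
        System.out.println(duplicateKeyThrows());              // true
        System.out.println(firstByName().get("Ada").getId());  // 1
    }
}
```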

Downstream Collectors

When collecting to a map, it could happen that there are multiple stream elements with the same key. The Collectors.groupingBy method allows you to collect all elements with the same key. How do you collect them? With a collector, of course. Here is how to make a map from first names to people with the same first name.

Map<String, List<Person>> firstNameToPeople = people.collect(
   Collectors.groupingBy(Person::getFirstName, Collectors.toList()));

That second collector is a downstream collector.

A number of the Collectors methods produce collectors that are only interesting for downstream collection. For example, Collectors.counting simply counts elements:

Map<String, Long> firstNameCount = people.collect(
   Collectors.groupingBy(Person::getFirstName, Collectors.counting()));

You would never use that collector with Stream.collect since there is already a Stream.count method.
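To make the two groupingBy calls concrete, here is a runnable sketch. For brevity it groups plain name strings instead of Person objects; the sample names and the firstName helper are assumptions for this example.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingDemo {
    // Hypothetical full names standing in for Person objects
    static final List<String> NAMES =
        List.of("Ada Lovelace", "Grace Hopper", "Ada Yonath");

    // Plays the role of Person::getFirstName
    static String firstName(String fullName) {
        return fullName.substring(0, fullName.indexOf(' '));
    }

    // Downstream toList: each key maps to all elements that share it
    static Map<String, List<String>> byFirstName() {
        return NAMES.stream().collect(
            Collectors.groupingBy(GroupingDemo::firstName, Collectors.toList()));
    }

    // Downstream counting: each key maps to the number of such elements
    static Map<String, Long> countByFirstName() {
        return NAMES.stream().collect(
            Collectors.groupingBy(GroupingDemo::firstName, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(byFirstName().get("Ada"));        // [Ada Lovelace, Ada Yonath]
        System.out.println(countByFirstName().get("Grace")); // 1
    }
}
```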

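There are also Collectors.mapping and Collectors.filtering if you want to transform or filter the elements of each group before they are collected into the map values. (The following example assumes static imports of the Collectors methods.)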

Map<String, Set<String>> countryToLanguages = locales.collect(
   groupingBy(Locale::getDisplayCountry,
      mapping(Locale::getDisplayLanguage,
         toSet())));
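Display names of locales depend on the default locale, so for a reproducible illustration here is a sketch with plain words instead. It also shows a property of the filtering collector that is easy to miss: a group whose elements are all rejected still appears in the map, with an empty value. The word list and helper names are assumptions.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class DownstreamDemo {
    static final List<String> WORDS = List.of("apple", "ant", "bee", "banana", "cat");

    static Character initial(String w) {
        return w.charAt(0);
    }

    // mapping: transform each element before it reaches the downstream collector
    static Map<Character, Set<Integer>> lengthsByInitial() {
        return WORDS.stream().collect(
            Collectors.groupingBy(DownstreamDemo::initial,
                Collectors.mapping(String::length, Collectors.toSet())));
    }

    // filtering: drop elements inside each group; the key survives even if nothing passes
    static Map<Character, List<String>> longWordsByInitial() {
        return WORDS.stream().collect(
            Collectors.groupingBy(DownstreamDemo::initial,
                Collectors.filtering((String w) -> w.length() > 3, Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(lengthsByInitial().get('c'));   // [3]
        System.out.println(longWordsByInitial().get('b')); // [banana]
        System.out.println(longWordsByInitial().get('c')); // []
    }
}
```

Calling filter on the stream before groupingBy would instead leave out the key 'c' entirely.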

The JCrete Question

At JCrete 2019, José Paumard showed off a quiz with 20 questions that advanced Java programmers ought to be able to answer in less than a minute. He tested it at a reputable conference, and the highest score was 11. At JCrete, the summer playground of Java Champions and the like, it was no better—an embarrassing 11 as the high score, earned by yours truly. A Java version of the infamous Dictée de Mérimée!

I didn't even understand the question about collectors. It was something like:

Which of the following operations cannot be implemented as a collector? (a) Map (b) Filter (c) Peek (d) Sort.

I knew of the mapping and filtering collectors, was pretty sure there was no peeking collector, and couldn't remember whether there was a sorting one. So I picked (c), not (d). Otherwise I would have beaten the attendees of the reputable conference.

Being a sore loser, I challenged the question. Surely sorting had to be doable since you can use binary search to insert into a sorted array and merge sorted arrays. Ok, not efficient, I know.

José admitted that but said: You can't use it with a downstream collector. Without a clue how to implement a collector that takes a downstream collector, I muttered something about an imprecise question and slunk away.

Writing A Collector

I have never had to write a collector from scratch, and the API looks forbidding. You construct a collector with four functions:

Name | Purpose | Type (if Java had function types) | Actual type
--- | --- | --- | ---
Supplier | Supplies an instance of the intermediate data structure that you use for collecting | () -> A | Supplier<A>
Accumulator | Adds an element to that intermediate data structure | (A, T) -> () | BiConsumer<A, T>
Combiner | Merges two intermediate data structures | (A, A) -> A | BinaryOperator<A>
Finisher | Turns the final instance of the intermediate data structure into the result | A -> R | Function<A, R>

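It's not as bad as it looks. Often the finisher is simply the identity function, because we build up the result that we want directly. Let's say we want to rewrite the counting collector.

The result is a long, but the accumulator returns nothing and must update the intermediate data structure in place, which a bare long does not allow. So we count in a one-element long array and let the finisher extract the value. Here is the collector:

Collector<Object, long[], Long> counting = Collector.of(
   () -> new long[1], // Supplier
   (a, t) -> { a[0]++; }, // Accumulator
   (a, b) -> { a[0] += b[0]; return a; }, // Combiner
   a -> a[0]); // Finisher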

The combiner seems unnecessary, but it is there to allow a caller of the collector to parallelize the collection process. Suppose we call stream.collect(counting) on a long parallel stream. Then the stream is broken into segments. The elements of each segment are collected in parallel. Finally, the collections are combined.
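To watch the combiner do its job, here is a runnable sketch of a hand-written counting collector applied to a sequential and a parallel stream. It uses a one-element long array as the intermediate data structure, since the accumulator has to mutate its first argument. The class and method names are made up for this example.

```java
import java.util.stream.Collector;
import java.util.stream.IntStream;

class CountingCollectorDemo {
    static <T> Collector<T, long[], Long> counting() {
        return Collector.of(
            () -> new long[1],                      // Supplier: a fresh counter per segment
            (a, t) -> { a[0]++; },                  // Accumulator: count one element
            (a, b) -> { a[0] += b[0]; return a; },  // Combiner: add two partial counts
            a -> a[0]);                             // Finisher: unwrap the count
    }

    static long countSequential(int n) {
        return IntStream.range(0, n).boxed().collect(counting());
    }

    // On a parallel stream, each segment gets its own array; the combiner merges them
    static long countParallel(int n) {
        return IntStream.range(0, n).boxed().parallel().collect(counting());
    }

    public static void main(String[] args) {
        System.out.println(countSequential(1000));   // 1000
        System.out.println(countParallel(100_000));  // 100000
    }
}
```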

This collector produces a single value, not another collection. If it did, you might want to send the results to a downstream collector. Then you have to work a bit harder. Here is the implementation of Collectors.filtering from OpenJDK 11.

Only the elements that pass the predicate are accumulated downstream:

public static <T, A, R>
Collector<T, ?, R> filtering(Predicate<? super T> predicate,
      Collector<? super T, A, R> downstream) {
   BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
   return new CollectorImpl<>(downstream.supplier(),
      (r, t) -> {
         if (predicate.test(t)) {
            downstreamAccumulator.accept(r, t);
         }
      },
      downstream.combiner(),
      downstream.finisher(),
      downstream.characteristics());
}

(Hold the question about downstream.characteristics() for now.)
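CollectorImpl is an internal class of Collectors and is not accessible to code outside the JDK. If you want to experiment with the same idea yourself, a rough equivalent can be built with the public Collector.of factory. This is only a sketch: the class name MyCollectors and the evens helper are assumptions.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MyCollectors {
    static <T, A, R> Collector<T, ?, R> filtering(
            Predicate<? super T> predicate,
            Collector<? super T, A, R> downstream) {
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        return Collector.of(
            downstream.supplier(),
            (A r, T t) -> {
                // Only elements that pass the predicate reach the downstream collector
                if (predicate.test(t)) downstreamAccumulator.accept(r, t);
            },
            downstream.combiner(),
            downstream.finisher(),
            // Pass the downstream characteristics through unchanged
            downstream.characteristics().toArray(new Collector.Characteristics[0]));
    }

    // Hypothetical usage helper
    static List<Integer> evens(Stream<Integer> numbers) {
        return numbers.collect(filtering(n -> n % 2 == 0, Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(evens(Stream.of(1, 2, 3, 4, 5, 6))); // [2, 4, 6]
    }
}
```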

A Sorting Collector

Now that we know the API, let's write a sorting collector. Clearly, we can't just pass the elements to the downstream accumulator as with filtering since they need to be sorted first. The downstream passing must happen in the finisher. Here, I just accumulate into unsorted lists, then sort and pass on the sorted elements in the finisher. (In the code, comp is the comparator and downstream is the downstream collector.)

Collector.of(
   () -> new ArrayList<T>(), // Supplier 
   (a, e) -> { a.add(e); }, // Accumulator
   (a, b) -> { a.addAll(b); return a; }, // Combiner
   a -> { // Finisher
      a.sort(comp);
      var da = downstream.supplier().get();
      for (T e : a) downstream.accumulator().accept(da, e);
      return downstream.finisher().apply(da);
   })

It works—see this complete program.
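The linked program is not reproduced here, so the following is a minimal sketch of how the fragment above could be wrapped in a generic method and called. The method name sorting, the enclosing class, and the sample words are assumptions.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SortingCollectorDemo {
    static <T, A, R> Collector<T, ?, R> sorting(
            Comparator<? super T> comp,
            Collector<? super T, A, R> downstream) {
        return Collector.of(
            () -> new ArrayList<T>(),              // Supplier
            (a, e) -> { a.add(e); },               // Accumulator
            (a, b) -> { a.addAll(b); return a; },  // Combiner
            a -> {                                 // Finisher
                a.sort(comp);
                // Only now can the elements be fed to the downstream collector
                A da = downstream.supplier().get();
                for (T e : a) downstream.accumulator().accept(da, e);
                return downstream.finisher().apply(da);
            });
    }

    // Hypothetical usage helper: sort the words, then join them downstream
    static String sortedAndJoined(Stream<String> words) {
        return words.collect(
            sorting(Comparator.<String>naturalOrder(), Collectors.joining(" ")));
    }

    public static void main(String[] args) {
        System.out.println(sortedAndJoined(Stream.of("pear", "apple", "fig")));
        // apple fig pear
    }
}
```

Note that the downstream collector runs entirely inside the finisher, on a single thread, even when the stream is parallel.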

Exercise: For better parallel performance, use an intermediate data structure in which you have an already sorted list and an unsorted list. Accumulate into the unsorted list. When combining or finalizing, sort the unsorted lists and merge all of them together.

Characteristics

If you looked closely at the filtering implementation, you may have noticed that it specified characteristics when constructing the collector. Characteristics are hints to the code that invokes a collector on how to use it more efficiently. A collector can set any or all of the following characteristics:

Name | Description
--- | ---
CONCURRENT | The accumulator function can be called concurrently on the same intermediate data structure. A caller may then choose to supply a single intermediate data structure and concurrently add all elements to it, without combining. An example would be a hypothetical toConcurrentSet collector that accumulates all elements into a single concurrent set; in the JDK, toConcurrentMap and groupingByConcurrent work this way.
UNORDERED | Elements need not be accumulated in encounter order.
IDENTITY_FINISH | The finisher is the identity function, and it need not be called.
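You can inspect these hints by calling characteristics() on any collector. The sketch below relies on documented behavior only: the Collector.of overload without a finisher adds IDENTITY_FINISH on its own, the overload with a finisher adds nothing, toSet is documented as unordered, and toConcurrentMap as concurrent and unordered. The class and method names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.Collectors;

class CharacteristicsDemo {
    // Three functions, no finisher: IDENTITY_FINISH is set automatically
    static Set<Characteristics> withoutFinisher() {
        Collector<String, List<String>, List<String>> c = Collector.of(
            () -> new ArrayList<String>(),
            (a, t) -> { a.add(t); },
            (a, b) -> { a.addAll(b); return a; });
        return c.characteristics();
    }

    // Four functions and no explicit characteristics: the set is empty
    static Set<Characteristics> withFinisher() {
        Collector<String, List<String>, Integer> c = Collector.of(
            () -> new ArrayList<String>(),
            (a, t) -> { a.add(t); },
            (a, b) -> { a.addAll(b); return a; },
            a -> a.size());
        return c.characteristics();
    }

    static Set<Characteristics> ofToSet() {
        return Collectors.toSet().characteristics();
    }

    static Set<Characteristics> ofToConcurrentMap() {
        Collector<String, ?, ConcurrentMap<String, Integer>> c =
            Collectors.toConcurrentMap(Function.identity(), String::length);
        return c.characteristics();
    }

    public static void main(String[] args) {
        System.out.println(withoutFinisher()); // [IDENTITY_FINISH]
        System.out.println(withFinisher());    // []
        System.out.println(ofToSet().contains(Characteristics.UNORDERED));            // true
        System.out.println(ofToConcurrentMap().contains(Characteristics.CONCURRENT)); // true
    }
}
```

A collector that has to do real work in its finisher, like the sorting collector, cannot truthfully report IDENTITY_FINISH.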

Now we can rescue the quiz question:

Which of the following operations cannot be implemented as a collector with the IDENTITY_FINISH characteristic? (a) Map (b) Filter (c) Peek (d) Sort

Exercise: Another operation that doesn't seem to work well with parallelization and downstream collectors is removing duplicates. However, if you don't care about the ordering, you can insert elements into a concurrent set as you find them, and only pass on new ones to the downstream collector. Implement a distincting collector in this way. Pay attention to the characteristics, both as a producer of this collector and as a consumer of the downstream collector.
