At JCrete 2019, José Paumard gave a quiz intended to stump the Java experts. One of the questions that I got wrong had to do with `java.util.stream` collectors. I know how to use them, but I had never bothered to look inside to see what makes them tick. Read along and find out.
When you use Java streams, you need a terminal operation to harvest the stream data. If you want elements and not just a summary result (such as count, sum, max), you call `collect`:

```java
Stream<String> words = ...;
var result = words
    .filter(...)
    .map(...)
    .collect(Collectors.toList());
```
The argument of `collect` is an instance of `java.util.stream.Collector`. The `Collectors` class has static factories for many collectors, such as `toList`, `toSet`, and `joining`.
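As a quick sketch of one of these factories, `joining` concatenates the string elements, optionally with a delimiter:

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JoiningDemo {
    public static void main(String[] args) {
        // joining(delimiter) concatenates the string elements.
        String s = Stream.of("toList", "toSet", "joining")
            .collect(Collectors.joining(", "));
        System.out.println(s); // toList, toSet, joining
    }
}
```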
A powerful collector is produced by `Collectors.toMap`. You provide functions that produce the map key and value for an element. Here we make a map for looking up `Person` objects by ID:

```java
Map<Integer, Person> idToPerson = people.collect(
    Collectors.toMap(Person::getId, Function.identity()));
```
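If two elements produced the same key, this two-argument `toMap` would throw an `IllegalStateException`. A three-argument overload takes a merge function for colliding values; here is a small sketch (the example data is mine) that keeps the longer of two words sharing a first letter:

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToMapDemo {
    public static void main(String[] args) {
        Map<Character, String> byInitial = Stream.of("apple", "ant", "bee")
            .collect(Collectors.toMap(
                w -> w.charAt(0),                             // key: first letter
                w -> w,                                       // value: the word itself
                (a, b) -> a.length() >= b.length() ? a : b)); // merge: keep the longer
        System.out.println(byInitial.get('a')); // apple
    }
}
```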
When collecting to a map, it could happen that there are multiple stream elements with the same key. The `Collectors.groupingBy` method allows you to collect all elements with the same key. How do you collect them? With a collector, of course. Here is how to make a map from first names to the people sharing each first name:

```java
Map<String, List<Person>> firstNameToPeople = people.collect(
    Collectors.groupingBy(Person::getFirstName, Collectors.toList()));
```
That second collector is a downstream collector.
A number of the `Collectors` methods produce collectors that are only interesting for downstream collection. For example, `Collectors.counting` simply counts elements:

```java
Map<String, Long> firstNameCount = people.collect(
    Collectors.groupingBy(Person::getFirstName, Collectors.counting()));
```
You would never use that collector directly with `Stream.collect`, since there is already a `Stream.count` method.
There are also `Collectors.mapping` and `Collectors.filtering` if you want to process the map values.

```java
Map<String, Set<String>> countryToLanguages = locales.collect(
    groupingBy(Locale::getDisplayCountry,
        mapping(Locale::getDisplayLanguage, toSet())));
```
At JCrete 2019, José Paumard showed off a quiz with 20 questions that advanced Java programmers ought to be able to answer in less than a minute. He tested it at a reputable conference, and the highest score was 11. At JCrete, the summer playground of Java Champions and the like, it was no better—an embarrassing 11 as the high score, earned by yours truly. A Java version of the infamous Dictée de Mérimée!
I didn't even understand the question about collectors. It was something like:
Which of the following operations cannot be implemented as a collector? (a) Map (b) Filter (c) Peek (d) Sort.
I knew of the `mapping` and `filtering` collectors, was pretty sure there was no `peeking`, and couldn't remember whether there was `sorting`. So I picked (c), not (d). Otherwise I would have beaten the attendees of the reputable conference.
Being a sore loser, I challenged the question. Surely sorting had to be doable since you can use binary search to insert into a sorted array and merge sorted arrays. Ok, not efficient, I know.
José admitted that but said: You can't use it with a downstream collector. Without a clue how to implement a collector that takes a downstream collector, I muttered something about an imprecise question and slunk away.
I have never had to write a collector from scratch, and the API looks forbidding. You construct a collector with four functions:
| Name | Purpose | Type (if Java had function types) | Actual type |
|---|---|---|---|
| Supplier | Supplies an instance of the intermediate data structure that you use for collecting | `() -> A` | `Supplier<A>` |
| Accumulator | Adds an element to that intermediate data structure | `(A, T) -> ()` | `BiConsumer<A, T>` |
| Combiner | Merges two intermediate data structures | `(A, A) -> A` | `BinaryOperator<A>` |
| Finisher | Turns the final instance of the intermediate data structure into the result | `A -> R` | `Function<A, R>` |
It's not as bad as it looks. Most of the time, the finisher does little more than hand over or unwrap the result that we have built up. Let's say we want to rewrite the `counting` collector. The intermediate data structure holds a single `long`. Since the accumulator is a `BiConsumer` that must mutate its argument (it cannot return a new value), we use a one-element array as a mutable holder. Here is the collector:

```java
var counting = Collector.of(
    () -> new long[1],                     // Supplier
    (a, t) -> a[0]++,                      // Accumulator
    (a, b) -> { a[0] += b[0]; return a; }, // Combiner
    a -> a[0]);                            // Finisher
```
The combiner seems unnecessary, but it is there to allow a caller of the collector to parallelize the collection process. Suppose we call `stream.collect(counting)` on a long parallel stream. Then the stream is broken into segments. The elements of each segment are collected in parallel. Finally, the collections are combined.
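To watch the combiner earn its keep, we can run a hand-rolled counting collector on a parallel stream (a sketch; I use a one-element `long[]` as the mutable intermediate holder, since a bare `long` cannot be mutated by the accumulator):

```java
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class CountingDemo {
    public static void main(String[] args) {
        var counting = Collector.<Integer, long[], Long>of(
            () -> new long[1],                     // Supplier: mutable holder
            (a, t) -> a[0]++,                      // Accumulator: count one element
            (a, b) -> { a[0] += b[0]; return a; }, // Combiner: merge segment counts
            a -> a[0]);                            // Finisher: unwrap the long
        long n = IntStream.range(0, 1_000_000).boxed()
            .parallel()                            // segments are counted concurrently
            .collect(counting);
        System.out.println(n); // 1000000
    }
}
```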
This collector produces a single value, not another collection. If it did, you might want to send the results to a downstream collector. Then you have to work a bit harder. Here is the implementation of `Collectors.filtering` from OpenJDK 11. Only the elements that pass the predicate are accumulated downstream:

```java
public static <T, A, R> Collector<T, ?, R> filtering(
        Predicate<? super T> predicate, Collector<? super T, A, R> downstream) {
    BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
    return new CollectorImpl<>(downstream.supplier(),
        (r, t) -> {
            if (predicate.test(t)) {
                downstreamAccumulator.accept(r, t);
            }
        },
        downstream.combiner(), downstream.finisher(),
        downstream.characteristics());
}
```
(Hold the question about `downstream.characteristics()` for now.)
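Here is a quick usage sketch (my own example). Note a subtlety that the javadoc points out: `filtering` as a downstream collector keeps a group whose elements are all rejected, as an empty list, whereas filtering the stream before grouping would drop that key entirely:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static java.util.stream.Collectors.filtering;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

public class FilteringDemo {
    public static void main(String[] args) {
        // Group words by length, keeping only those that start with 'a'.
        Map<Integer, List<String>> byLength = Stream.of("apple", "ant", "berry", "kiwi")
            .collect(groupingBy(String::length,
                filtering(w -> w.startsWith("a"), toList())));
        // "kiwi" is the only word of length 4, so that group survives as [].
        System.out.println(byLength); // {3=[ant], 4=[], 5=[apple]}
    }
}
```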
Now that we know the API, let's write a sorting collector. Clearly, we can't just pass the elements to the downstream accumulator as with `filtering`, since they need to be sorted first. The downstream passing must happen in the finisher. Here, I just accumulate into unsorted lists, then sort and pass on the sorted elements in the finisher.

```java
Collector.of(
    () -> new ArrayList<T>(),             // Supplier
    (a, e) -> { a.add(e); },              // Accumulator
    (a, b) -> { a.addAll(b); return a; }, // Combiner
    a -> {                                // Finisher
        a.sort(comp);
        var da = downstream.supplier().get();
        for (T e : a) downstream.accumulator().accept(da, e);
        return downstream.finisher().apply(da);
    })
```
It works—see this complete program.
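Wrapped into a hypothetical `sorting` factory method (the name and signature are mine, not part of `Collectors`), the collector can be used like any other:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SortingCollectorDemo {
    // Hypothetical factory: sorts with comp, then feeds the sorted elements downstream.
    static <T, A, R> Collector<T, ?, R> sorting(
            Comparator<? super T> comp, Collector<? super T, A, R> downstream) {
        return Collector.of(
            () -> new ArrayList<T>(),             // Supplier
            (a, e) -> { a.add(e); },              // Accumulator
            (a, b) -> { a.addAll(b); return a; }, // Combiner
            a -> {                                // Finisher: sort, then pass downstream
                a.sort(comp);
                A da = downstream.supplier().get();
                for (T e : a) downstream.accumulator().accept(da, e);
                return downstream.finisher().apply(da);
            });
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("pear", "fig", "apple")
            .collect(sorting(Comparator.naturalOrder(), Collectors.toList()));
        System.out.println(result); // [apple, fig, pear]
    }
}
```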
Exercise: For better parallel performance, use an intermediate data structure in which you have an already sorted list and an unsorted list. Accumulate into the unsorted list. When combining or finalizing, sort the unsorted lists and merge all of them together.
If you looked closely at the `filtering` implementation, you may have noticed that it specified characteristics when constructing the collector. Characteristics are hints to the code that invokes a collector on how to use it more efficiently. A collector can set any or all of the following characteristics:
| Name | Description |
|---|---|
| `CONCURRENT` | The accumulator function can be called concurrently on the same intermediate data structure. A caller may then choose to supply a single instance and concurrently add all elements to it, without combining. An example is a `toConcurrentSet` collector that accumulates all elements into a single concurrent set. |
| `UNORDERED` | Elements need not be accumulated in encounter order. |
| `IDENTITY_FINISH` | The finisher function does nothing, and it need not be called. |
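As a sketch of how characteristics are declared (the `toConcurrentSet` factory below is my own, not a JDK method), `Collector.of` accepts them as trailing arguments:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class ConcurrentSetDemo {
    // Hypothetical collector: every thread may accumulate into one shared set.
    static <T> Collector<T, Set<T>, Set<T>> toConcurrentSet() {
        return Collector.of(
            ConcurrentHashMap::newKeySet,          // Supplier: a thread-safe set
            Set::add,                              // Accumulator: safe to call concurrently
            (a, b) -> { a.addAll(b); return a; },  // Combiner: used if segments are combined
            // This Collector.of overload has no finisher, so IDENTITY_FINISH is implied.
            Collector.Characteristics.CONCURRENT,
            Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        Set<Integer> set = IntStream.of(1, 2, 2, 3).boxed()
            .parallel()
            .collect(toConcurrentSet());
        System.out.println(set.size()); // 3
    }
}
```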
Now we can rescue the quiz question:
Which of the following operations cannot be implemented as a collector with the `IDENTITY_FINISH` characteristic? (a) Map (b) Filter (c) Peek (d) Sort.
Exercise: Another operation that doesn't seem to work well with parallelization and downstream collectors is removing duplicates. However, if you don't care about the ordering, you can insert elements into a concurrent set as you find them, and only pass on new ones to the downstream collector. Implement a `distincting` collector in this way. Pay attention to the characteristics, both as a producer of this collector and as a consumer of the downstream collector.