Tidbits #3 from JCrete 2022—Loom


This is the last of three articles about tidbits that I learned at the JCrete unconference. Here is an elevator pitch about Loom for those who aren't familiar with it, followed by a couple of observations that shine some light on the essence of Loom.

Loom in a Nutshell

“Loom” is the nickname for a project that aims to bring virtual threads and structured concurrency to the Java platform.

A virtual thread is a thread that executes on a platform thread but is “parked” when it blocks. The platform thread is then free to execute other virtual threads. A parked virtual thread resumes, perhaps on a different platform thread, when the blocking operation has completed.

With virtual threads, blocking calls are cheap since they do not require a platform thread—a limited resource. You can have a large number of virtual threads, provided they mostly block. This workload is common for servers whose tasks spend much of their time waiting for results from databases and other services.

By making blocking calls cheap, Loom liberates programmers from the tyranny of an asynchronous programming style. An async library replaces sequential instructions and exception handling with method calls. Branches and loops become tedious and unintuitive. With Loom, you just use Java language constructs.

The API for virtual threads is straightforward since they are instances of java.lang.Thread. Executors are provided to run tasks on virtual threads. You'll see some examples below. I think they should be comprehensible to any Java programmer who is familiar with concurrent programming.
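
For instance, here is arguably the simplest way to launch one, a minimal sketch using the Thread.startVirtualThread method from JDK 19:

Thread t = Thread.startVirtualThread(
   () -> System.out.println("Hello from " + Thread.currentThread()));
t.join(); // Wait for the virtual thread to finish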

Structured concurrency is a programming style for managing the lifetime of concurrent tasks. Starting a concurrent task without worrying about when it will finish is a bit like a goto.


Structured programming replaced goto statements with branches and loops, which provide a bounded scope for the computation. Structured concurrency provides similar mechanisms for controlling concurrent tasks.


In summary, the purpose of project Loom is to facilitate more convenient and safer programming styles. Keep this in mind when you read yet another article that introduces Loom by showing you how to run a million virtual threads.

The UncaughtExceptionHandler

When an exception is thrown in the run method of a thread, and it is not caught, the thread terminates. To obtain the exception, you can install an UncaughtExceptionHandler into either the thread or the thread factory. Let's do it with the factory:

ThreadFactory factory = Thread.ofPlatform()
   .name("myPlatformFactory")
   .uncaughtExceptionHandler((thread, throwable) -> System.err.println(thread + " throws " + throwable))
   .factory();

Note the “fluent” API for threads and thread factories in JDK 19. The ofPlatform method yields a builder for platform threads or factories, and ofVirtual does the same for virtual threads. You configure the builder with methods such as name and uncaughtExceptionHandler. Finally, you call either factory() to get a thread factory, or unstarted(runnable)/start(runnable) to get a thread.
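
For instance, a minimal sketch that builds and starts a single named virtual thread without going through a factory:

Thread t = Thread.ofVirtual()
   .name("myVirtualThread")
   .unstarted(() -> System.out.println("Running " + Thread.currentThread()));
t.start();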

Here is a thread created by the factory defined above. When it terminates with an exception, the message is printed:

factory.newThread(() -> {
   System.out.println("Starting " + Thread.currentThread());
   LockSupport.parkNanos(1_000_000_000);
   throw new ThreadDeath();
}).start();

The LockSupport.parkNanos method puts the thread to sleep for the given number of nanoseconds. Unlike Thread.sleep, it does not declare InterruptedException as a checked exception. That makes it easier to write toy programs since you don't have to write a catch clause. (Another tidbit I learned at JCrete.)
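
For comparison, here is a sketch of the same pause with Thread.sleep, which forces you to deal with InterruptedException:

try {
   Thread.sleep(1000); // One second, like parkNanos(1_000_000_000)
} catch (InterruptedException ex) {
   Thread.currentThread().interrupt(); // Restore the interrupted status
}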

Of course, it is uncommon to launch a single thread. More commonly, you use an executor service. Here is how to launch a bunch of virtual threads:

ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < NTASKS; i++) 
   service.submit(() -> {
      System.out.println("Starting " + Thread.currentThread());
      LockSupport.parkNanos(1_000_000_000);
      System.out.println("Completed " + Thread.currentThread());
   });

This is somewhat similar to executing threads on a thread pool, except that nothing is pooled. Each task gets a virtual thread. When the task completes, the virtual thread terminates.

Note that all tasks are started immediately. With a fixed thread pool, only a certain number of tasks would be launched initially, and the remaining ones would start whenever another thread completed. If the tasks compete for a limited resource, you need some other throttling mechanism, such as a semaphore, since the pool size no longer limits concurrency.
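
For example, here is a sketch of throttling with a Semaphore. The names MAX_CONCURRENT and useResource are hypothetical placeholders for the resource limit and the call that uses the limited resource:

Semaphore semaphore = new Semaphore(MAX_CONCURRENT);
for (int i = 0; i < NTASKS; i++)
   service.submit(() -> {
      semaphore.acquire(); // Blocking is cheap on a virtual thread
      try {
         return useResource(); // Hypothetical call to the limited resource
      } finally {
         semaphore.release();
      }
   });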

Heinz Kabutz demonstrated how one can customize virtual threads with a factory:

ThreadFactory factory = Thread.ofVirtual()
   .name("myVirtualFactory")
   .uncaughtExceptionHandler((thread, throwable) -> System.err.println(thread + " throws " + throwable))
   .factory();
ExecutorService service = Executors.newThreadPerTaskExecutor(factory);
for (int i = 0; i < NTASKS; i++) 
   service.submit(() -> {
      System.out.println("Starting " + Thread.currentThread());
      LockSupport.parkNanos(1_000_000_000);
      throw new ThreadDeath();
   });

I was really confused by this. Why didn't the UncaughtExceptionHandler kick in? There were no printouts of a virtual thread throwing a ThreadDeath.

This has nothing to do with virtual threads. Suppose the factory produced platform threads and we called Executors.newFixedThreadPool(POOL_SIZE, factory). Then the factory produces the pool threads, and each pool thread runs many tasks. Those tasks might throw uncaught exceptions. The executor service must catch them, since submit returns a Future that contains the task's result upon completion, or the exception that terminated it. To harvest the latter, the pool thread must install its own UncaughtExceptionHandler, replacing the one from the factory.

Loom could have changed this for the “task per thread” executors, but its designers chose not to. The thread API has layers: Thread and Thread.UncaughtExceptionHandler are low-level; ExecutorService and Future are at a higher level. When using an ExecutorService, use the Future API to determine a task's outcome.
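
For example, here is a sketch of retrieving a task's uncaught exception through its Future:

Runnable task = () -> { throw new IllegalStateException("boom"); };
Future<?> future = service.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   System.err.println("Task failed with " + ex.getCause()); // The uncaught exception
} catch (InterruptedException ex) {
   Thread.currentThread().interrupt();
}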

Structured concurrency provides an even higher-level API. More on that later.

Parallel Virtual Threads

When streams first appeared in Java 8, people used them for everything. And they stuck a parallel() in the stream pipeline because surely that would speed things up. But what if the lambdas in the stream pipeline made calls to a database or some other external service? Then the tasks would block, exhausting the fork-join pool. Java performance consultants made good money telling programmers not to do blocking calls in parallel streams.

But what about virtual threads? For them, blocking is cheap. What if I replaced the fork-join pool with a virtual thread per task executor?

There is no official API for specifying the pool of a parallel stream, but it is widely known that you can choose one like this:

var pool = ...;
var future = pool.submit(() -> stream.parallel().map(...).collect(...));

Let's try that with virtual threads:

ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();
Future<List<String>> future = service.submit(() -> IntStream.range(0, NTASKS).parallel()
   .mapToObj(i -> get("https://horstmann.com/random/word")).toList());

This collects random words from the delightfully slow service at https://horstmann.com/random/word, using the following blocking method:

// client is a shared java.net.http.HttpClient instance
public static String get(String url) {
   try {
      var request = HttpRequest.newBuilder().uri(new URI(url)).GET().build();
      var response = client.send(request, HttpResponse.BodyHandlers.ofString());
      var status = response.statusCode();
      return Thread.currentThread() + " " + (status == 200 ? response.body() : status);
   } catch (Exception ex) {
      throw new RuntimeException(ex);
   }
}

So what if it blocks, right? Blocking is cheap.

This was a complete failure. Throughput was abysmal, and looking at the thread IDs in the returned strings, it was clear that only a few calls to get used a virtual thread. The remainder used a fork-join pool. Surely the common pool, since no other is in sight.

Why didn't it just spawn NTASKS virtual threads? Well, why should it? Parallel streams never worked that way. They split the data structure (here, the range from 0 to NTASKS) into chunks and run each chunk as a separate task. When such a task runs in a thread of a fork-join pool, its subtasks are added to that same pool. Otherwise, they are added to the common fork-join pool. Our submitting thread was a virtual thread, not a fork-join thread, which is why almost all of the work ended up in the common pool.

Parallel streams are meant for computationally intensive tasks that consume values in a splittable data structure. The fork-join framework is optimized for distributing that work over the available processor cores. This has nothing to do with running many blocking tasks concurrently.

If we want to use streams for this problem, parallel streams are not the answer. Instead, submit the tasks and then process the resulting futures. Let's focus for a moment on the happy-day scenario in which all tasks succeed:

List<String> result = IntStream.range(0, NTASKS)
   .mapToObj(i -> service.submit(() -> get("https://horstmann.com/random/word")))
   .map(f -> f.get())
   .toList();

Actually, that won't work, since Future.get throws checked exceptions. (You obtain an unhandled exception from a task as the cause of the ExecutionException that the get method throws.) And because stream pipelines are lazy, each task would be submitted and awaited one at a time; you would need to collect the futures into a list before calling get.
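
Here is a sketch of the workaround, reusing the executor service from above: first collect the futures so that all tasks are submitted, then wrap the checked exceptions of get in an unchecked exception:

List<Future<String>> futures = IntStream.range(0, NTASKS)
   .mapToObj(i -> service.submit(() -> get("https://horstmann.com/random/word")))
   .toList();
List<String> result = futures.stream()
   .map(f -> {
      try {
         return f.get();
      } catch (InterruptedException | ExecutionException ex) {
         throw new RuntimeException(ex);
      }
   })
   .toList();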

A structured task scope, Loom's mechanism for structured concurrency, gives you control over the tasks, making it much easier to harvest the results.

Here is the basic usage from the JavaDoc:

try (var scope = new StructuredTaskScope<Object>()) {
   Future<Integer> future1 = scope.fork(task1);
   Future<String> future2 = scope.fork(task2);

   scope.join();

   // ... process results/exceptions ...

} // close

The join method blocks until all tasks have completed. In practice, you would want to use joinUntil with a deadline since you probably don't want to wait indefinitely.

In our case, we can collect the futures in a list and harvest the results when all are available. Remember, blocking is cheap.

try (var scope = new StructuredTaskScope<String>()) {
   List<Future<String>> futures = IntStream.range(0, NTASKS)
      .mapToObj(i -> scope.fork(() -> get("https://horstmann.com/random/word")))
      .toList(); // Collect now, so that all tasks are forked before joining
   scope.join();
   return futures.stream()
      .filter(f -> f.state() == Future.State.SUCCESS)
      .map(Future::resultNow)
      .toList();
} // close

The state and resultNow methods were added to the Future interface in JDK 19 to make this harvesting easy.

Task Scope Policies

In the example of the preceding section, we skipped any tasks that did not succeed. Is that reasonable? It's hard to tell from a toy example. Was it our goal to get as many random words as we could? To get NTASKS random words? In case of failure, do we care about analyzing the exceptions?

One can customize the StructuredTaskScope by extending it and overriding the handleComplete method. Here is an example by José Paumard, a scope that collects both results and exceptions:

package org.paumard.scope;

import jdk.incubator.concurrent.StructuredTaskScope;

import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.stream.Collector;

public class CollectingScope<T, R> extends StructuredTaskScope<T> {

    private final Collector<T, ?, R> collector;

    private final Collection<T> results =
          new ConcurrentLinkedQueue<>();
    private final Collection<Throwable> exceptions =
          new ConcurrentLinkedQueue<>();

    public CollectingScope(Collector<T, ?, R> collector) {
        this.collector = collector;
    }

    @Override
    protected void handleComplete(Future<T> future) {
        switch (future.state()) {
            case RUNNING -> throw new IllegalStateException("Future is still running...");
            case SUCCESS -> this.results.add(future.resultNow());
            case FAILED -> this.exceptions.add(future.exceptionNow());
            case CANCELLED -> {
            }
        }
    }

    public Exception exceptions() {
        Exception exception = new Exception();
        this.exceptions.forEach(exception::addSuppressed);
        return exception;
    }

    public R result() {
        return this.results.stream().collect(collector);
    }
}
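
Here is a hypothetical usage sketch, collecting random words with that scope (inside a method that declares InterruptedException):

try (var scope = new CollectingScope<String, List<String>>(Collectors.toList())) {
   for (int i = 0; i < NTASKS; i++)
      scope.fork(() -> get("https://horstmann.com/random/word"));
   scope.join();
   List<String> words = scope.result();
   Exception failures = scope.exceptions();
   if (failures.getSuppressed().length > 0)
      failures.printStackTrace(); // Or analyze the suppressed exceptions
}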

The JDK provides two policies that cover common use cases: StructuredTaskScope.ShutdownOnFailure and StructuredTaskScope.ShutdownOnSuccess. These are the equivalents of the invokeAll and invokeAny methods of the ExecutorService interface, but more convenient to use. They are well described in the API docs.

For example, if you want to make a user name from an adjective and a noun, you want both calls to succeed, like this:

Instant deadline = ...

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
   var adjective = scope.fork(() -> get("https://horstmann.com/random/adjective"));
   var noun = scope.fork(() -> get("https://horstmann.com/random/noun"));

   scope.joinUntil(deadline);

   scope.throwIfFailed(); // Throws the exception of the first failure
   // Both subtasks completed successfully
   return adjective.resultNow() + "-" + noun.resultNow();
}
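
With ShutdownOnSuccess, by contrast, you take the first result that arrives, and the remaining tasks are cancelled. A sketch, reusing the deadline from above:

try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
   scope.fork(() -> get("https://horstmann.com/random/word"));
   scope.fork(() -> get("https://horstmann.com/random/word"));

   scope.joinUntil(deadline);

   return scope.result(); // Result of the first subtask that succeeded
}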

José doesn't love the fact that the business logic is intermingled with the mechanics of the scope (fork, join, failure handling). The complexity arises from concurrency. If we were content to make the web requests sequentially, the code would become trivial:

var adjective = get("https://horstmann.com/random/adjective");
var noun = get("https://horstmann.com/random/noun");
return adjective + "-" + noun;

Should there be a language construct for concurrent execution? That might be tempting in the long run, but for now the best we can do is to provide policies for common patterns.

ThreadLocalRandom Isn't a ThreadLocal

A ThreadLocal variable is a “variable” that can have a different value in each thread. These are sometimes used to make objects available to all methods that collaborate on a task, without having to pass the object from one caller to another. Suppose, for example, you want to share a database connection. Declare a variable:

public static final ThreadLocal<Connection> connection = new ThreadLocal<>();

When the task starts, initialize the connection for this thread:

connection.set(connect(url, username, password));

The task calls some methods, all within the same thread, and eventually one of them needs the connection:

var result = connection.get().createStatement().executeQuery(query);

Note that the same call may happen on multiple threads. Each of them gets its own connection object.

ThreadLocals are actually a better match for Loom than for traditional thread pools, since in Loom there is a 1:1 correspondence between threads and tasks. Unfortunately, thread locals are fairly expensive and not suitable for large numbers of virtual threads. Loom has a replacement, ExtentLocal, with a similar API. Extent locals are immutable and can be shared in forked threads.
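
Here is a sketch of the database connection example with an extent local, assuming the API shape of the Loom early-access builds (the incubating API may change; handleRequest stands for some hypothetical task method):

public static final ExtentLocal<Connection> CONNECTION = ExtentLocal.newInstance();

// When the task starts, bind the connection for the dynamic extent of the call:
ExtentLocal.where(CONNECTION, connect(url, username, password))
   .run(() -> handleRequest());

// Inside handleRequest and the methods it calls:
var result = CONNECTION.get().createStatement().executeQuery(query);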

I said as much in my Loom overview presentation, and mumbled, “Thread locals, you know, for database connections, or non-threadsafe services like SimpleDateFormat, or ThreadLocalRandom,” when Heinz Kabutz said firmly: “ThreadLocalRandom does not use thread locals.”

I never knew that. I just assumed from the name that it would. And it may at one point have been true. But now ThreadLocalRandom actually stores some state directly in the Thread object—see this StackOverflow discussion.

Let's check that it works with virtual threads.

public static double randomAverage(RandomGenerator generator, int nthreads, int nrandoms)
      throws InterruptedException, ExecutionException {
   var results = new ArrayList<Future<Double>>();
   try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
      for (int i = 0; i < nthreads; i++) {
         results.add(scope.fork(() -> {
            double sum = 0;
            for (int j = 0; j < nrandoms; j++) {
               sum += (generator != null ? generator : ThreadLocalRandom.current()).nextDouble();
            }
            return sum;
         }));
      }
      scope.join();
      scope.throwIfFailed();
   }
   double avg = 0;
   for (var f : results) avg += f.resultNow();
   return avg / nthreads / nrandoms;
}

On my machine, calling

randomAverage(new Random(), 10_000, 10_000)

takes 16 seconds, but randomAverage(null, 10_000, 10_000), which falls back to ThreadLocalRandom, takes only 1 second. (The shared Random instance updates its seed with a compare-and-set operation, for which all the threads contend.)

Of course, this would be a stupid use of virtual threads. The tasks are CPU intensive and don't block.

And those are the two takeaways about Loom:

  1. Virtual threads are suitable for workloads with many concurrent tasks that mostly block. They make blocking cheap, so you can write straightforward sequential code instead of asynchronous code.
  2. With structured concurrency, you can use the program structure to reason about the scope of subtasks.
