Copyright © Cay S. Horstmann 2016
The Runnable interface describes a task that you want to run, usually concurrently with others:
public interface Runnable {
    void run();
}
The run method is executed in a thread. An Executor executes tasks, choosing the threads on which to run them:
Runnable task = () -> { ... };
Executor exec = ...;
exec.execute(task);
The Executors class has factory methods for making executors:
Executor exec = Executors.newCachedThreadPool();
// Good for many short-lived tasks
int processors = Runtime.getRuntime().availableProcessors();
int nthreads = processors - 2;
Executor exec = Executors.newFixedThreadPool(nthreads);
// Good for computationally intensive tasks
Runnable hellos = () -> {
for (int i = 1; i <= 1000; i++) System.out.println("Hello " + i);
};
Runnable goodbyes = () -> {
for (int i = 1; i <= 1000; i++) System.out.println("Goodbye " + i);
};
Executor executor = Executors.newCachedThreadPool();
executor.execute(hellos);
executor.execute(goodbyes);
The output of the two tasks is interleaved, for example:
Goodbye 1
...
Goodbye 871
Goodbye 872
Hello 806
Hello 807
Goodbye 882
...
Hello 1000
Unlike a Runnable, a Callable yields a result:
public interface Callable<V> {
    V call() throws Exception;
}
Use an ExecutorService to execute a Callable:
ExecutorService exec = Executors.newFixedThreadPool(nthreads);
Callable<V> task = ...;
Future<V> result = exec.submit(task);
The Future interface has these methods:
V get() throws InterruptedException, ExecutionException
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
boolean isDone()
boolean cancel(boolean mayInterruptIfRunning)
boolean isCancelled()
The get method blocks until the result is available.
Set<Path> paths = ...;
List<Callable<Long>> tasks = new ArrayList<>();
for (Path p : paths) tasks.add(
() -> { return some value derived from contents of p });
List<Future<Long>> results = executor.invokeAll(tasks);
for (Future<Long> result : results) Process result.get();
Alternatively, use an ExecutorCompletionService, which returns futures in order of completion:
ExecutorCompletionService<T> service = new ExecutorCompletionService<>(executor);
for (Callable<T> task : tasks) service.submit(task);
for (int i = 0; i < tasks.size(); i++) {
Process service.take().get()
Do something else
}
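The two patterns above can be sketched as a small runnable program. This is only an illustration under stated assumptions: the class name MultipleTasksDemo and the task body (reporting the length of a string) are hypothetical stand-ins for "some value derived from contents of p".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MultipleTasksDemo {
    // Hypothetical tasks: each one yields the length of its input string
    static List<Callable<Long>> makeTasks(List<String> inputs) {
        List<Callable<Long>> tasks = new ArrayList<>();
        for (String s : inputs) tasks.add(() -> (long) s.length());
        return tasks;
    }

    // invokeAll blocks until all tasks are done; futures are in task order
    static long sumWithInvokeAll(List<String> inputs) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            long sum = 0;
            for (Future<Long> result : executor.invokeAll(makeTasks(inputs)))
                sum += result.get();
            return sum;
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    // The completion service yields futures in order of completion
    static long sumWithCompletionService(List<String> inputs) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            ExecutorCompletionService<Long> service = new ExecutorCompletionService<>(executor);
            List<Callable<Long>> tasks = makeTasks(inputs);
            for (Callable<Long> task : tasks) service.submit(task);
            long sum = 0;
            for (int i = 0; i < tasks.size(); i++) sum += service.take().get();
            return sum;
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> inputs = List.of("alpha", "beta", "gamma");
        System.out.println(sumWithInvokeAll(inputs));
        System.out.println(sumWithCompletionService(inputs));
    }
}
```

Both methods compute the same sum; they differ only in the order in which the individual results become available to the caller.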
To get the result of the first task that completes successfully, use the invokeAny method:
List<Callable<Path>> tasks = new ArrayList<>();
for (Path p : files) tasks.add(
() -> { if (word occurs in p) return p; else throw ... });
Path found = executor.invokeAny(tasks);
private static boolean done = false;
Runnable hellos = () -> {
for (int i = 1; i <= 1000; i++) System.out.println("Hello " + i);
done = true;
};
Runnable goodbye = () -> {
int i = 1;
while (!done) i++;
System.out.println("Goodbye " + i);
};
The effect of done = true; in one thread is not guaranteed to be visible to the other thread!
while (!done) i++;
can be reordered as
if (!done) while (true) i++;
The value of a final variable is visible after initialization.
The initial value of a static variable is visible after static initialization.
Changes to a volatile variable are visible.
private static volatile boolean done;
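A minimal sketch of the volatile fix, assuming a hypothetical class VolatileFlagDemo: one thread spins until another sets the flag. The join timeout of five seconds is an arbitrary choice for the demo.

```java
import java.util.concurrent.TimeUnit;

public class VolatileFlagDemo {
    // volatile makes the write in one thread visible to the other
    private static volatile boolean done = false;

    // Returns true if the spinning thread saw the flag and ended in time
    static boolean spinnerStops() {
        done = false;
        Thread spinner = new Thread(() -> {
            long i = 1;
            while (!done) i++; // Reads done afresh in every iteration
        });
        spinner.start();
        done = true; // Visible to the spinner because done is volatile
        try {
            spinner.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return !spinner.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Spinner stopped: " + spinnerStops());
    }
}
```

Without the volatile modifier, the loop might never terminate on some virtual machines, since the write is then not guaranteed to become visible.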
The final modifier is your friend. Use it when appropriate for all fields.
private static volatile int count = 0;
...
count++; // Task 1
...
count++; // Task 2
...
The count++ operation is not atomic. It is carried out in three steps:
register = count;
Increment register
count = register;
Two tasks can interleave these steps so that one update is lost:
register1 = count;
Increment register1
register2 = count;
Increment register2
count = register2;
count = register1;
The same kind of interleaving can corrupt a linked data structure:
Node n = new Node();
if (head == null) head = n;
else tail.next = n;
tail = n;
tail.value = newValue;
Examples of immutable classes: String, java.time.ZonedDateTime
Using a mutable HashSet for aggregating results is dangerous:
results.addAll(newResult); // What if another thread accesses results?
The addAll operation is complex and must be atomic.
With an immutable set, each update yields a new set:
results = results.union(newResult);
Declare instance variables final.
Declare methods (or the entire class) final.
Don't let this escape in a constructor.
Don't pass this to another method during construction.
Don't let this be captured in an inner class during construction.
long result = coll.parallelStream()
    .filter(s -> s.startsWith("A")).count();
urls.parallelStream().map(url -> Read contents of url)...
// Problematic: might starve the global fork-join pool.
The Arrays class has several methods that are parallelized for you.
Arrays.parallelSort(words, Comparator.comparing(String::length));
Arrays.parallelSetAll(values, i -> i % 10);
long sum = IntStream.of(values).parallel().sum();
void time(Runnable task) {
long start = System.nanoTime();
task.run();
long end = System.nanoTime();
System.out.println((end - start) * 1E-9 + " seconds");
}
int SIZE = 100_000_000;
int[] values = new int[SIZE];
Arrays.parallelSetAll(values, i -> (int) (SIZE * Math.sin(i)));
time(() -> Arrays.parallelSort(values));
Arrays.parallelSetAll(values, i -> (int) (SIZE * Math.sin(i)));
time(() -> Arrays.sort(values));
The java.util.concurrent package supplies ConcurrentHashMap and other concurrent data structures.
Their iterators do not throw a ConcurrentModificationException.
A ConcurrentHashMap won't be damaged by concurrent mutations.
However, a sequence of get and put is still not atomic:
Long oldValue = map.get(word);
Long newValue = oldValue == null ? 1 : oldValue + 1;
map.put(word, newValue); // Error: might not replace oldValue
Use compute for an atomic update:
map.compute(word, (k, v) -> v == null ? 1 : v + 1);
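As a sketch of the atomic update with compute, the following hypothetical WordCountDemo class lets several threads count the same word list into one shared map. The class and method names are illustrative assumptions.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class WordCountDemo {
    // Every one of the nthreads threads counts all words into the shared map
    static ConcurrentHashMap<String, Long> count(List<String> words, int nthreads) {
        ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[nthreads];
        for (int t = 0; t < nthreads; t++) {
            threads[t] = new Thread(() -> {
                for (String word : words)
                    map.compute(word, (k, v) -> v == null ? 1L : v + 1); // Atomic update
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return map;
    }

    public static void main(String[] args) {
        // "a" occurs twice and is counted by 4 threads
        System.out.println(count(List.of("a", "b", "a"), 4));
    }
}
```

Since every update is atomic, no increments are lost: each count equals the number of occurrences times the number of threads.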
There are also computeIfPresent and computeIfAbsent variants.
Use putIfAbsent to atomically initialize a value:
map.putIfAbsent(word, 0L);
The merge method lets you supply an initial value and an update operation:
map.merge(word, 1L, (existingValue, newValue) -> existingValue + newValue);
// Or simply map.merge(word, 1L, Long::sum);
Whole-map operations: forEach, reduce, search, replaceAll
ArrayBlockingQueue and LinkedBlockingQueue are threadsafe implementations of bounded queues.
The add/remove methods throw an exception when the queue is full/empty (IllegalStateException for add, NoSuchElementException for remove).
The put/take methods block when the queue is full/empty.
The offer/poll methods return false/null when the queue is full/empty.
The offer/poll methods also accept a timeout:
boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);
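A minimal producer/consumer sketch with the blocking put/take methods. The class name QueueDemo and the DONE end-of-stream marker are assumptions made for this illustration, not part of the queue API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueDemo {
    private static final int DONE = -1; // Hypothetical end-of-stream marker

    // A producer thread puts 1..n into a bounded queue; the caller sums them up
    static long produceAndConsume(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) queue.put(i); // Blocks while the queue is full
                queue.put(DONE);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            while (true) {
                int value = queue.take(); // Blocks while the queue is empty
                if (value == DONE) break;
                sum += value;
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(100));
    }
}
```

The queue capacity of 10 is much smaller than the number of items, so the producer repeatedly blocks until the consumer catches up.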
The java.util.concurrent.atomic package has classes for atomic operations on int, long and boolean values, object references, and arrays of these values.
public static AtomicLong nextNumber = new AtomicLong();
// In some thread . . .
long id = nextNumber.incrementAndGet();
public static AtomicLong sum = new AtomicLong();
// In some thread . . .
sum.addAndGet(partialSum);
public static AtomicLong largest = new AtomicLong();
// In some thread . . .
largest.set(Math.max(largest.get(), observed)); // Error: race condition!
Instead, use updateAndGet or accumulateAndGet:
largest.updateAndGet(x -> Math.max(x, observed));
largest.accumulateAndGet(observed, Math::max);
AtomicLong can be inefficient under high contention.
The LongAccumulator class solves this problem by splitting a counter or accumulator into multiple variables:
LongAccumulator accumulator = new LongAccumulator(Long::max, 0);
// In some tasks . . .
accumulator.accumulate(value);
// When all work is done
long largest = accumulator.get();
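The two ways of tracking a maximum can be compared in a short sketch. The class name LargestDemo is hypothetical, and a parallel stream stands in for "some tasks"; with an identity of 0, the observations are assumed to be nonnegative.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.IntStream;

public class LargestDemo {
    // Atomic read-modify-write on a single shared variable
    static long largestWithAtomicLong(int[] observations) {
        AtomicLong largest = new AtomicLong();
        IntStream.of(observations).parallel()
            .forEach(observed -> largest.accumulateAndGet(observed, Math::max));
        return largest.get();
    }

    // The accumulator spreads updates over several internal variables
    static long largestWithAccumulator(int[] observations) {
        LongAccumulator accumulator = new LongAccumulator(Long::max, 0);
        IntStream.of(observations).parallel().forEach(accumulator::accumulate);
        return accumulator.get(); // Combines the partial results
    }

    public static void main(String[] args) {
        int[] observations = { 3, 17, 5 };
        System.out.println(largestWithAtomicLong(observations));
        System.out.println(largestWithAccumulator(observations));
    }
}
```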
Use LongAdder if you just need a counter.
Call lock before entering the critical section and unlock when leaving it:
Lock countLock = new ReentrantLock(); // Shared among multiple threads
int count; // Shared among multiple threads
...
countLock.lock();
try {
count++; // Critical section
} finally {
countLock.unlock(); // Make sure the lock is unlocked
}
Instead of a ReentrantLock, you can use intrinsic locks.
The synchronized statement locks and unlocks the intrinsic lock:
synchronized(obj) {
Critical section
}
is equivalent to (in pseudocode, since the intrinsic lock is not an accessible field)
obj.intrinsicLock.lock();
try {
Critical section
} finally {
obj.intrinsicLock.unlock();
}
A method declared synchronized locks the body on this:
public synchronized void method() { body }
is equivalent to
public void method() {
    synchronized(this) { body }
}
Example: a Counter class:
public class Counter {
private int value;
public synchronized int increment() {
value++;
return value;
}
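}
synchronized (table) { // Locks on the intrinsic lock of the table itself
    for (K key : table.keySet()) ...
}
synchronized ("LOCK") { // String literals are interned, so all code using "LOCK" shares this lock
    for (K key : table.keySet()) ...
}
public synchronized Object take() { // From a blocking queue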
if (head == null) ...; // Can't proceed
Node n = head;
head = n.next;
return n.value;
}
Calling wait makes the thread relinquish the lock and enter a “waiting” state:
public synchronized Object take() { // From a blocking queue
while (head == null) wait();
...
}
Call notifyAll to allow waiting threads to continue:
public synchronized void add(Object newValue) {
...
notifyAll();
}
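Putting the take and add fragments together gives a complete, if simplistic, unbounded blocking queue. This is a sketch under assumptions: the class name SimpleBlockingQueue, the generic element type, and the handOff demo method are not in the original.

```java
public class SimpleBlockingQueue<E> {
    private static class Node<T> { T value; Node<T> next; }

    private Node<E> head;
    private Node<E> tail;

    public synchronized void add(E newValue) {
        Node<E> n = new Node<>();
        n.value = newValue;
        if (head == null) head = n; else tail.next = n;
        tail = n;
        notifyAll(); // Wake up threads that wait in take
    }

    public synchronized E take() throws InterruptedException {
        while (head == null) wait(); // Gives up the lock while waiting
        Node<E> n = head;
        head = n.next;
        return n.value;
    }

    // Demo: a consumer thread waits in take until this thread adds a value
    static String handOff(String message) {
        SimpleBlockingQueue<String> queue = new SimpleBlockingQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.add(message);
        try {
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));
    }
}
```

The condition is tested in a while loop, not an if statement, because a thread that wakes up must check again whether the queue is still nonempty.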
Runnable task = () -> { ... };
Thread thread = new Thread(task);
thread.start();
Thread.sleep(milliseconds);
A thread ends when its run method ends (normally or due to an exception).
thread.join(milliseconds);
thread.interrupt();
while (more work to do) {
if (Thread.currentThread().isInterrupted()) return;
Do more work
}
If a thread is interrupted while it waits or sleeps, an InterruptedException is thrown.
try {
while (more work to do) {
Do more work
Thread.sleep(millis);
}
}
catch (InterruptedException ex) {
// Do nothing
}
There is no need to call isInterrupted.
The sleep method throws an InterruptedException when it is called from an interrupted thread.
NumberFormat is not threadsafe and should not be globally shared.
Use a ThreadLocal to provide a NumberFormat instance for each thread.
public static final ThreadLocal<NumberFormat> currencyFormat =
    ThreadLocal.withInitial(() -> NumberFormat.getCurrencyInstance());
String amountDue = currencyFormat.get().format(total);
The first time you call get in a given thread, the lambda in withInitial is called.
By default, the stack trace of an uncaught exception in a thread is printed to System.err.
To make a daemon thread, call thread.setDaemon(true) before starting the thread.
JButton read = new JButton("Read");
read.addActionListener(event -> Read document from url); // Blocks the user interface thread
read.addActionListener(event -> Start new task that reads document from url);
EventQueue.invokeLater(() -> Update UI for progress or completion display);
Use SwingWorker/AsyncTask to manage event flow.
The CompletableFuture class provides an alternative.
A CompletableFuture<T> is a Future<T> which will deliver a value at some point in the future.
CompletableFuture<T> implements the CompletionStage<T> interface, which allows completable futures to be composed.
Example methods, some of which return a CompletableFuture:
public CompletableFuture<String> readPage(URL url) public List<URL> getImageURLs(String webpage) // Not time-consuming public CompletableFuture<List<BufferedImage>> getImages(List<URL> urls) public void saveImages(List<BufferedImage> images)
CompletableFuture.completedFuture(urlToProcess)
.thenComposeAsync(this::readPage, executor)
.thenApply(this::getImageURLs)
.thenCompose(this::getImages)
.thenAccept(this::saveImages);
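A runnable miniature of such a pipeline, with hypothetical stand-ins for the methods above: readPage fabricates a page instead of reading from the network, and getImageNames takes the place of getImageURLs. It is a sketch of the composition style only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class PipelineDemo {
    // Hypothetical asynchronous step: pretends to read a page
    static CompletableFuture<String> readPage(String url) {
        return CompletableFuture.supplyAsync(() -> "img1.png img2.png from " + url);
    }

    // Hypothetical quick step: the first two words are the image names
    static List<String> getImageNames(String page) {
        return Arrays.asList(page.split(" ")).subList(0, 2);
    }

    static int countImages(String url) {
        return CompletableFuture.completedFuture(url)
            .thenCompose(PipelineDemo::readPage)    // Step that yields another future
            .thenApply(PipelineDemo::getImageNames) // Plain function
            .thenApply(List::size)
            .join(); // Blocks; for this demo only
    }

    public static void main(String[] args) {
        System.out.println(countImages("http://example.com"));
    }
}
```

Use thenCompose when the next step itself returns a CompletableFuture, and thenApply when it returns a plain value.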
When an action throws an exception, dependent stages complete with a CompletionException that wraps the original exception.
The exceptionally method allows you to substitute a value for an exception:
CompletableFuture.completedFuture(urlToProcess)
.thenComposeAsync(this::readPage, executor)
.exceptionally(ex -> "<html></html>")
.thenApply(this::getImageURLs)
Or use handle with a BiFunction<T, Throwable, U> that receives either the result or the exception (and null for the other).
CompletableFuture<T> future1 = ...;
CompletableFuture<U> future2 = ...;
CompletableFuture<V> combined = future1.thenCombine(future2, combiner);
Here, combiner is a function taking arguments of type T and U, and producing a result of type V.
The CompletableFuture.allOf method takes an array or varargs sequence of completable futures and yields a future that completes when all of them have completed, but it doesn't combine the results.
To use the result of whichever future completes first, call the applyToEither method:
CompletableFuture<T> future1 = ...;
CompletableFuture<T> future2 = ...;
CompletableFuture<U> combined = future1.applyToEither(future2, f);
// f maps T to U
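The combining methods and handle can be tried out with trivially simple futures. In this sketch the class name CombineDemo and the values are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class CombineDemo {
    // thenCombine waits for both futures and combines their results
    static int combined() {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 6);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 7);
        return future1.thenCombine(future2, (a, b) -> a * b).join();
    }

    // applyToEither proceeds with whichever future completes first
    static String either() {
        CompletableFuture<String> never = new CompletableFuture<>(); // Never completed
        CompletableFuture<String> quick = CompletableFuture.completedFuture("quick");
        return never.applyToEither(quick, s -> s.toUpperCase()).join();
    }

    // handle receives the result or the exception, with null for the other
    static String recovered() {
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("no page");
        });
        return failing.handle((result, ex) -> ex == null ? result : "<html></html>").join();
    }

    public static void main(String[] args) {
        System.out.println(combined());
        System.out.println(either());
        System.out.println(recovered());
    }
}
```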

CompletableFuture.completedFuture(urlToProcess)
.thenComposeAsync(this::readPage, executor)
.completeOnTimeout("<html></html>", 30, TimeUnit.SECONDS)
.thenApply(this::getImageURLs)
...
CompletableFuture.completedFuture(urlToProcess)
.thenComposeAsync(this::readPage, executor)
.orTimeout(30, TimeUnit.SECONDS)
.thenApply(this::getImageURLs)
...
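The difference between the two timeout methods (both available since Java 9) can be seen with a future that never completes. The class name TimeoutDemo and the 100 millisecond timeout are assumptions chosen to keep the demo short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDemo {
    // completeOnTimeout substitutes a default value when time runs out
    static String withDefault() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // Never completed
        return slow.completeOnTimeout("<html></html>", 100, TimeUnit.MILLISECONDS).join();
    }

    // orTimeout completes the future exceptionally with a TimeoutException
    static boolean timesOut() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // Never completed
        try {
            slow.orTimeout(100, TimeUnit.MILLISECONDS).join();
            return false;
        } catch (CompletionException ex) {
            return ex.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(withDefault());
        System.out.println(timesOut());
    }
}
```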
Use a ProcessBuilder to construct a Process object.
Building a Process with the ProcessBuilder class:
ProcessBuilder builder = new ProcessBuilder("gcc", "myapp.c");
// You can also provide a List<String>
builder = builder.directory(path.toFile());
Map<String, String> env = builder.environment();
env.put("LANG", "fr_FR");
env.remove("JAVA_HOME");
builder.inheritIO()
builder.redirectInput(inputFile)
    .redirectOutput(outputFile)
    .redirectError(errorFile)
builder.redirectErrorStream(true)
Process p = builder.start();
OutputStream processIn = p.getOutputStream();
InputStream processOut = p.getInputStream();
InputStream processErr = p.getErrorStream();
Write the input to processIn, and read the output and errors from processOut and processErr.
Note that processIn is an OutputStream since your Java program writes to it and the process reads what your program wrote.
int result = p.waitFor();
boolean completed = p.waitFor(delay, TimeUnit.SECONDS);
p.destroy(); // SIGTERM in Unix
p.destroyForcibly(); // SIGKILL in Unix
import java.io.*;
import java.util.*;
ProcessBuilder builder = new ProcessBuilder("ls", "-al");
Process p = builder.directory(new File("/home/cay")).start();
InputStream processOut = p.getInputStream();
Scanner in = new Scanner(processOut);
while (in.hasNextLine()) System.out.println(in.nextLine());
p.waitFor();

The onExit method yields a CompletableFuture<Process> (or, when called on a ProcessHandle, a CompletableFuture<ProcessHandle>) that completes when the process is completed:
Process process = new ProcessBuilder("/bin/ls").start();
process.onExit().thenAccept(h -> System.out.println(h + " all done"));
Get a ProcessHandle by calling:
proc.toHandle() from a Process proc
ProcessHandle.of(pid) with an OS PID (yields an Optional<ProcessHandle>)
ProcessHandle.current()
ProcessHandle.allProcesses()
long pid = handle.pid();
Optional<ProcessHandle> parent = handle.parent();
Stream<ProcessHandle> children = handle.children();
Stream<ProcessHandle> descendants = handle.descendants();
ProcessHandle.Info info = handle.info();
ProcessHandle.info yields command/commandLine/arguments/startInstant/totalCpuDuration/user if available.
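As a small sketch of the process handle API, this hypothetical ProcessInfoDemo class describes the running Java virtual machine itself. Since the Info methods return Optional values, a placeholder is shown when the operating system does not supply the information.

```java
public class ProcessInfoDemo {
    // Summarizes a process; the "(unavailable)" placeholder is an assumption of this demo
    static String describe(ProcessHandle handle) {
        ProcessHandle.Info info = handle.info();
        return "pid=" + handle.pid()
            + " command=" + info.command().orElse("(unavailable)")
            + " user=" + info.user().orElse("(unavailable)");
    }

    public static void main(String[] args) {
        System.out.println(describe(ProcessHandle.current()));
    }
}
```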