Copyright © Cay S. Horstmann 2016
The Runnable interface describes a task that you want to run, usually concurrently with others:
public interface Runnable { void run(); }
The run method is executed in a thread.
Runnable task = () -> { ... };
Executor exec = ...;
exec.execute(task);
The Executors class has factory methods for making executors:
Executor exec = Executors.newCachedThreadPool();
    // Good for many short-lived tasks
int processors = Runtime.getRuntime().availableProcessors();
int nthreads = Math.max(1, processors - 2); // At least one thread, even on small machines
Executor exec = Executors.newFixedThreadPool(nthreads);
    // Good for computationally intensive tasks
Runnable hellos = () -> {
    for (int i = 1; i <= 1000; i++) System.out.println("Hello " + i);
};
Runnable goodbyes = () -> {
    for (int i = 1; i <= 1000; i++) System.out.println("Goodbye " + i);
};
Executor executor = Executors.newCachedThreadPool();
executor.execute(hellos);
executor.execute(goodbyes);
Goodbye 1 ... Goodbye 871 Goodbye 872 Hello 806 Hello 807 Goodbye 882 ... Hello 1000
Unlike a Runnable, a Callable yields a result:
public interface Callable<V> { V call() throws Exception; }
Use an ExecutorService to execute a Callable:
ExecutorService exec = Executors.newFixedThreadPool(nthreads);
Callable<V> task = ...;
Future<V> result = exec.submit(task);
The Future interface has these methods:
V get() throws InterruptedException, ExecutionException
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
boolean isDone()
boolean cancel(boolean mayInterruptIfRunning)
boolean isCancelled()
The get method blocks until the result is available.
Set<Path> paths = ...;
List<Callable<Long>> tasks = new ArrayList<>();
for (Path p : paths) tasks.add(
() -> { return some value derived from contents of p
});
List<Future<Long>> results = executor.invokeAll(tasks);
for (Future<Long> result : results) Process result.get();
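The invokeAll pattern above can be sketched as a small runnable program. This is a minimal sketch, not the author's code: the class name InvokeAllDemo, the method totalLength, and the use of string lengths in place of file contents are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {
    // Hypothetical helper: computes the length of each text in its own task
    // and adds up the results.
    static long totalLength(List<String> texts)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (String text : texts) {
                tasks.add(() -> (long) text.length());
            }
            long total = 0;
            // invokeAll blocks until every task has completed
            for (Future<Long> result : executor.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(totalLength(Arrays.asList("alpha", "beta", "gamma")));
    }
}
```

The executor is shut down in a finally clause so that its worker threads do not keep the program alive.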
Use an ExecutorCompletionService, which returns futures in order of completion:
ExecutorCompletionService<T> service = new ExecutorCompletionService<>(executor);
for (Callable<T> task : tasks) service.submit(task);
for (int i = 0; i < tasks.size(); i++) {
    Process service.take().get()
    Do something else
}
To obtain the result of whichever task succeeds first, use the invokeAny method:
List<Callable<Path>> tasks = new ArrayList<>();
for (Path p : files) tasks.add(
    () -> { if (word occurs in p) return p; else throw ... });
Path found = executor.invokeAny(tasks);
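A runnable variant of the search above might look as follows. It is only a sketch: it searches in-memory strings rather than files, and the names InvokeAnyDemo and findAny, as well as the choice of NoSuchElementException for the failing tasks, are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InvokeAnyDemo {
    // Hypothetical helper: returns a text that contains the word.
    // Tasks that do not find the word fail with an exception, so
    // invokeAny skips them and waits for a task that succeeds.
    static String findAny(List<String> texts, String word)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String text : texts) {
                tasks.add(() -> {
                    if (text.contains(word)) return text;
                    throw new NoSuchElementException(word);
                });
            }
            return executor.invokeAny(tasks);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> texts = Arrays.asList("apple pie", "banana split", "cherry tart");
        System.out.println(findAny(texts, "banana"));
    }
}
```

If every task fails, invokeAny throws an ExecutionException.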
private static boolean done = false;
Runnable hellos = () -> {
    for (int i = 1; i <= 1000; i++) System.out.println("Hello " + i);
    done = true;
};
Runnable goodbye = () -> {
    int i = 1;
    while (!done) i++;
    System.out.println("Goodbye " + i);
};
The effect of done = true; in one thread is not guaranteed to be visible to the other thread!
The loop while (!done) i++; can be reordered as
if (!done) while (true) i++;
The value of a final variable is visible after initialization.
The initial value of a static variable is visible after static initialization.
Changes to a volatile variable are visible.
private static volatile boolean done;
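A minimal sketch of the volatile fix, assuming a hypothetical class VolatileFlagDemo: one thread spins on the flag while the calling thread sets it. Because done is volatile, the spinning thread is guaranteed to see the update and terminate.

```java
class VolatileFlagDemo {
    private static volatile boolean done = false;

    // Returns the number of loop iterations the waiting thread made,
    // or -1 if it failed to notice the flag within five seconds.
    static long spinUntilDone() throws InterruptedException {
        done = false;
        long[] spins = new long[1];
        Thread waiter = new Thread(() -> {
            long i = 0;
            while (!done) i++; // re-reads the volatile flag on every pass
            spins[0] = i;
        });
        waiter.setDaemon(true); // never keeps the JVM alive
        waiter.start();
        Thread.sleep(50);
        done = true;            // visible to the waiter because done is volatile
        waiter.join(5000);
        return waiter.isAlive() ? -1 : spins[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Spins before the flag was seen: " + spinUntilDone());
    }
}
```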
The final modifier is your friend. Use it when appropriate for all fields.
private static volatile int count = 0;
...
count++; // Task 1
...
count++; // Task 2
...
The update count++ is not atomic. It is carried out as:
register = count;
Increment register
count = register;
Two tasks can interleave so that one update is lost:
register1 = count;
Increment register1
register2 = count;
Increment register2
count = register2;
count = register1;
Node n = new Node();
if (head == null) head = n;
else tail.next = n;
tail = n;
tail.value = newValue;
Examples of immutable classes: String, java.time.ZonedDateTime
Using a HashSet for aggregating results is dangerous:
results.addAll(newResult); // What if another thread accesses results?
The addAll operation is complex and must be atomic.
results = results.union(newResult);
Declare instance variables final.
Declare the class or its methods final.
Don't let the this reference escape in a constructor.
Don't pass this to another method during construction.
Don't let this be captured in an inner class during construction.
long result = coll.parallelStream()
    .filter(s -> s.startsWith("A")).count();
urls.parallelStream().map(url -> Read contents of url)...
    // Problematic: might starve global fork-join pool.
The Arrays class has several methods that are parallelized for you.
Arrays.parallelSort(words, Comparator.comparing(String::length));
Arrays.parallelSetAll(values, i -> i % 10);
long sum = IntStream.of(values).parallel().sum();
void time(Runnable task) {
    long start = System.nanoTime();
    task.run();
    long end = System.nanoTime();
    System.out.println((end - start) * 1E-9 + " seconds");
}
int SIZE = 100_000_000;
int[] values = new int[SIZE];
Arrays.parallelSetAll(values, i -> (int) (SIZE * Math.sin(i)));
time(() -> Arrays.parallelSort(values));
Arrays.parallelSetAll(values, i -> (int) (SIZE * Math.sin(i)));
time(() -> Arrays.sort(values));
The java.util.concurrent package supplies ConcurrentHashMap and other concurrent data structures.
Iterating over a ConcurrentHashMap never throws a ConcurrentModificationException.
A ConcurrentHashMap won't be damaged by concurrent mutations.
Long oldValue = map.get(word);
Long newValue = oldValue == null ? 1 : oldValue + 1;
map.put(word, newValue); // Error—might not replace oldValue
map.compute(word, (k, v) -> v == null ? 1 : v + 1);
Related methods: computeIfPresent, computeIfAbsent
Use putIfAbsent to atomically initialize a value:
map.putIfAbsent(word, 0L);
The merge method lets you supply an initial value and an update operation:
map.merge(word, 1L, (existingValue, newValue) -> existingValue + newValue);
// Or simply map.merge(word, 1L, Long::sum);
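The merge idiom above can be exercised with several threads updating one map. The following is a sketch under assumptions: the class WordCountDemo, the method count, and the whitespace-based word splitting are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class WordCountDemo {
    // Hypothetical helper: counts words, processing each line in its own task.
    static Map<String, Long> count(List<String> lines) throws InterruptedException {
        ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (String line : lines) {
            executor.execute(() -> {
                for (String word : line.split("\\s+")) {
                    // Atomic: inserts 1 if absent, otherwise adds 1 to the old value
                    if (!word.isEmpty()) map.merge(word, 1L, Long::sum);
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return map;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(count(Arrays.asList("a b a", "b a")));
    }
}
```

Because each merge call is atomic, no update is lost even when two tasks hit the same word at the same time.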
Bulk operations: forEach, reduce, search, replaceAll
ArrayBlockingQueue and LinkedBlockingQueue are threadsafe implementations of bounded queues.
The add/remove methods throw an exception (IllegalStateException/NoSuchElementException) when the queue is full/empty.
The put/take methods block when the queue is full/empty.
The offer/poll methods return false/null when the queue is full/empty.
boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);
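The queue methods above can be tried out in a small producer/consumer sketch. The class BlockingQueueDemo, its methods, and the use of -1 as an end-of-stream marker are assumptions for illustration only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // A producer puts 1..n into a small bounded queue; the caller takes
    // the values and sums them. put and take block when the queue is
    // full or empty, so neither side needs explicit synchronization.
    static int transfer(int n) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) queue.put(i);
                queue.put(-1); // hypothetical end marker
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int x = queue.take(); x != -1; x = queue.take()) sum += x;
        producer.join();
        return sum;
    }

    // offer does not block: it reports failure when the queue is full.
    static boolean offerToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");
        return queue.offer("second");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(100));
        System.out.println(offerToFullQueue());
    }
}
```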
The java.util.concurrent.atomic package provides atomic operations on int, long, and boolean values, object references, and arrays of these values.
public static AtomicLong nextNumber = new AtomicLong();
// In some thread . . .
long id = nextNumber.incrementAndGet();
public static AtomicLong sum = new AtomicLong();
// In some thread . . .
sum.addAndGet(partialSum);
public static AtomicLong largest = new AtomicLong();
// In some thread . . .
largest.set(Math.max(largest.get(), observed)); // Error: race condition!
Use updateAndGet or accumulateAndGet:
largest.updateAndGet(x -> Math.max(x, observed));
largest.accumulateAndGet(observed, Math::max);
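As a sketch of the atomic maximum, the following hypothetical class AtomicMaxDemo lets one thread per observation update a shared AtomicLong. Starting from Long.MIN_VALUE is an assumption that makes negative observations work as well.

```java
import java.util.concurrent.atomic.AtomicLong;

class AtomicMaxDemo {
    // Each thread folds its observation into the shared maximum.
    static long largestOf(long[] observations) throws InterruptedException {
        AtomicLong largest = new AtomicLong(Long.MIN_VALUE);
        Thread[] threads = new Thread[observations.length];
        for (int i = 0; i < threads.length; i++) {
            long observed = observations[i];
            // Read, max, and write happen as one atomic step
            threads[i] = new Thread(() -> largest.accumulateAndGet(observed, Math::max));
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        return largest.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(largestOf(new long[] { 3, 17, 5 }));
    }
}
```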
An AtomicLong can be inefficient under high contention.
The LongAccumulator class solves this problem by splitting a counter or accumulator into multiple variables:
LongAccumulator accumulator = new LongAccumulator(Long::max, 0);
// In some tasks . . .
accumulator.accumulate(value);
// When all work is done
long largest = accumulator.get();
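A possible end-to-end use of the accumulator classes is sketched below. The class AccumulatorDemo and its method are hypothetical. A LongAdder counts events while a LongAccumulator tracks the largest value seen, and both results are read only after all tasks have finished.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

class AccumulatorDemo {
    // Returns { number of values seen, largest value seen }.
    static long[] countAndMax(int tasks, int perTask) throws InterruptedException {
        LongAdder count = new LongAdder();
        LongAccumulator largest = new LongAccumulator(Long::max, Long.MIN_VALUE);
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int t = 0; t < tasks; t++) {
            int base = t * perTask;
            executor.execute(() -> {
                for (int i = 1; i <= perTask; i++) {
                    count.increment();
                    largest.accumulate(base + i);
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        // Read the totals only when all work is done
        return new long[] { count.sum(), largest.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] result = countAndMax(8, 1000);
        System.out.println(result[0] + " values, largest " + result[1]);
    }
}
```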
Use a LongAdder if you just need a counter.
Call lock before entering the critical section and unlock when leaving it:
Lock countLock = new ReentrantLock(); // Shared among multiple threads
int count; // Shared among multiple threads
...
countLock.lock();
try {
    count++; // Critical section
} finally {
    countLock.unlock(); // Make sure the lock is unlocked
}
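The locking idiom can be checked with a small experiment. This is a sketch with a hypothetical class LockedCounterDemo: several threads increment a shared counter, and the lock ensures that no increment is lost.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockedCounterDemo {
    private final Lock countLock = new ReentrantLock();
    private int count;

    void increment() {
        countLock.lock();
        try {
            count++; // Critical section
        } finally {
            countLock.unlock(); // Released even if the body throws
        }
    }

    // Starts the given number of threads, each incrementing perThread times.
    static int run(int threads, int perThread) throws InterruptedException {
        LockedCounterDemo counter = new LockedCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) counter.increment();
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        return counter.count; // Safe to read: all workers have been joined
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 10_000));
    }
}
```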
Instead of a ReentrantLock, you can use intrinsic locks.
The synchronized statement locks and unlocks the intrinsic lock:
synchronized (obj) {
    Critical section
}
Conceptually, this behaves like:
obj.intrinsicLock.lock();
try {
    Critical section
} finally {
    obj.intrinsicLock.unlock();
}
A synchronized method locks the body on this:
public synchronized void method() {
    body
}
This is the same as:
public void method() {
    synchronized (this) {
        body
    }
}
Example: a Counter class:
public class Counter {
    private int value;
    public synchronized int increment() {
        value++;
        return value;
    }
}
synchronized (table) { for (K key : table.keySet()) ... }
synchronized ("LOCK") { for (K key : table.keySet()) ... }
public synchronized Object take() { // From a blocking queue
    if (head == null) ...; // Can't proceed
    Node n = head;
    head = n.next;
    return n.value;
}
The wait method makes a thread relinquish the lock and enter a “waiting” state:
public synchronized Object take() { // From a blocking queue
    while (head == null) wait();
    ...
}
Call notifyAll to allow waiting threads to continue:
public synchronized void add(Object newValue) {
    ...
    notifyAll();
}
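The wait/notifyAll fragments above can be completed into a tiny queue. This is a sketch, not the slide's implementation: it stores elements in an ArrayDeque instead of hand-written linked nodes, and the names SimpleQueue and WaitNotifyDemo are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class SimpleQueue<E> {
    private final Deque<E> items = new ArrayDeque<>();

    public synchronized void add(E newValue) {
        items.addLast(newValue);
        notifyAll(); // Wake up threads waiting in take
    }

    public synchronized E take() throws InterruptedException {
        // Loop, not if: the condition must be rechecked after waking up
        while (items.isEmpty()) wait();
        return items.removeFirst();
    }
}

class WaitNotifyDemo {
    // The caller blocks in take until the producer thread adds a value.
    static String demo() throws InterruptedException {
        SimpleQueue<String> queue = new SimpleQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            queue.add("hello");
        });
        producer.start();
        String result = queue.take();
        producer.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```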
Runnable task = () -> { ... };
Thread thread = new Thread(task);
thread.start();
Thread.sleep(milliseconds);
A thread ends when its run method ends (normally or due to an exception).
thread.join(milliseconds);
thread.interrupt();
while (more work to do) {
    if (Thread.currentThread().isInterrupted()) return;
    Do more work
}
If a thread is interrupted while it waits or sleeps, an InterruptedException is thrown.
try {
    while (more work to do) {
        Do more work
        Thread.sleep(millis);
    }
} catch (InterruptedException ex) {
    // Do nothing
}
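Interrupting a sleeping worker can be sketched as follows. The class InterruptDemo and the AtomicBoolean used to report the outcome are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    // Starts a worker that sleeps in a loop, interrupts it, and reports
    // whether the worker left the loop through an InterruptedException.
    static boolean interruptSleeper() throws InterruptedException {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(10); // Throws if the thread is or gets interrupted
                }
            } catch (InterruptedException ex) {
                sawInterrupt.set(true); // The loop ends here
            }
        });
        worker.start();
        worker.interrupt();
        worker.join();
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptSleeper());
    }
}
```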
There is no need to call isInterrupted.
sleep throws an InterruptedException when it is called from an interrupted thread.
NumberFormat is not threadsafe and should not be globally shared.
Use a separate NumberFormat instance for each thread.
public static final ThreadLocal<NumberFormat> currencyFormat = ThreadLocal.withInitial(() -> NumberFormat.getCurrencyInstance());
String amountDue = currencyFormat.get().format(total);
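To see that each thread really gets its own formatter, one could run a check like the following. It is a sketch: the class ThreadLocalDemo is hypothetical, and Locale.US is fixed only so that the example does not depend on the default locale.

```java
import java.text.NumberFormat;
import java.util.Locale;

class ThreadLocalDemo {
    static final ThreadLocal<NumberFormat> currencyFormat =
        ThreadLocal.withInitial(() -> NumberFormat.getCurrencyInstance(Locale.US));

    // True if repeated calls in one thread yield the same instance
    // while another thread receives a different one.
    static boolean distinctPerThread() throws InterruptedException {
        NumberFormat mine = currencyFormat.get();
        NumberFormat[] other = new NumberFormat[1];
        Thread t = new Thread(() -> other[0] = currencyFormat.get());
        t.start();
        t.join();
        return mine == currencyFormat.get() && mine != other[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(currencyFormat.get().format(12.5));
        System.out.println(distinctPerThread());
    }
}
```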
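The first time you call get in a given thread, the lambda in withInitial is called.
By default, an uncaught exception in a thread prints a stack trace to System.err.
To make a daemon thread, call thread.setDaemon(true) before starting the thread.
JButton read = new JButton("Read");
read.addActionListener(event -> Read document from url);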
read.addActionListener(event -> Start new task that reads document from url);
EventQueue.invokeLater(() -> Update UI for progress or completion display);
Swing and Android provide SwingWorker/AsyncTask to manage event flow.
The CompletableFuture class provides an alternative.
A CompletableFuture<T> is a Future<T> which will deliver a value at some point in the future.
CompletableFuture<T> implements the CompletionStage<T> interface, which allows completable futures to be composed.
Example methods, some returning a CompletableFuture:
public CompletableFuture<String> readPage(URL url)
public List<URL> getImageURLs(String webpage) // Not time-consuming
public CompletableFuture<List<BufferedImage>> getImages(List<URL> urls)
public void saveImages(List<BufferedImage> images)
CompletableFuture.completedFuture(urlToProcess)
    .thenComposeAsync(this::readPage, executor)
    .thenApply(this::getImageURLs)
    .thenCompose(this::getImages)
    .thenAccept(this::saveImages);
When an action fails, dependent stages complete with a CompletionException that wraps the original exception.
The exceptionally method allows you to substitute a value for an exception:
CompletableFuture.completedFuture(urlToProcess)
    .thenComposeAsync(this::readPage, executor)
    .exceptionally(ex -> "<html></html>")
    .thenApply(this::getImageURLs)
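A self-contained sketch of the exceptionally step follows. The class ExceptionallyDemo and the simulated failure are assumptions, and no real page is read.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallyDemo {
    // Simulates a page read that may fail; on failure an empty page
    // is substituted and the rest of the pipeline proceeds normally.
    static String fetchOrDefault(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
                if (fail) throw new IllegalStateException("simulated read failure");
                return "<html>ok</html>";
            })
            .exceptionally(ex -> "<html></html>")
            .thenApply(String::trim)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(fetchOrDefault(false));
        System.out.println(fetchOrDefault(true));
    }
}
```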
Alternatively, call handle with a BiFunction<T, Throwable, U> that receives either the result or the exception (and null for the other).
CompletableFuture<T> future1 = ...;
CompletableFuture<U> future2 = ...;
CompletableFuture<V> combined = future1.thenCombine(future2, combiner);
Here, combiner is a function taking arguments of type T and U, and producing a result of type V.
The CompletableFuture.allOf method takes an array or varargs sequence of completable futures and yields a future that completes when all of them have completed, but it doesn't combine the results.
To use whichever result arrives first, call the applyToEither method:
CompletableFuture<T> future1 = ...;
CompletableFuture<T> future2 = ...;
CompletableFuture<U> combined = future1.applyToEither(future2, f); // f maps T to U
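Both combinators can be tried with trivially completed futures. In this sketch the class CombineDemo and its methods are hypothetical. The second future in either is deliberately never completed, so the result must come from the first.

```java
import java.util.concurrent.CompletableFuture;

class CombineDemo {
    // thenCombine waits for both futures and merges their results.
    static String combine() {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 42);
        CompletableFuture<String> combined =
            future1.thenCombine(future2, (s, n) -> s + " " + n);
        return combined.join();
    }

    // applyToEither uses whichever future completes first.
    static String either() {
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        return fast.applyToEither(never, String::toUpperCase).join();
    }

    public static void main(String[] args) {
        System.out.println(combine());
        System.out.println(either());
    }
}
```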
CompletableFuture.completedFuture(urlToProcess)
    .thenComposeAsync(this::readPage, executor)
    .completeOnTimeout("<html></html>", 30, TimeUnit.SECONDS)
    .thenApply(this::getImageURLs) ...
CompletableFuture.completedFuture(urlToProcess)
    .thenComposeAsync(this::readPage, executor)
    .orTimeout(30, TimeUnit.SECONDS)
    .thenApply(this::getImageURLs) ...
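The difference between the two timeout methods (both available since Java 9) can be observed with a future that never completes. This is a sketch with a hypothetical class TimeoutDemo and a short 100 ms timeout chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutDemo {
    // completeOnTimeout substitutes a value when time runs out.
    static String withDefault() {
        CompletableFuture<String> never = new CompletableFuture<>();
        return never.completeOnTimeout("<html></html>", 100, TimeUnit.MILLISECONDS).join();
    }

    // orTimeout fails the future with a TimeoutException instead;
    // join reports it wrapped in a CompletionException.
    static boolean timesOut() {
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            never.orTimeout(100, TimeUnit.MILLISECONDS).join();
            return false;
        } catch (CompletionException ex) {
            return ex.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(withDefault());
        System.out.println(timesOut());
    }
}
```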
Use the ProcessBuilder class to construct a Process object.
ProcessBuilder builder = new ProcessBuilder("gcc", "myapp.c");
// You can also provide a List<String>
builder = builder.directory(path.toFile());
Map<String, String> env = builder.environment(); env.put("LANG", "fr_FR"); env.remove("JAVA_HOME");
builder.inheritIO()
builder.redirectInput(inputFile) .redirectOutput(outputFile) .redirectError(errorFile)
builder.redirectErrorStream(true)
Process p = builder.start();
OutputStream processIn = p.getOutputStream(); InputStream processOut = p.getInputStream(); InputStream processErr = p.getErrorStream();
Write the input to processIn, and read the output and errors from processOut and processErr.
processIn is an OutputStream since your Java program writes to it and the process reads what your program wrote.
int result = p.waitFor();
boolean finished = p.waitFor(delay, TimeUnit.SECONDS); // false if the process is still running
p.destroy(); // SIGTERM in Unix
p.destroyForcibly(); // SIGKILL in Unix
import java.io.*;
import java.util.*;
ProcessBuilder builder = new ProcessBuilder("ls", "-al");
Process p = builder.directory(new File("/home/cay")).start();
InputStream processOut = p.getInputStream();
Scanner in = new Scanner(processOut);
while (in.hasNextLine()) System.out.println(in.nextLine());
p.waitFor();
The onExit method of Process yields a CompletableFuture<Process> that completes when the process is completed (ProcessHandle.onExit yields a CompletableFuture<ProcessHandle>):
Process process = new ProcessBuilder("/bin/ls").start();
process.onExit().thenAccept(h -> System.out.println(h + " all done"));
Get a ProcessHandle by calling:
process.toHandle() from a Process process
ProcessHandle.of(pid) with an OS PID
ProcessHandle.current()
ProcessHandle.allProcesses()
long pid = handle.pid();
Optional<ProcessHandle> parent = handle.parent();
Stream<ProcessHandle> children = handle.children();
Stream<ProcessHandle> descendants = handle.descendants();
ProcessHandle.Info info = handle.info();
ProcessHandle.info yields command/commandLine/arguments/startInstant/totalCpuDuration/user if available.
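The handle API can be explored on the running JVM itself, which avoids launching a platform-specific command. This is a sketch that requires Java 9 or later. The class ProcessInfoDemo is hypothetical, and every Info value is treated as optional since the operating system may not reveal it.

```java
class ProcessInfoDemo {
    // Describes the current process using whatever the OS makes available.
    static String describeCurrent() {
        ProcessHandle handle = ProcessHandle.current();
        ProcessHandle.Info info = handle.info();
        return "pid=" + handle.pid()
            + " command=" + info.command().orElse("(unavailable)")
            + " user=" + info.user().orElse("(unavailable)")
            + " hasParent=" + handle.parent().isPresent();
    }

    public static void main(String[] args) {
        System.out.println(describeCurrent());
    }
}
```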