Chapter 13 Concurrency Flashcards

1
Q

Introducing Threads

A
  • A thread is the smallest unit of execution that can be scheduled by the operating system.
  • A process is a group of associated threads that execute in the same shared environment.
  • It follows, then, that a single-threaded process is one that contains exactly one thread, whereas a multithreaded process supports more than one thread.
  • By shared environment, we mean that the threads in the same process share the same memory space and can communicate directly with one another.
  • A task is a single unit of work performed by a thread.
  • A thread can complete multiple independent tasks but only one task at a time.
  • By shared memory, we are generally referring to static variables as well as instance and local variables passed to a thread.
2
Q

thread

A

A thread is the smallest unit of execution that can be scheduled by the operating system.

3
Q

process

A

A process is a group of associated threads that execute in the same shared environment.

4
Q

task

A

A task is a single unit of work performed by a thread.

5
Q

shared environment

A

By shared environment, we mean that the threads in the same process share the same memory space and can communicate directly with one another.

6
Q

shared memory

A

By shared memory, we are generally referring to static variables as well as instance and local variables passed to a thread.

7
Q

Understanding Thread Concurrency

A
  • The property of executing multiple threads and processes at the same time is referred to as concurrency.
  • Operating systems use a thread scheduler to determine which threads should be currently executing.
  • A context switch is the process of storing a thread’s current state and later restoring the state of the thread to continue execution.
  • Finally, a thread can interrupt or supersede another thread if it has a higher thread priority than the other thread. A thread priority is a numeric value associated with a thread that is taken into consideration by the thread scheduler when determining which threads should currently be executing. In Java, thread priorities are specified as integer values.
8
Q

How does the system decide what to execute when there are more threads available than CPUs?

A

Operating systems use a thread scheduler to determine which threads should be currently executing, as shown in Figure 13.1. For example, a thread scheduler may employ a round-robin schedule in which each available thread receives an equal number of CPU cycles with which to execute, with threads visited in a circular order.

9
Q

context switch

A

A context switch is the process of storing a thread’s current state and later restoring the state of the thread to continue execution.

10
Q

thread priority

A

A thread priority is a numeric value associated with a thread that is taken into consideration by the thread scheduler when determining which threads should currently be executing. In Java, thread priorities are specified as integer values.
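
A minimal sketch of reading and changing a thread priority. The class PriorityDemo is hypothetical; Thread.MIN_PRIORITY, NORM_PRIORITY, MAX_PRIORITY, getPriority(), and setPriority() are standard Thread members. A new thread starts with its creator's priority, and the value remains only a hint to the scheduler.

```java
// Minimal sketch: thread priorities are plain int values stored on a Thread.
// They are hints to the scheduler and do not guarantee any execution order.
class PriorityDemo {
    // Returns {priority at creation, priority after setPriority(MAX_PRIORITY)}
    static int[] priorities() {
        Thread worker = new Thread(() -> System.out.println("working"));
        int inherited = worker.getPriority();     // copied from the creating thread
        worker.setPriority(Thread.MAX_PRIORITY);  // 10; values must be between 1 and 10
        return new int[] { inherited, worker.getPriority() };
    }

    public static void main(String[] args) {
        // Named constants: MIN_PRIORITY = 1, NORM_PRIORITY = 5, MAX_PRIORITY = 10
        System.out.println(Thread.MIN_PRIORITY + " " + Thread.NORM_PRIORITY + " " + Thread.MAX_PRIORITY);
        int[] p = priorities();
        System.out.println("inherited=" + p[0] + ", after setPriority=" + p[1]);
    }
}
```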

11
Q

Creating a Thread

A

One of the most common ways to define a task for a thread is by using a Runnable instance. Runnable is a functional interface that takes no arguments and returns no data.

@FunctionalInterface public interface Runnable {
void run();
}

With this, it’s easy to create and start a thread.

new Thread(() -> System.out.print("Hello")).start();
System.out.print("World");

Remember that the order of thread execution is generally not guaranteed: this example may print HelloWorld or WorldHello. The exam commonly presents questions in which multiple tasks are started at the same time, and you must determine the result.

12
Q

Calling run() Instead of start()

System.out.println("begin");
new Thread(printInventory).run();
new Thread(printRecords).run();
new Thread(printInventory).run();
System.out.println("end");
A

Calling run() on a Thread or a Runnable does not start a new thread.
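
A runnable sketch of the card above. The card does not define printInventory and printRecords, so here they are hypothetical lambdas that log the name of the thread running them. Because run() is an ordinary method call, every task runs on the calling thread and the order is fixed.

```java
import java.util.ArrayList;
import java.util.List;

class RunVersusStart {
    static List<String> runDirectly() {
        List<String> log = new ArrayList<>(); // safe here: only one thread ever touches it
        // Hypothetical stand-ins for the card's printInventory and printRecords
        Runnable printInventory = () -> log.add("inventory on " + Thread.currentThread().getName());
        Runnable printRecords = () -> log.add("records on " + Thread.currentThread().getName());
        log.add("begin");
        new Thread(printInventory).run(); // plain method call: no new thread is started
        new Thread(printRecords).run();
        new Thread(printInventory).run();
        log.add("end");
        return log;
    }

    public static void main(String[] args) {
        // Always: begin, three tasks on the calling thread, then end
        runDirectly().forEach(System.out::println);
    }
}
```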

13
Q

What are the two ways to create a Thread and its associated task in Java?

A

More generally, we can create a Thread and its associated task one of two ways in Java:

  • Provide a Runnable object or lambda expression to the Thread constructor.
  • Create a class that extends Thread and overrides the run() method.

Creating a class that extends Thread is relatively uncommon and should only be done under certain circumstances, such as if you need to override other thread methods.
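
A minimal sketch of both approaches. The class names and task strings are hypothetical; join() is used only so the caller can read the results after both threads finish.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class TwoWaysToCreateThreads {
    // Option 2: subclass Thread and override run()
    static class FeedAnimalsThread extends Thread {
        private final List<String> results;

        FeedAnimalsThread(List<String> results) {
            this.results = results;
        }

        @Override
        public void run() {
            results.add("fed animals");
        }
    }

    static List<String> runBoth() throws InterruptedException {
        List<String> results = new CopyOnWriteArrayList<>(); // thread-safe list
        // Option 1 (preferred): pass a Runnable lambda to the Thread constructor
        Thread fromRunnable = new Thread(() -> results.add("cleaned pens"));
        Thread fromSubclass = new FeedAnimalsThread(results);
        fromRunnable.start();
        fromSubclass.start();
        fromRunnable.join(); // wait for both threads before reading results
        fromSubclass.join();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBoth()); // order of the two entries may vary
    }
}
```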

14
Q

Distinguishing Thread Types

A
  • A system thread is created by the Java Virtual Machine (JVM) and runs in the background of the application.
    • For example, garbage collection is managed by a system thread created by the JVM.
  • Alternatively, a user-defined thread is one created by the application developer to accomplish a specific task.
  • System and user-defined threads can both be created as daemon threads.
    • A daemon thread is one that will not prevent the JVM from exiting when the program finishes.
    • A Java application terminates when the only threads that are running are daemon threads.
    • For example, if garbage collection is the only thread left running, the JVM will automatically shut down.

By default, user-defined threads are not daemons, and the program will wait for them to finish.

To make a thread a daemon, call setDaemon(true) before calling start():

job.setDaemon(true);
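
A minimal sketch of a daemon thread, where the DaemonDemo class and its sleeping background job are hypothetical. Calling setDaemon() on a thread that is already alive throws IllegalThreadStateException, which is why the call must come before start().

```java
class DaemonDemo {
    static Thread createDaemon() {
        Thread job = new Thread(() -> {
            try {
                Thread.sleep(10_000); // hypothetical long-running background work
            } catch (InterruptedException e) {
                // interrupted: simply let the thread end
            }
        });
        job.setDaemon(true); // must precede start()
        job.start();
        return job;
    }

    public static void main(String[] args) {
        Thread job = createDaemon();
        System.out.println("Daemon? " + job.isDaemon());
        // main() ends here; the JVM exits without waiting for the daemon to finish
    }
}
```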
15
Q

Managing a Thread’s Life Cycle

A

You can query a thread’s state by calling getState() on the thread object.

  1. Every thread is initialized with a NEW state.
  2. As soon as start() is called, the thread is moved to a RUNNABLE state.
    • Does that mean it is actually running?
      Not exactly: it may be running, or it may not be.
    • The RUNNABLE state just means the thread is able to be run.
  3. Once the work for the thread is completed (run() completes) or an uncaught exception is thrown, the thread state becomes TERMINATED, and no more work is performed.
  4. While in a RUNNABLE state, the thread may transition to one of three states where it pauses its work:
    • BLOCKED,
    • WAITING,
    • or TIMED_WAITING.

This figure includes common transitions between thread states, but there are other possibilities. For example, a thread in a WAITING state might be triggered by notifyAll().

Likewise, a thread that is interrupted by another thread will exit TIMED_WAITING and go straight back into RUNNABLE.
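
A small sketch that observes these states with getState(), assuming a hypothetical worker that only sleeps for 200 ms. The state read right after start() depends on timing (typically RUNNABLE or TIMED_WAITING), so only NEW and TERMINATED are predictable.

```java
class ThreadStates {
    // Returns {state before start(), state just after start(), state after join()}
    static Thread.State[] observe() throws InterruptedException {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200); // worker is TIMED_WAITING while sleeping
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread.State beforeStart = worker.getState(); // NEW
        worker.start();
        Thread.State afterStart = worker.getState();  // RUNNABLE or TIMED_WAITING, timing-dependent
        worker.join();                                // wait for run() to complete
        Thread.State afterJoin = worker.getState();   // TERMINATED
        return new Thread.State[] { beforeStart, afterStart, afterJoin };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(observe()));
    }
}
```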

16
Q

Managing a Thread’s Life Cycle

A

You can query a thread’s state by calling getState() on the thread object.

  1. Every thread is initialized with a NEW state.
  2. As soon as start() is called, the thread is moved to a RUNNABLE state. It may be running, or it may not be. The RUNNABLE state just means the thread is able to be run.
  3. Once the work for the thread is completed (run() completes) or an uncaught exception is thrown, the thread state becomes TERMINATED, and no more work is performed.
  4. While in a RUNNABLE state, the thread may transition to one of three states where it pauses its work:
    • BLOCKED, (Waiting to enter synchronized block)
    • WAITING, (Waiting indefinitely to be notified)
    • or TIMED_WAITING. (Waiting a specified time)

A thread that is interrupted by another thread will exit TIMED_WAITING and go straight back into RUNNABLE.

FIGURE 13.2 Thread states

17
Q

Managing a Thread’s Life Cycle

A
  • NEW (thread created but not started)
  • RUNNABLE (running or able to be run)
    • BLOCKED (waiting to enter synchronized block)
    • WAITING (waiting indefinitely to be notified)
    • TIMED_WAITING (waiting a specified time)
  • TERMINATED (task complete)

create thread -> NEW - start() -> RUNNABLE - run() completes -> TERMINATED

RUNNABLE -resource requested -> BLOCKED
RUNNABLE <-resource granted- BLOCKED

RUNNABLE -wait() -> WAITING
RUNNABLE <-notify() - WAITING

RUNNABLE -sleep() -> TIMED_WAITING
RUNNABLE <-time elapsed - TIMED_WAITING

18
Q

Polling with Sleep

A

Even though multithreaded programming allows you to execute multiple tasks at the same time, one thread often needs to wait for the results of another thread to proceed. One solution is to use polling. Polling is the process of intermittently checking data at some fixed interval.

Rather than checking the counter in a tight loop, we can use the Thread.sleep() method to implement polling, sleeping for 1,000 milliseconds (1 second) between checks:

public class CheckResultsWithSleep {
    private static int counter = 0;
    public static void main(String[] a) {
        new Thread(() -> {
            for(int i = 0; i < 1_000_000; i++) counter++;
        }).start();
        while(counter < 1_000_000) {
            System.out.println("Not reached yet");
            try {
                Thread.sleep(1_000); // 1 SECOND
            } catch (InterruptedException e) {
                System.out.println("Interrupted!");
            }
        }
        System.out.println("Reached: "+counter);
} }
19
Q

Polling

A

Polling is the process of intermittently checking data at some fixed interval.

20
Q

Interrupting a Thread

A

One way to improve this program is to allow the thread to interrupt the main() thread when it’s done:

public class CheckResultsWithSleepAndInterrupt {
    private static int counter = 0;
    public static void main(String[] a) {
        final var mainThread = Thread.currentThread();
        new Thread(() -> {
            for(int i = 0; i < 1_000_000; i++) counter++;
            mainThread.interrupt();
        }).start();
        while(counter < 1_000_000) {
            System.out.println("Not reached yet");
            try {
                Thread.sleep(1_000); // 1 SECOND
            } catch (InterruptedException e) {
                System.out.println("Interrupted!");
            }
        }
        System.out.println("Reached: "+counter);
} }

final var mainThread = Thread.currentThread();

mainThread.interrupt();

Calling interrupt() on a thread in the TIMED_WAITING or WAITING state causes that thread (here, the main() thread) to become RUNNABLE again, triggering an InterruptedException. The thread may also move to a BLOCKED state if it needs to reacquire resources when it wakes up.

21
Q

> [!NOTE]
Calling interrupt() on a thread already in a RUNNABLE state doesn’t change the state.
In fact, it only changes the behavior if the thread is periodically checking its interrupted status with isInterrupted().

A

interrupt()
isInterrupted() (for example, Thread.currentThread().isInterrupted())
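
A sketch of the cooperative pattern the note describes, assuming a hypothetical worker that counts loop iterations. The worker is RUNNABLE when interrupt() is called, so no exception is thrown; it stops only because it checks isInterrupted() on each pass.

```java
class CooperativeInterrupt {
    // Returns loop iterations before the interrupt was noticed, or -1 on timeout
    static int countUntilInterrupted() throws InterruptedException {
        int[] iterations = {0}; // only written by the worker; read after join()
        Thread worker = new Thread(() -> {
            // Keep working until another thread sets this thread's interrupted status
            while (!Thread.currentThread().isInterrupted()) {
                iterations[0]++;
            }
        });
        worker.start();
        Thread.sleep(50);    // let the worker run for a while
        worker.interrupt();  // worker is RUNNABLE: only the status flag changes
        worker.join(5_000);  // after a successful join, the worker's writes are visible
        return worker.isAlive() ? -1 : iterations[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Iterations before interrupt: " + countUntilInterrupted());
    }
}
```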

22
Q

Creating Threads with the Concurrency API

A
  • java.util.concurrent
  • The Concurrency API includes the ExecutorService interface, which defines services that create and manage threads.
    • You first obtain an instance of an ExecutorService interface,
    • and then you send the service tasks to be processed.
    • The framework includes numerous useful features, such as thread pooling and scheduling.
  • It is recommended that you use this framework any time you need to create and execute a separate task, even if you need only a single thread.
23
Q

Introducing the Single-Thread Executor

ExecutorService service = Executors.newSingleThreadExecutor();
try {
    System.out.println("begin");
    service.execute(printInventory);
    service.execute(printRecords);
    service.execute(printInventory);
    System.out.println("end");
} finally {
    service.shutdown();
}
A

Possible output:

begin
Printing zoo inventory
Printing record: 0
Printing record: 1
end
Printing record: 2
Printing zoo inventory
  • The Concurrency API includes the Executors factory class that can be used to create instances of the ExecutorService interface.
  • We use the newSingleThreadExecutor() method to create the service.
  • With a single-thread executor, tasks are guaranteed to be executed sequentially.
  • Notice that the end text is output while our thread executor tasks are still running.
  • This is because the main() method runs on its own thread, independent of the thread used by the ExecutorService.
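
A self-contained version of the example, assuming hypothetical printInventory and printRecords tasks that append to a shared list instead of printing. The tasks always run in submission order, while the position of "end" relative to them can vary. awaitTermination() is used only so the caller sees the complete log.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadExecutorDemo {
    static List<String> run() throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>(); // written by main and the executor thread
        Runnable printInventory = () -> log.add("Printing zoo inventory");
        Runnable printRecords = () -> {
            for (int i = 0; i < 3; i++) log.add("Printing record: " + i);
        };
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            log.add("begin");
            service.execute(printInventory);
            service.execute(printRecords);
            service.execute(printInventory);
            log.add("end"); // may land before, between, or after the task entries
        } finally {
            service.shutdown();
        }
        service.awaitTermination(1, TimeUnit.MINUTES); // so the caller sees every entry
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```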
24
Q

Shutting Down a Thread Executor

A
  • Once you have finished using a thread executor, it is important that you call the shutdown() method.
  • A thread executor creates a non-daemon thread on the first task that is executed, so failing to call shutdown() will result in your application never terminating.
  • The shutdown process for a thread executor involves
    • first rejecting any new tasks submitted to the thread executor while continuing to execute any previously submitted tasks.
    • During this time, calling isShutdown() will return true, while isTerminated() will return false.
    • If a new task is submitted to the thread executor while it is shutting down, a RejectedExecutionException will be thrown.
    • Once all active tasks have been completed, isShutdown() and isTerminated() will both return true. Figure 13.3 shows the life cycle of an ExecutorService object.

For the exam, you should be aware that shutdown() does not stop any tasks that have already been submitted to the thread executor.
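
A sketch of the shutdown life cycle described above, assuming a hypothetical task that blocks on a CountDownLatch so it is still pending while the executor's state is inspected.

```java
import java.util.concurrent.*;

class ShutdownLifecycle {
    static String run() throws InterruptedException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        StringBuilder report = new StringBuilder();
        // A previously submitted task that stays busy until released
        service.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        service.shutdown();
        // Shutting down: the earlier task has not finished, so not yet terminated
        report.append(service.isShutdown()).append(' ').append(service.isTerminated()).append('\n');
        try {
            service.execute(() -> System.out.println("too late"));
        } catch (RejectedExecutionException e) {
            report.append("rejected\n");
        }
        release.countDown(); // let the earlier task complete
        service.awaitTermination(10, TimeUnit.SECONDS);
        report.append(service.isShutdown()).append(' ').append(service.isTerminated());
        return report.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```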

25
Q

What if you want to cancel all running and upcoming tasks?

A

The ExecutorService provides a method called shutdownNow(), which attempts to stop all running tasks and discards any that have not been started yet. It is not guaranteed to succeed because it is possible to create a thread that will never terminate, so any attempt to interrupt it may be ignored.
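
A sketch of shutdownNow(), assuming one hypothetical long-running task that responds to interruption plus two queued tasks. shutdownNow() returns the tasks that were submitted but never started, as a List<Runnable>.

```java
import java.util.List;
import java.util.concurrent.*;

class ShutdownNowDemo {
    // Returns {number of never-started tasks, 1 if the executor terminated else 0}
    static int[] run() throws InterruptedException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        service.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // long task that responds to interruption
            } catch (InterruptedException e) {
                System.out.println("Long task interrupted");
            }
        });
        service.execute(() -> System.out.println("never runs"));
        service.execute(() -> System.out.println("never runs either"));
        started.await(); // make sure the long task is running before shutting down
        List<Runnable> neverStarted = service.shutdownNow(); // interrupts it, drains the queue
        boolean terminated = service.awaitTermination(10, TimeUnit.SECONDS);
        return new int[] { neverStarted.size(), terminated ? 1 : 0 };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```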

26
Q

Submitting Tasks

A

You can submit tasks to an ExecutorService instance in multiple ways.
1. The first method we presented, execute(), is inherited from the Executor interface, which the ExecutorService interface extends. The execute() method takes a Runnable instance and completes the task asynchronously. Because the return type of the method is void, it does not tell us anything about the result of the task. It is considered a “fire-and-forget” method, as once it is submitted, the results are not directly available to the calling thread.
2. Fortunately, the writers of Java added submit() methods to the ExecutorService interface, which, like execute(), can be used to complete tasks asynchronously. Unlike execute(), though, submit() returns a Future instance that can be used to determine whether the task is complete. It can also be used to return a generic result object after the task has been completed.

  1. void execute(Runnable command) Executes Runnable task at some point in future.
  2. Future<?> submit(Runnable task)
    Executes Runnable task at some point in future and returns Future representing task.
  3. <T> Future<T> submit(Callable<T> task)
    Executes Callable task at some point in future and returns Future representing pending results of task.
  4. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    Executes given tasks and waits for all tasks to complete. Returns List of Future instances in same order in which they were in original collection.
  5. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    Executes given tasks and waits for at least one to complete. Returns result of one successfully completed task, cancelling any unfinished tasks.
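
A minimal sketch of invokeAll() and invokeAny() with trivial, hypothetical Callable tasks. invokeAll() returns futures in the same order as the input list; invokeAny() returns the result of whichever task completes successfully, so any of the three values is possible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class InvokeAllAnyDemo {
    static List<Integer> allResults() throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = List.of(() -> 1, () -> 2, () -> 3);
            List<Integer> results = new ArrayList<>();
            // Blocks until every task is done; futures come back in input order
            for (Future<Integer> future : service.invokeAll(tasks)) {
                results.add(future.get()); // already complete, so get() does not wait
            }
            return results;
        } finally {
            service.shutdown();
        }
    }

    static String anyResult() throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> tasks = List.of(() -> "a", () -> "b", () -> "c");
            // Blocks until one task succeeds; unfinished tasks are cancelled
            return service.invokeAny(tasks);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(allResults()); // [1, 2, 3]
        System.out.println(anyResult());  // "a", "b", or "c"
    }
}
```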
27
Q

Submitting Tasks: execute() vs. submit()

A

The submit() methods return a Future that can be used to track the result, whereas execute() returns void.

void execute(Runnable command)
Executes the given command at some time in the future.

Future<?> submit(Runnable task)
Submits a Runnable task for execution and returns a Future representing that task.
The Future’s get method will return null upon successful completion.

<T> Future<T> submit(Runnable task, T result)
Submits a Runnable task for execution and returns a Future representing that task. The Future’s get method will return the given result upon successful completion.
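
A sketch contrasting the three methods, with hypothetical print tasks. The Future from submit(Runnable) yields null, while submit(Runnable, T) yields the result value passed in.

```java
import java.util.concurrent.*;

class ExecuteVersusSubmit {
    // Returns {result of submit(Runnable), result of submit(Runnable, T)}
    static Object[] run() throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            service.execute(() -> System.out.println("fire-and-forget")); // void: nothing to track
            Future<?> plain = service.submit(() -> System.out.println("tracked"));
            Future<String> withResult =
                    service.submit(() -> System.out.println("tracked, with result"), "done");
            return new Object[] { plain.get(), withResult.get() }; // null and "done"
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```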

28
Q

How do we know when a task submitted to an ExecutorService is complete?

A

submit() returns a Future<V> instance that can be used to determine this result.

Future<?> future = service.submit(() -> System.out.println("Hello"));

The Future type is actually an interface.

  1. boolean isDone()
    Returns true if task was completed, threw exception, or was cancelled.
29
Q

Future methods

A
  1. boolean isDone()
    Returns true if task was completed, threw exception, or was cancelled.
  2. boolean isCancelled()
    Returns true if task was cancelled before it completed normally.
  3. boolean cancel(boolean mayInterruptIfRunning)
    Attempts to cancel execution of task and returns true if it was successfully cancelled or false if it could not be cancelled or is complete.
  4. V get()
    Retrieves result of task, waiting endlessly if it is not yet available.
  5. V get(long timeout, TimeUnit unit)
    Retrieves result of task, waiting specified amount of time. If result is not ready by time timeout is reached, checked TimeoutException will be thrown.
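
A sketch exercising these Future methods, assuming a hypothetical Callable that sleeps for a minute. The timed get() gives up after 100 ms, cancel(true) interrupts the running task, and a later get() throws the unchecked CancellationException.

```java
import java.util.concurrent.*;

class FutureMethodsDemo {
    static String run() throws Exception {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            // Hypothetical slow Callable; call() may throw the checked InterruptedException
            Future<String> slow = service.submit(() -> {
                Thread.sleep(60_000);
                return "finished";
            });
            String status;
            try {
                slow.get(100, TimeUnit.MILLISECONDS); // waits at most 100 ms
                status = "done early";
            } catch (TimeoutException e) {
                status = "timed out";
            }
            // true = interrupt the worker thread if the task is already running
            boolean cancelled = slow.cancel(true);
            status += " cancelled=" + cancelled
                    + " isCancelled=" + slow.isCancelled()
                    + " isDone=" + slow.isDone();
            try {
                slow.get();
            } catch (CancellationException e) {
                status += " get()->CancellationException";
            }
            return status;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```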
30
Q
import java.util.concurrent.*;
public class CheckResults {
    private static int counter = 0;
    public static void main(String[] unused) throws Exception {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<?> result = service.submit(() -> {
                for(int i = 0; i < 1_000_000; i++) counter++;
            });
            result.get(10, TimeUnit.SECONDS); // Returns null for Runnable
            System.out.println("Reached!");
        } catch (TimeoutException e) {
            System.out.println("Not reached in time");
        } finally {
            service.shutdown();
} } }
A
  • The Executors class provides factory methods for the executor services provided in this package.
  • ExecutorService service = Executors.newSingleThreadExecutor();
    Creates an Executor that uses a single worker thread operating off an unbounded queue.
  • Future<?> result = service.submit(() -> { for(int i = 0; i < 1000000; i++) counter++;});
    Since the return type of Runnable.run() is void, the get() method always returns null when working with Runnable expressions.
  • result.get(10, TimeUnit.SECONDS); // Returns null for Runnable
    It also waits at most 10 seconds, throwing a TimeoutException on the call to result.get() if the task is not done.
  • <T> Future<T> submit(Runnable task, T result)
    Submits a Runnable task for execution and returns a Future representing that task. The Future’s get method will return the given result upon successful completion.
31
Q

java.util.concurrent.TimeUnit
The Concurrency API uses this enum

A

TimeUnit enum

  • TimeUnit.NANOSECONDS
    Time in one-billionths of a second (1/1,000,000,000)
  • TimeUnit.MICROSECONDS
    Time in one-millionths of a second (1/1,000,000)
  • TimeUnit.MILLISECONDS
    Time in one-thousandths of a second (1/1,000)
  • TimeUnit.SECONDS
    Time in seconds
  • TimeUnit.MINUTES
    Time in minutes
  • TimeUnit.HOURS
    Time in hours
  • TimeUnit.DAYS
    Time in days
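
A few conversions using TimeUnit helper methods from the same enum (toMillis(), toSeconds(), toHours(), toMicros(), and convert()). The TimeUnitDemo class is hypothetical. Conversions to a coarser unit truncate, so 1,500 nanoseconds becomes 1 microsecond.

```java
import java.util.concurrent.TimeUnit;

class TimeUnitDemo {
    static long[] conversions() {
        return new long[] {
            TimeUnit.SECONDS.toMillis(10),                    // 10000
            TimeUnit.MINUTES.toSeconds(2),                    // 120
            TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS), // 3600000
            TimeUnit.DAYS.toHours(1),                         // 24
            TimeUnit.NANOSECONDS.toMicros(1_500)              // 1 (truncated)
        };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(conversions()));
    }
}
```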
32
Q

Introducing Callable

A
  • java.util.concurrent.Callable functional interface
  • call() method returns a value and can throw a checked exception.
    ~~~
    @FunctionalInterface public interface Callable<V> {
        V call() throws Exception;
    }
    ~~~
  • The Callable interface is often **preferable over Runnable**, since it allows more details to be retrieved easily from the task after it is completed.
33
Q

Example using Callable:

var service = Executors.newSingleThreadExecutor();
try {
    Future<Integer> result = service.submit(() -> 30 + 11);
    System.out.println(result.get()); // 41
} finally {
    service.shutdown();
}
A

Callable
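
A sketch of the checked-exception difference, assuming a hypothetical Callable that always fails. A Runnable lambda could not throw IOException; with Callable, the exception reaches the caller wrapped in an ExecutionException thrown by get().

```java
import java.io.IOException;
import java.util.concurrent.*;

class CallableExceptionDemo {
    static String run() throws InterruptedException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            // Legal for Callable because call() declares "throws Exception"; Runnable.run() does not
            Callable<Integer> failing = () -> {
                throw new IOException("zoo records unavailable");
            };
            Future<Integer> result = service.submit(failing);
            try {
                return "value " + result.get();
            } catch (ExecutionException e) {
                // get() wraps whatever call() threw; getCause() recovers it
                Throwable cause = e.getCause();
                return "failed with " + cause.getClass().getSimpleName() + ": " + cause.getMessage();
            }
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```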

34
Q

Waiting for All Tasks to Finish

A

If we don’t need the results of the tasks and are finished using our thread executor, there is a simpler approach.

  1. First, we shut down the thread executor using the shutdown() method.
  2. Next, we use the awaitTermination() method available for all thread executors. The method waits the specified time to complete all tasks, returning sooner if all tasks finish or an InterruptedException is detected.
  3. Finally, call isTerminated() after the awaitTermination() method finishes to confirm that all tasks are finished.
ExecutorService service = Executors.newSingleThreadExecutor();
try {
// Add tasks to the thread executor
    …
} finally {
    service.shutdown();
}
service.awaitTermination(1, TimeUnit.MINUTES);
// Check whether all tasks are finished
if(service.isTerminated()) System.out.println("Finished!");
else System.out.println("At least one task is still running");
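
The snippet above leaves its tasks elided. The sketch below runs the same shutdown, awaitTermination(), isTerminated() sequence end to end, assuming five hypothetical tasks that each increment a counter.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitTerminationDemo {
    // Returns how many tasks ran, or -1 if the executor did not finish in time
    static int run() throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            // Hypothetical tasks standing in for the elided ones
            for (int i = 0; i < 5; i++) {
                service.execute(() -> completed.incrementAndGet());
            }
        } finally {
            service.shutdown(); // stop accepting new tasks; queued ones still run
        }
        service.awaitTermination(1, TimeUnit.MINUTES); // blocks until done or timeout
        if (service.isTerminated()) {
            System.out.println("Finished!");
            return completed.get();
        }
        System.out.println("At least one task is still running");
        return -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```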
35
Q

Scheduling Tasks

A

Creates a single-threaded executor that can schedule commands to run after a given delay, or to execute periodically.

ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

36
Q

ScheduledExecutorService methods

A
  • schedule(Callable<V> callable, long delay, TimeUnit unit)
    Creates and executes Callable task after given delay
  • schedule(Runnable command, long delay, TimeUnit unit)
    Creates and executes Runnable task after given delay
  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
    Creates and executes Runnable task after given initial delay, creating new task every period value that passes
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
    Creates and executes Runnable task after given initial delay and subsequently with given delay between termination of one execution and commencement of next
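
A minimal sketch of schedule() with a Callable, assuming a hypothetical task that returns "Monkey" after 200 ms. The returned ScheduledFuture works like any other Future, so get() blocks until the delayed task has run.

```java
import java.util.concurrent.*;

class ScheduleDemo {
    static String run() throws InterruptedException, ExecutionException {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            // Callable overload: the lambda returns a value, so it cannot be a Runnable
            ScheduledFuture<String> future = service.schedule(() -> "Monkey", 200, TimeUnit.MILLISECONDS);
            String result = future.get(); // blocks until the delayed task has run
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            return result + " after at least 200 ms? " + (elapsedMs >= 200);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```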
37
Q

scheduleWithFixedDelay()

A

scheduleWithFixedDelay() method creates a new task only after the previous task has finished.

For example, if a task runs at 12:00 and takes five minutes to finish, with a delay between executions of two minutes, the next task will start at 12:07.

service.scheduleWithFixedDelay(task1, 0, 2, TimeUnit.MINUTES);
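
A sketch that measures scheduleWithFixedDelay() timing, scaled down to milliseconds: a hypothetical task that takes about 100 ms, scheduled with a 50 ms delay. Because the delay counts from the end of each run, consecutive starts should be at least 150 ms apart.

```java
import java.util.List;
import java.util.concurrent.*;

class FixedDelayDemo {
    // Returns the gaps (ms) between the start times of the first three runs
    static long[] startGaps() throws InterruptedException {
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch threeRuns = new CountDownLatch(3);
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        Runnable task = () -> {
            starts.add(System.nanoTime());
            try {
                Thread.sleep(100); // each run takes about 100 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            threeRuns.countDown();
        };
        try {
            // Start now, then wait 50 ms after each run FINISHES before starting the next
            service.scheduleWithFixedDelay(task, 0, 50, TimeUnit.MILLISECONDS);
            threeRuns.await(10, TimeUnit.SECONDS);
        } finally {
            service.shutdownNow(); // periodic tasks never end on their own
        }
        if (starts.size() < 3) return new long[0];
        return new long[] {
            TimeUnit.NANOSECONDS.toMillis(starts.get(1) - starts.get(0)),
            TimeUnit.NANOSECONDS.toMillis(starts.get(2) - starts.get(1))
        };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(startGaps())); // each gap >= 150
    }
}
```

With scheduleAtFixedRate(task, 0, 50, TimeUnit.MILLISECONDS) instead, starts would be roughly 100 ms apart: each period has already elapsed when a run ends, but executions of the same task never overlap.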

38
Q

Increasing Concurrency with Pools

A

A thread pool is a group of pre-instantiated reusable threads that are available to perform a set of arbitrary tasks.

39
Q

Executors factory methods

A
  • ExecutorService newSingleThreadExecutor()
    Creates single-threaded executor that uses single worker thread operating off unbounded queue. Results are processed sequentially in order in which they are submitted.
  • ScheduledExecutorService newSingleThreadScheduledExecutor()
    Creates single-threaded executor that can schedule commands to run after given delay or to execute periodically.
  • ExecutorService newCachedThreadPool()
    Creates thread pool that creates new threads as needed but reuses previously constructed threads when they are available.
  • ExecutorService newFixedThreadPool(int)
    Creates thread pool that reuses fixed number of threads operating off shared unbounded queue.
  • ScheduledExecutorService newScheduledThreadPool(int)
    Creates thread pool that can schedule commands to run after given delay or execute periodically.
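
A sketch showing that newFixedThreadPool(3) runs tasks on separate threads, assuming three hypothetical tasks that each wait on a CountDownLatch until all three are running at once. That is only possible if three distinct pool threads are in use.

```java
import java.util.Set;
import java.util.concurrent.*;

class FixedPoolDemo {
    // Returns how many distinct pool threads ran the tasks
    static int distinctWorkers() throws InterruptedException {
        int poolSize = 3;
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        CountDownLatch allRunning = new CountDownLatch(poolSize);
        ExecutorService service = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < poolSize; i++) {
                service.execute(() -> {
                    workerNames.add(Thread.currentThread().getName());
                    allRunning.countDown();
                    try {
                        allRunning.await(); // hold this thread until all tasks run at once
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            service.shutdown();
        }
        service.awaitTermination(10, TimeUnit.SECONDS);
        return workerNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Distinct pool threads: " + distinctWorkers());
    }
}
```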
40
Q

Writing Thread-Safe Code

A
  • Thread-safety is the property of an object that guarantees safe execution by multiple threads at the same time.
  • Since threads run in a shared environment and memory space, how do we prevent two threads from interfering with each other?
    We must organize access to data so that we don’t end up with invalid or unexpected results.
  • You can protect data using a variety of techniques, including atomic classes, synchronized blocks, the Lock framework, and cyclic barriers.
41
Q

Understanding Thread-Safety

A
1: import java.util.concurrent.*;
2: public class SheepManager {
3:     private int sheepCount = 0;
4:     private void incrementAndReport() {
5:         System.out.print((++sheepCount)+" ");
6:     }
7:     public static void main(String[] args) {
8:         ExecutorService service = Executors.newFixedThreadPool(20);
9:         try {
10:             SheepManager manager = new SheepManager();
11:             for(int i = 0; i < 10; i++)
12:                 service.submit(() -> manager.incrementAndReport());
13:         } finally {
14:             service.shutdown();
15: } } }

You might expect this program to output the numbers 1 through 10 in order, but it may output them in a different order. Worse yet, it may print some numbers twice and not print some numbers at all! The following are possible outputs of this program:

1 2 3 4 5 6 7 8 9 10
1 9 8 7 3 6 6 2 4 5
1 8 7 3 2 6 5 4 2 9

42
Q

race condition

A

The unexpected result of two tasks executing at the same time is referred to as a race condition.

43
Q

Accessing Data with volatile

A

The volatile keyword is used to guarantee that access to data within memory is consistent.

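The volatile attribute ensures that a write to the variable by one thread is visible to every thread that reads it afterward, so data read among multiple threads is consistent. It does not, however, stop two threads from performing a compound operation such as ++ on the variable at the same time.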

3: private volatile int sheepCount = 0;
4: private void incrementAndReport() {
5:     System.out.print((++sheepCount)+" ");
6: }

Unfortunately, this code is not thread-safe and could still result in numbers being missed:

2 6 1 7 5 3 2 9 4 8

The reason this code is not thread-safe is that ++sheepCount is still two distinct operations.
Put another way, if the increment operator represents the expression sheepCount = sheepCount + 1, then each read and write operation is thread-safe, but the combined operation is not. Referring back to our sheep example, we don’t interrupt the employee while running, but we could still have multiple people in the field at the same time.
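
Where volatile does fit is a simple flag written by one thread and read by another. A minimal sketch, assuming a hypothetical worker loop: without volatile, the worker is not guaranteed to ever see running become false.

```java
class VolatileFlagDemo {
    private volatile boolean running = true; // written by one thread, read by another
    private long iterations = 0;             // only touched by the worker until join()

    void stop() {
        running = false; // visible to the worker's next read of running
    }

    // Returns iterations completed, or -1 if the worker did not stop in time
    long runFor(long millis) throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (running) {
                iterations++;
            }
        });
        worker.start();
        Thread.sleep(millis);
        stop();
        worker.join(5_000);
        return worker.isAlive() ? -1 : iterations;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new VolatileFlagDemo().runFor(50));
    }
}
```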

44
Q

> [!NOTE]
In practice, volatile is rarely used.
We only cover it because it has been known to show up on the exam from time to time.

A

volatile

45
Q

Protecting Data with Atomic Classes

A

Atomic is the property of an operation to be carried out as a single unit of execution without any interference from another thread.

A thread-safe atomic version of the increment operator would perform the read and write of the variable as a single operation, not allowing any other threads to access the variable during the operation.

Figure 13.5 shows the result of making the sheepCount variable atomic.

46
Q

Atomic classes

A
  • AtomicBoolean
    A boolean value that may be updated atomically
  • AtomicInteger
    An int value that may be updated atomically
  • AtomicLong
    A long value that may be updated atomically
47
Q
3:     private AtomicInteger sheepCount = new AtomicInteger(0);
4:     private void incrementAndReport() {
5:        System.out.print(sheepCount.incrementAndGet()+" ");
6:     }
A
2 3 1 4 5 6 7 8 9 10
1 4 3 2 5 6 7 8 9 10
1 4 3 5 6 2 7 8 10 9
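
A sketch of the AtomicInteger version under heavier load, with hypothetical thread and iteration counts. Because incrementAndGet() is a single atomic operation, no updates are lost and the final count is exact, even though the order of individual increments varies.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicSheepCount {
    static int countWithThreads(int threads, int incrementsPerThread) throws InterruptedException {
        AtomicInteger sheepCount = new AtomicInteger(0);
        ExecutorService service = Executors.newFixedThreadPool(threads);
        try {
            for (int t = 0; t < threads; t++) {
                service.execute(() -> {
                    for (int i = 0; i < incrementsPerThread; i++) {
                        sheepCount.incrementAndGet(); // atomic read-modify-write: no lost updates
                    }
                });
            }
        } finally {
            service.shutdown();
        }
        service.awaitTermination(1, TimeUnit.MINUTES);
        return sheepCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithThreads(10, 1_000)); // 10000
    }
}
```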
48
Q

Common atomic methods

A
  • get()
    Retrieves current value
  • set(type newValue)
    Sets given value, equivalent to assignment = operator
  • getAndSet(type newValue)
    Atomically sets new value and returns old value
  • incrementAndGet()
    For numeric classes, atomic pre-increment operation equivalent to ++value
  • getAndIncrement()
    For numeric classes, atomic post-increment operation equivalent to value++
  • decrementAndGet()
    For numeric classes, atomic pre-decrement operation equivalent to --value
  • getAndDecrement()
    For numeric classes, atomic post-decrement operation equivalent to value--
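
A short trace of these methods on a single AtomicInteger, assuming a starting value of 5 (the class name is hypothetical). Each comment shows the returned value and, where it changes, the stored value afterward.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicMethodsDemo {
    static int[] trace() {
        var value = new AtomicInteger(5);
        return new int[] {
            value.getAndSet(10),      // returns 5; value is now 10
            value.incrementAndGet(),  // like ++value: returns 11
            value.getAndIncrement(),  // like value++: returns 11; value is now 12
            value.decrementAndGet(),  // like --value: returns 11
            value.getAndDecrement(),  // like value--: returns 11; value is now 10
            value.get()               // returns 10
        };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(trace())); // [5, 11, 11, 11, 11, 10]
    }
}
```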
49
Q

Improving Access with synchronized Blocks

A

The most common technique is to use a monitor to synchronize access. A monitor, also called a lock, is a structure that supports mutual exclusion, which is the property that at most one thread is executing a particular segment of code at a given time.

In Java, any Object can be used as a monitor, along with the synchronized keyword, as shown in the following example:

var manager = new SheepManager();
synchronized(manager) {
   // Work to be completed by one thread at a time
}

This example is referred to as a synchronized block.

  • Each thread that arrives will first check if any threads are already running the block.
  • If the lock is not available, the thread will transition to a BLOCKED state until it can “acquire the lock.”
  • If the lock is available (or the thread already holds the lock), the single thread will enter the block, preventing all other threads from entering.
  • Once the thread finishes executing the block, it will release the lock, allowing one of the waiting threads to proceed.
50
Q

synchronized block

A

synchronized block.

  • Each thread that arrives will first check if any threads are already running the block.
  • If the lock is not available, the thread will transition to a BLOCKED state until it can “acquire the lock.”
  • If the lock is available (or the thread already holds the lock), the single thread will enter the block, preventing all other threads from entering.
  • Once the thread finishes executing the block, it will release the lock, allowing one of the waiting threads to proceed.
51
Q

> [!NOTE]
To synchronize access across multiple threads, each thread must have access to the same Object.
If each thread synchronizes on different objects, the code is not thread-safe.

A

synchronize

52
Q

Does this solution fix the problem?

11: for(int i = 0; i < 10; i++) {
12:    synchronized(manager) {
13:       service.submit(() -> manager.incrementAndReport());
14:    }
15: }
A

No, it does not! Can you spot the problem? We’ve synchronized the creation of the threads but not the execution of the threads. In this example, the threads would be created one at a time, but they might all still execute and perform their work simultaneously, resulting in the same type of output that you saw earlier. We did say diagnosing and resolving thread problems is difficult in practice!

53
Q
5:        synchronized(this) {
6:           System.out.print((++sheepCount)+" ");
7:        }
A

synchronized

54
Q

We could have synchronized on any object, as long as it was the same object. For example, the following code snippet would also work:

4:     private final Object herd = new Object();
5:     private void incrementAndReport() {
6:        synchronized(herd) {
7:           System.out.print((++sheepCount)+" ");
8:        }
9:     }

Although we didn’t need to make the herd variable final, doing so ensures that it is not reassigned after threads start using it.

A

When this code executes, it will consistently output the following:
1 2 3 4 5 6 7 8 9 10

Although all threads are still created and executed at the same time, they each wait at the synchronized block for the worker to increment and report the result before entering.
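Here is a runnable sketch of a SheepManager class that synchronizes inside incrementAndReport(), so that each task's increment and print run as one unit. The report buffer, the runTenTasks() helper, the pool size of 20, and the awaitTermination() call are assumptions added so the result can be checked. They are not part of the original example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SheepManager {
    private final Object herd = new Object();          // the same monitor for every task
    private int sheepCount = 0;
    private final StringBuilder report = new StringBuilder();

    private void incrementAndReport() {
        synchronized (herd) {                           // increment and print as one unit
            ++sheepCount;
            report.append(sheepCount).append(' ');
            System.out.print(sheepCount + " ");
        }
    }

    // Submits ten tasks to a 20-thread pool and returns what they reported
    static String runTenTasks() throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(20);
        SheepManager manager = new SheepManager();
        try {
            for (int i = 0; i < 10; i++) {
                service.submit(() -> manager.incrementAndReport());
            }
        } finally {
            service.shutdown();
        }
        service.awaitTermination(10, TimeUnit.SECONDS); // wait for every task to finish
        synchronized (manager.herd) {                   // read under the same lock
            return manager.report.toString().trim();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        runTenTasks();                                  // prints 1 2 3 4 5 6 7 8 9 10
        System.out.println();
    }
}
```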

55
Q

Synchronizing on Methods

A

We can add the synchronized modifier to any instance method to synchronize automatically on the object itself. For example, the following two method definitions are equivalent:

void sing() {
   synchronized(this) {
      System.out.print("La la la!");
   }
}
synchronized void sing() {
   System.out.print("La la la!");
}

The first uses a synchronized block, whereas the second uses the synchronized method modifier. Which you use is completely up to you.
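One way to confirm that a synchronized instance method locks on `this` is Thread.holdsLock(Object). This method reports whether the current thread holds the monitor of the given object. The Singer class below is a hypothetical example that checks both forms.

```java
class Singer {
    // synchronized instance method: the monitor is this Singer instance
    synchronized boolean singAndCheck() {
        System.out.print("La la la! ");
        return Thread.holdsLock(this);          // true: we are inside the monitor
    }

    // The equivalent synchronized block form
    boolean singWithBlockAndCheck() {
        synchronized (this) {
            System.out.print("La la la! ");
            return Thread.holdsLock(this);      // true: same monitor
        }
    }

    public static void main(String[] args) {
        Singer singer = new Singer();
        System.out.println(singer.singAndCheck());           // La la la! true
        System.out.println(singer.singWithBlockAndCheck());  // La la la! true
        System.out.println(Thread.holdsLock(singer));        // false: lock released
    }
}
```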

56
Q

We can also apply the synchronized modifier to static methods. What object is used as the monitor when we synchronize on a static method?

A

The class object, of course! For example, the following two methods are equivalent for static synchronization inside our SheepManager class:

static void dance() {
   synchronized(SheepManager.class) {
      System.out.print("Time to dance!");
   }
}
static synchronized void dance() {
   System.out.print("Time to dance!");
}

As before, the first uses a synchronized block, with the second example using the synchronized modifier. You can use static synchronization if you need to order thread access across all instances rather than a single instance.
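A similar check works for static synchronization. In this sketch, Dancer is a hypothetical stand-in for SheepManager. Inside its static synchronized method, the current thread holds the monitor of Dancer.class but not the monitor of a Dancer instance. Because these two monitors are different objects, static synchronized methods and synchronized instance methods of the same class do not block each other.

```java
class Dancer {
    // static synchronized method: the monitor is the Dancer.class object
    static synchronized boolean danceAndCheck(Dancer someInstance) {
        System.out.print("Time to dance! ");
        boolean holdsClassLock = Thread.holdsLock(Dancer.class);    // true
        boolean holdsInstanceLock = Thread.holdsLock(someInstance); // false
        return holdsClassLock && !holdsInstanceLock;
    }

    public static void main(String[] args) {
        System.out.println(danceAndCheck(new Dancer())); // Time to dance! true
    }
}
```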

57
Q

Understanding the Lock Framework

A

A synchronized block supports only a limited set of functionality.
For example, what if we want to check whether a lock is available and, if it is not, perform some other task? Furthermore, if the lock is never available and we synchronize on it, we might wait forever.

The Concurrency API includes the Lock interface, which is conceptually similar to using the synchronized keyword but with a lot more bells and whistles. Instead of synchronizing on any Object, though, we can “lock” only on an object that implements the Lock interface.

58
Q

Applying a ReentrantLock

A

The Lock interface is pretty easy to use.

When you need to protect a piece of code from multithreaded processing, create an instance of Lock that all threads have access to.

Each thread then calls lock() before it enters the protected code and calls unlock() before it exits the protected code.

// Implementation #1 with a synchronized block
Object object = new Object();
synchronized(object) {
   // Protected code
}
 
// Implementation #2 with a Lock
Lock lock = new ReentrantLock();
lock.lock();   // acquire before the try, so unlock() runs only after a successful lock()
try {
   // Protected code
} finally {
   lock.unlock();
}

The ReentrantLock class is a simple monitor that implements the Lock interface and supports mutual exclusion. In other words, at most one thread is allowed to hold a lock at any given time.

The ReentrantLock class ensures that once a thread has called lock() and obtained the lock, all other threads that call lock() will wait until the first thread calls unlock(). Which thread gets the lock next depends on the parameters used to create the Lock object.

The ReentrantLock class includes a constructor that takes a single boolean and sets a “fairness” parameter. If the parameter is set to true, the lock will usually be granted to each thread in the order in which it was requested. It is false by default when using the no-argument constructor. In practice, you should enable fairness only when ordering is absolutely required, as it could lead to a significant slowdown.

Besides always making sure to release a lock, you also need to be sure that you only release a lock that you have. If you attempt to release a lock that you do not have, you will get an exception at runtime.

Lock lock = new ReentrantLock();
lock.unlock(); // IllegalMonitorStateException
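The following sketch checks both points using ReentrantLock's own methods. isFair() reports the fairness setting chosen in the constructor. Calling unlock() on a lock that the current thread does not hold throws IllegalMonitorStateException. The LockBasics class name is an assumption.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockBasics {
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();          // false: no-arg constructor
    }

    static boolean fairIsFair() {
        return new ReentrantLock(true).isFair();      // true: fairness requested
    }

    // Returns true if unlocking a lock this thread never acquired throws
    static boolean unlockWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();                            // this thread never called lock()
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());           // false
        System.out.println(fairIsFair());              // true
        System.out.println(unlockWithoutLockThrows()); // true
    }
}
```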
59
Q

> [!TIPS]
While certainly not required, it is a good practice to use a try/finally block with Lock instances.
Doing so ensures that any acquired locks are properly released.

A

lock

60
Q

Attempting to Acquire a Lock

A

While the ReentrantLock class allows you to wait for a lock, it so far suffers from the same problem as a synchronized block. A thread could end up waiting forever to obtain a lock. Luckily, Table 13.8 includes two additional methods that make the Lock interface a lot safer to use than a synchronized block.

  • void lock()
    Requests lock and blocks until lock is acquired.
  • void unlock()
    Releases lock.
  • boolean tryLock()
    Requests lock and returns immediately. Returns boolean indicating whether lock was successfully acquired.
  • boolean tryLock(long timeout, TimeUnit unit)
    Requests lock and blocks for specified time or until lock is acquired. Returns boolean indicating whether lock was successfully acquired.
61
Q

tryLock()

A

The tryLock() method will attempt to acquire a lock and immediately return a boolean result indicating whether the lock was obtained.

Unlike the lock() method, it does not wait if another thread already holds the lock. It returns immediately, regardless of whether a lock is available.
The following is a sample implementation using the tryLock() method:

Lock lock = new ReentrantLock();
new Thread(() -> printHello(lock)).start();
if(lock.tryLock()) {
   try {
      System.out.println("Lock obtained, entering protected code");
   } finally {
      lock.unlock();
   }
} else {
   System.out.println("Unable to acquire lock, doing something else");
}

Like lock(), the tryLock() method should be used with a try/finally block. Fortunately, you need to release the lock only if it was successfully acquired. For this reason, it is common to use the output of tryLock() in an if statement, so that unlock() is called only when the lock is obtained.
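The snippet above relies on a printHello() method that is not shown in this card, and its outcome depends on timing. The following self-contained sketch makes the outcome deterministic. A hypothetical holder thread keeps the lock until a CountDownLatch tells it to let go, so the first tryLock() fails and the second succeeds. The attempt() and run() helpers are illustrative names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    // Tries the lock once without waiting; returns true if the protected code ran
    static boolean attempt(Lock lock) {
        if (lock.tryLock()) {
            try {
                System.out.println("Lock obtained, entering protected code");
                return true;
            } finally {
                lock.unlock();                 // release only because tryLock() succeeded
            }
        } else {
            System.out.println("Unable to acquire lock, doing something else");
            return false;
        }
    }

    // Returns {result while another thread holds the lock, result after it lets go}
    static boolean[] run() throws InterruptedException {
        Lock lock = new ReentrantLock();
        CountDownLatch acquired = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                acquired.countDown();          // signal that the lock is now held
                release.await();               // keep holding it until told to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        acquired.await();                      // the holder definitely has the lock now
        boolean whileHeld = attempt(lock);     // fails immediately, no waiting
        release.countDown();
        holder.join();                         // the holder has released the lock
        boolean afterRelease = attempt(lock);  // succeeds
        return new boolean[] {whileHeld, afterRelease};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] results = run();
        System.out.println(results[0] + " " + results[1]);  // false true
    }
}
```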

62
Q

> [!TIPS]
It is imperative that your program always check the return value of the tryLock() method.
It tells your program whether it is safe to proceed with the operation and whether the lock needs to be released later.

A

tryLock()

63
Q

tryLock(long,TimeUnit)

A

The Lock interface includes an overloaded version of tryLock(long,TimeUnit) that acts like a hybrid of lock() and tryLock(). Like the other two methods, if a lock is available, it will immediately return with it. If a lock is unavailable, though, it will wait up to the specified time limit for the lock.

The following code snippet uses the overloaded version of tryLock(long,TimeUnit):

Lock lock = new ReentrantLock();
new Thread(() -> printHello(lock)).start();
if(lock.tryLock(10,TimeUnit.SECONDS)) {
   try {
      System.out.println("Lock obtained, entering protected code");
   } finally {
      lock.unlock();
   }
} else {
   System.out.println("Unable to acquire lock, doing something else");
}

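The code is the same as before, except that this time the thread running this snippet waits up to 10 seconds to acquire the lock. Because this overload can be interrupted while it waits, it declares the checked InterruptedException, so the enclosing method must handle or declare it.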
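The following sketch shows the waiting behavior. TimedTryLockDemo and its waitForRelease() helper are hypothetical names. A holder thread keeps the lock for about 100 milliseconds and then releases it. The timed tryLock() waits instead of failing immediately, so it obtains the lock well within its 10-second limit and returns true.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TimedTryLockDemo {
    // Returns true if a timed tryLock() obtains a lock that another thread
    // releases after roughly holdMillis milliseconds
    static boolean waitForRelease(long holdMillis) throws InterruptedException {
        Lock lock = new ReentrantLock();
        CountDownLatch acquired = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                acquired.countDown();                 // the holder now owns the lock
                Thread.sleep(holdMillis);             // keep it briefly
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();                        // lets the waiting thread in
            }
        });
        holder.start();
        acquired.await();                             // ensure the lock is taken first
        // Waits up to 10 seconds instead of failing immediately like tryLock()
        boolean obtained = lock.tryLock(10, TimeUnit.SECONDS);
        if (obtained) {
            try {
                System.out.println("Lock obtained, entering protected code");
            } finally {
                lock.unlock();
            }
        } else {
            System.out.println("Unable to acquire lock, doing something else");
        }
        holder.join();
        return obtained;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitForRelease(100));      // true
    }
}
```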

64
Q

Acquiring the Same Lock Twice

A

The ReentrantLock class maintains a counter of the number of times a lock has been successfully granted to a thread. To release the lock for other threads to use, unlock() must be called the same number of times the lock was granted.

It is critical that you release a lock the same number of times it is acquired!
For calls with tryLock(), you need to call unlock() only if the method returned true.
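ReentrantLock exposes this counter through getHoldCount(), which returns the number of holds the current thread has on the lock. The following sketch records the count after each step. It shows that a lock() followed by a successful tryLock() needs two unlock() calls. HoldCountDemo is a hypothetical name.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    // Records the current thread's hold count after each lock/unlock step
    static int[] trace() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();                             // first grant
        counts[0] = lock.getHoldCount();         // 1
        if (lock.tryLock()) {                    // same thread, so granted again
            counts[1] = lock.getHoldCount();     // 2
            lock.unlock();                       // release the tryLock() grant
        }
        counts[2] = lock.getHoldCount();         // 1: still held, others still wait
        lock.unlock();                           // release the lock() grant
        counts[3] = lock.getHoldCount();         // 0: now free for other threads
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(trace()));  // [1, 2, 1, 0]
    }
}
```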

65
Q

The following code snippet contains an error. Can you spot it?

Lock lock = new ReentrantLock();
if(lock.tryLock()) {
   try {
      lock.lock();
      System.out.println("Lock obtained, entering protected code");
   } finally {
      lock.unlock();
   }
}
A

The thread obtains the lock twice but releases it only once. You can verify this by spawning a new thread after this code runs that attempts to obtain a lock. The following prints false:

new Thread(() -> System.out.print(lock.tryLock())).start(); // false


66
Q

Reviewing the Lock Framework

A

To review, the ReentrantLock class supports the same features as a synchronized block while adding a number of improvements:
* Ability to request a lock without blocking.
* Ability to request a lock while blocking for a specified amount of time.
* A lock can be created with a fairness property, in which the lock is granted to threads in the order in which it was requested.

67
Q

> [!TIPS]
While not on the exam, ReentrantReadWriteLock is a really useful class. It includes separate locks for reading and writing data and is useful on data structures where reads are far more common than writes. For example, if you have a thousand threads reading data but only one thread writing data, this class can help you maximize concurrent access.

A

ReentrantReadWriteLock
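The following sketch applies the tip above. A hypothetical ReadMostlyCache guards a HashMap with a ReentrantReadWriteLock. Any number of threads can hold the read lock at the same time, so readers do not block one another. The write lock is exclusive.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadMostlyCache {
    private final Map<String, Integer> data = new HashMap<>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();   // shared among readers
    private final Lock writeLock = rwLock.writeLock(); // exclusive to one writer

    Integer get(String key) {
        readLock.lock();
        try {
            return data.get(key);
        } finally {
            readLock.unlock();
        }
    }

    void put(String key, Integer value) {
        writeLock.lock();
        try {
            data.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        ReadMostlyCache cache = new ReadMostlyCache();
        cache.put("penguin", 1);
        System.out.println(cache.get("penguin"));  // 1
        System.out.println(cache.get("flamingo")); // null
    }
}
```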

68
Q

Orchestrating Tasks with a CyclicBarrier

A

A CyclicBarrier lets you orchestrate complex tasks with many steps.

The CyclicBarrier constructors take a limit value, indicating the number of threads to wait for.

As each thread finishes, it calls the await() method on the cyclic barrier.

Once the specified number of threads have each called await(), the barrier is released, and all threads can continue.

69
Q

Consider the following implementation, in which four threads each perform all three steps without any coordination:

import java.util.concurrent.*;
public class LionPenManager {
   private void removeLions() { System.out.println("Removing lions");   }
   private void cleanPen()    { System.out.println("Cleaning the pen"); }
   private void addLions()    { System.out.println("Adding lions");     }
   public void performTask() {
      removeLions();
      cleanPen();
      addLions();
   }
   public static void main(String[] args) {
      var service = Executors.newFixedThreadPool(4);
      try {
         var manager = new LionPenManager();
         for (int i = 0; i < 4; i++)
            service.submit(() -> manager.performTask());
      } finally {
         service.shutdown();
      } } }

A

The following is sample output based on this implementation:

Removing lions
Removing lions
Cleaning the pen
Adding lions
Removing lions
Cleaning the pen
Adding lions
Removing lions
Cleaning the pen
Adding lions
Cleaning the pen
Adding lions

70
Q
import java.util.concurrent.*;
public class LionPenManager {
   private void removeLions() { System.out.println("Removing lions");   }
   private void cleanPen()    { System.out.println("Cleaning the pen"); }
   private void addLions()    { System.out.println("Adding lions");     }
   public void performTask(CyclicBarrier c1, CyclicBarrier c2) {
      try {
         removeLions();
         c1.await();
         cleanPen();
         c2.await();
         addLions();
      } catch (InterruptedException | BrokenBarrierException e) {
         // Handle checked exceptions here
      }
   }
   public static void main(String[] args) {
      var service = Executors.newFixedThreadPool(4);
      try {
         var manager = new LionPenManager();
         var c1 = new CyclicBarrier(4);
         var c2 = new CyclicBarrier(4, () -> System.out.println("*** Pen Cleaned!"));
         for (int i = 0; i < 4; i++)
            service.submit(() -> manager.performTask(c1, c2));
      } finally {
         service.shutdown();
      } } }
A

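The first barrier waits until four threads have called await() before releasing them:

var c1 = new CyclicBarrier(4);

The second barrier also waits for four threads. It then executes a Runnable instance upon completion, before any of the waiting threads are released:
var c2 = new CyclicBarrier(4, () -> System.out.println("*** Pen Cleaned!"));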

The following is sample output based on this revised implementation of our LionPenManager class:

Removing lions
Removing lions
Removing lions
Removing lions
Cleaning the pen
Cleaning the pen
Cleaning the pen
Cleaning the pen
*** Pen Cleaned!
Adding lions
Adding lions
Adding lions
Adding lions
71
Q

Reusing CyclicBarrier

A

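After a CyclicBarrier limit is reached (the barrier trips), all threads are released, and the number of threads waiting on the CyclicBarrier goes back to zero. At this point, the CyclicBarrier may be used again for a new set of waiting threads. Note that in the CyclicBarrier API, a "broken" barrier is a different condition. It means the barrier failed, for example because a waiting thread was interrupted or timed out, which causes the other waiting threads to throw BrokenBarrierException.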

For example, if our CyclicBarrier limit is 5 and we have 15 threads that call await(), the CyclicBarrier will be activated a total of three times.
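The following sketch counts how often the barrier action runs when 15 tasks wait on a barrier with a limit of 5. BarrierReuseDemo and countTrips() are hypothetical names. The pool is given exactly 5 threads. A pool smaller than the barrier limit would leave every thread stuck in await() with no thread left to arrive, so the barrier would never trip.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {
    // Returns how many times the barrier trips when `tasks` tasks each call await()
    static int countTrips(int limit, int tasks) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, before the waiting threads are released
        CyclicBarrier barrier = new CyclicBarrier(limit, trips::incrementAndGet);
        // At least `limit` threads are needed, or the barrier could never trip
        ExecutorService service = Executors.newFixedThreadPool(limit);
        try {
            for (int i = 0; i < tasks; i++) {
                service.submit(() -> {
                    try {
                        barrier.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        System.out.println("Barrier broken: " + e);
                    }
                });
            }
        } finally {
            service.shutdown();
        }
        service.awaitTermination(10, TimeUnit.SECONDS);
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countTrips(5, 15));   // 3
    }
}
```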

72
Q

Using Concurrent Collections

A

The concurrent classes were created to help avoid common issues in which multiple threads are adding and removing objects from the same collections. At any given instant, all threads should have the same consistent view of the structure of the collection.

73
Q

Understanding Memory Consistency Errors

A

A memory consistency error occurs when two threads have inconsistent views of what should be the same data.

Conceptually, we want writes on one thread to be available to another thread if it accesses the concurrent collection after the write has occurred.

When two threads try to modify the same nonconcurrent collection, the JVM may throw a ConcurrentModificationException at runtime. In fact, it can happen with a single thread.

74
Q
11: var foodData = new HashMap<String, Integer>();
12: foodData.put("penguin", 1);
13: foodData.put("flamingo", 2);
14: for(String key: foodData.keySet())
15:    foodData.remove(key);
A

This snippet will throw a ConcurrentModificationException during the second iteration of the loop, since the iterator on keySet() is not properly updated after the first element is removed. Changing the first line to use a ConcurrentHashMap will prevent the code from throwing an exception at runtime.

11: var foodData = new ConcurrentHashMap<String, Integer>();

ConcurrentHashMap orders read/write access such that all access to the class is consistent. Its iterators are weakly consistent: they never throw ConcurrentModificationException, and the iterator created by keySet() tolerates elements being removed from the Map while it is in use.
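The following sketch runs the same remove-while-iterating loop against both map types. The RemoveWhileIterating class and its removeAll() helper are illustrative. HashMap's fail-fast detection is documented as best-effort, but in this two-entry case it reliably throws on the second iteration, as described above.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RemoveWhileIterating {
    // Removes every key while iterating over keySet(); true if the map ends up empty
    static boolean removeAll(Map<String, Integer> foodData) {
        foodData.put("penguin", 1);
        foodData.put("flamingo", 2);
        try {
            for (String key : foodData.keySet()) {
                foodData.remove(key);
            }
            return foodData.isEmpty();
        } catch (ConcurrentModificationException e) {
            return false;                    // the fail-fast iterator detected the change
        }
    }

    public static void main(String[] args) {
        System.out.println(removeAll(new HashMap<>()));           // false
        System.out.println(removeAll(new ConcurrentHashMap<>())); // true
    }
}
```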

75
Q

Working with Concurrent Classes

A

Working with Concurrent Classes

76
Q

> [!TIPS]
If the collection is immutable (and contains immutable objects), the concurrent collections are not necessary.
Immutable objects can be accessed by any number of threads and do not require synchronization.
By definition, they do not change, so there is no chance of a memory consistency error.

A

concurrent collections

77
Q

When passing around a concurrent collection, a caller may need to know the particular implementation class. That said, it is considered a good practice to pass around a nonconcurrent interface reference when possible, similar to how we instantiate a HashMap but often pass around a Map reference:

Map<String,Integer> map = new ConcurrentHashMap<>();

A

Map<String,Integer> map = new ConcurrentHashMap<>();

78
Q

The following are the common concurrent classes with which you should be familiar for the exam.

A
  • ConcurrentHashMap, interfaces: Map, ConcurrentMap
  • ConcurrentLinkedQueue, interfaces: Queue
  • ConcurrentSkipListMap, interfaces: Map, SortedMap, NavigableMap, ConcurrentMap, ConcurrentNavigableMap
  • ConcurrentSkipListSet, interfaces: Set, SortedSet, NavigableSet
  • CopyOnWriteArrayList, interfaces: List
  • CopyOnWriteArraySet, interfaces: Set
  • LinkedBlockingQueue, interfaces: Queue, BlockingQueue
79
Q

Working with Concurrent Classes

A

Most of these classes are concurrent versions of their nonconcurrent counterparts, such as ConcurrentHashMap vs. Map, or ConcurrentLinkedQueue vs. Queue.

For the exam, you don’t need to know any class-specific concurrent methods.
You just need to know the inherited methods, such as get() and set() for List instances.

The Skip classes might sound strange, but they are just “sorted” versions of the associated concurrent collections. When you see a class with Skip in the name, just think “sorted concurrent” collections, and the rest should follow naturally.

The CopyOnWrite classes behave a little differently than the other concurrent examples you have seen. These classes create a copy of the collection any time a reference is added, removed, or changed in the collection and then update the original collection reference to point to the copy. These classes are commonly used to ensure an iterator doesn’t see modifications to the collection.

Finally, there is LinkedBlockingQueue, which implements the concurrent BlockingQueue interface. This class is just like a regular Queue, except that it includes overloaded versions of offer() and poll() that take a timeout. These methods wait (or block) up to a specific amount of time to complete an operation.
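The following sketch shows these timed methods. It uses a LinkedBlockingQueue with a capacity of 1, an assumption chosen so that offer() has something to wait for. poll() on the empty queue waits up to its timeout and returns null. The second offer() waits up to its timeout and returns false because the queue is still full. BlockingQueueDemo is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueDemo {
    // Exercises the timed offer() and poll() overloads on a one-slot queue
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);       // capacity 1
        String fromEmpty = queue.poll(100, TimeUnit.MILLISECONDS);        // waits, then null
        boolean first = queue.offer("zebra", 100, TimeUnit.MILLISECONDS); // true: room left
        boolean second = queue.offer("lion", 100, TimeUnit.MILLISECONDS); // waits, then false
        String head = queue.poll(100, TimeUnit.MILLISECONDS);             // "zebra"
        return fromEmpty + " " + first + " " + second + " " + head;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());   // null true false zebra
    }
}
```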

80
Q
List<Integer> favNumbers = new CopyOnWriteArrayList<>(List.of(4, 3, 42));
for (var n : favNumbers) {
   System.out.print(n + " ");                      // 4 3 42
   favNumbers.add(n+1);
}
System.out.println();
System.out.println("Size: " + favNumbers.size());  // Size: 6
A

Despite adding elements, the iterator is not modified, and the loop executes exactly three times. Alternatively, if we had used a regular ArrayList object, a ConcurrentModificationException would have been thrown at runtime. The CopyOnWrite classes can use a lot of memory, since a new collection structure is created any time the collection is modified. Therefore, they are commonly used in multithreaded environments where reads are far more common than writes.

81
Q

> [!NOTE]
A CopyOnWrite instance is similar to an immutable object, as a new underlying structure is created every time the collection is modified.
Unlike a true immutable object, though, the reference to the object stays the same even while the underlying data is changed.

A

CopyOnWrite

82
Q

Obtaining Synchronized Collections

A
83
Q

Synchronized Collections methods

A

These synchronized methods are defined in the Collections class. They operate on the input collection and return a reference that is the same type as the underlying collection.

  • synchronizedCollection(Collection<T> c)
  • synchronizedList(List<T> list)
  • synchronizedMap(Map<K,V> m)
  • synchronizedNavigableMap(NavigableMap<K,V> m)
  • synchronizedNavigableSet(NavigableSet<T> s)
  • synchronizedSet(Set<T> s)
  • synchronizedSortedMap(SortedMap<K,V> m)
  • synchronizedSortedSet(SortedSet<T> s)
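The following sketch uses one of these methods. It wraps an ArrayList with Collections.synchronizedList() and has four threads add to it concurrently. The wrapper synchronizes each individual call such as add(). However, the Collections documentation requires callers to synchronize on the returned list themselves while iterating over it, as the summing loop does. The class name and thread counts are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SynchronizedListDemo {
    // Returns {final size, sum of all elements} after four threads add 0..999 each
    static int[] run() throws InterruptedException {
        List<Integer> list = Collections.synchronizedList(new ArrayList<>());
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            for (int t = 0; t < 4; t++) {
                service.submit(() -> {
                    for (int i = 0; i < 1_000; i++) {
                        list.add(i);          // each add() is synchronized by the wrapper
                    }
                });
            }
        } finally {
            service.shutdown();
        }
        service.awaitTermination(10, TimeUnit.SECONDS);
        int sum = 0;
        synchronized (list) {                 // iteration must be synchronized manually
            for (int value : list) {
                sum += value;
            }
        }
        return new int[] {list.size(), sum};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run();
        System.out.println(result[0] + " " + result[1]);   // 4000 1998000
    }
}
```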
84
Q

If you’re writing code to create a collection and it requires synchronization, you should use the classes defined in Table 13.9.

On the other hand, if you are passed a nonconcurrent collection and need synchronization, use the methods in Table 13.10.

A

collection

85
Q

Identifying Threading Problems

A

A threading problem can occur in multithreaded applications when two or more threads interact in an unexpected and undesirable way.

86
Q

Understanding Liveness

A

Many thread operations can be performed independently, but some require coordination. For example, synchronizing on a method requires all threads that call the method to wait for other threads to finish before continuing.

You also saw earlier in the chapter that threads in a CyclicBarrier will each wait for the barrier limit to be reached before continuing.

Liveness is the ability of an application to execute in a timely manner. Liveness problems, then, are those in which the application becomes unresponsive or is in some kind of “stuck” state. More precisely, liveness problems are often the result of a thread entering a BLOCKED or WAITING state forever, or repeatedly entering/exiting these states.

For the exam, there are three types of liveness issues with which you should be familiar: deadlock, starvation, and livelock.

87
Q

Deadlock

A

Deadlock occurs when two or more threads are blocked forever, each waiting on the other.

import java.util.concurrent.*;
class Food {}
class Water {}
public record Fox(String name) {
   public void eatAndDrink(Food food, Water water) {
      synchronized(food) {
         System.out.println(name() + " Got Food!");
         move();
         synchronized(water) {
            System.out.println(name() + " Got Water!");
         } } }
   public void drinkAndEat(Food food, Water water) {
      synchronized(water) {
         System.out.println(name() + " Got Water!");
         move();
         synchronized(food) {
            System.out.println(name() + " Got Food!");
         } } }
   public void move() {
      try { Thread.sleep(100); } catch (InterruptedException e) {}
   }
   public static void main(String[] args) {
      // Create participants and resources
      var foxy = new Fox("Foxy");
      var tails = new Fox("Tails");
      var food = new Food();
      var water = new Water();
      // Process data
      var service = Executors.newScheduledThreadPool(10);
      try {
         service.submit(() -> foxy.eatAndDrink(food,water));
         service.submit(() -> tails.drinkAndEat(food,water));
      } finally {
         service.shutdown();
      } } }

The result is that our program outputs the following, and it hangs indefinitely:

Foxy Got Food!
Tails Got Water!
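The deadlock arises because the two methods acquire the same two monitors in opposite orders, so each fox ends up holding the resource the other one needs. The following sketch shows the reasoning. This variant is not part of the original example, and OrderedFox is a hypothetical name. If both foxes request food first and water second, no cycle of waiting can form and both tasks finish.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderedFox {
    static class Food {}
    static class Water {}

    private final String name;

    OrderedFox(String name) { this.name = name; }

    // Both foxes use this method, so the monitors are always taken food, then water
    void eatAndDrink(Food food, Water water) {
        synchronized (food) {
            System.out.println(name + " Got Food!");
            move();
            synchronized (water) {
                System.out.println(name + " Got Water!");
            }
        }
    }

    private static void move() {
        try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    // Returns true if both tasks finish within the timeout, that is, no deadlock
    static boolean runBoth() throws InterruptedException {
        OrderedFox foxy = new OrderedFox("Foxy");
        OrderedFox tails = new OrderedFox("Tails");
        Food food = new Food();
        Water water = new Water();
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            service.submit(() -> foxy.eatAndDrink(food, water));
            service.submit(() -> tails.eatAndDrink(food, water));
        } finally {
            service.shutdown();
        }
        return service.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Finished: " + runBoth());   // Finished: true
    }
}
```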
88
Q

Starvation

A

Starvation occurs when a single thread is perpetually denied access to a shared resource or lock. The thread is still active, but it is unable to complete its work as a result of other threads constantly taking the resource that it is trying to access.

89
Q

Livelock

A

Livelock occurs when two or more threads are conceptually blocked forever, although they are each still active and trying to complete their task. Livelock is a special case of resource starvation in which two or more threads actively try to acquire a set of locks, are unable to do so, and restart part of the process.
Livelock is often a result of two threads trying to resolve a deadlock.

In practice, livelock is often a difficult issue to detect. Threads in a livelock state appear active and able to respond to requests, even when they are stuck in an endless cycle.

90
Q

Managing Race Conditions

A

A race condition is an undesirable result that occurs when two tasks that should be completed sequentially are completed at the same time. We encountered examples of race conditions earlier in the chapter when we introduced synchronization.

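Suppose two users try to create an account with the same username, ZooFan, at the same time. The possible outcomes for this race condition are: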
* Both users are able to create accounts with the username ZooFan.
* Neither user is able to create an account with the username ZooFan, and an error message is returned to both users.
* One user is able to create an account with the username ZooFan, while the other user receives an error message.

For the exam, you should understand that race conditions lead to invalid data if they are not properly handled. Even the solution where both participants fail to proceed is preferable to one in which invalid data is permitted to enter the system.
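The following sketch shows one way to enforce the third outcome. A hypothetical UsernameRegistry uses ConcurrentHashMap.putIfAbsent(), which checks for the username and claims it in a single atomic step. Writing the check as containsKey() followed by put() would reintroduce the race, since both users could pass the check before either one claims the name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class UsernameRegistry {
    private final ConcurrentMap<String, String> accounts = new ConcurrentHashMap<>();

    // Atomic check-and-claim: returns true only for the first caller
    boolean register(String username, String owner) {
        return accounts.putIfAbsent(username, owner) == null;
    }

    // Two users race to register ZooFan; returns how many succeeded
    static int race() throws InterruptedException {
        UsernameRegistry registry = new UsernameRegistry();
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            for (String user : new String[] {"user1", "user2"}) {
                service.submit(() -> {
                    try {
                        start.await();                   // release both users at once
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (registry.register("ZooFan", user)) {
                        successes.incrementAndGet();
                    } else {
                        System.out.println(user + ": username ZooFan is taken");
                    }
                });
            }
            start.countDown();
        } finally {
            service.shutdown();
        }
        service.awaitTermination(5, TimeUnit.SECONDS);
        return successes.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Successful registrations: " + race());   // 1
    }
}
```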

91
Q

Working with Parallel Streams

A

A parallel stream is capable of processing results concurrently, using multiple threads. For example, you can use a parallel stream and the map() operation to operate concurrently on the elements in the stream, which can greatly improve performance over processing a single element at a time.
Using a parallel stream can change not only the performance of your application but also the expected results. As you shall see, some operations also require special handling to be able to be processed in a parallel manner.

92
Q

serial stream

A

A serial stream is a stream in which the results are ordered, with only one entry being processed at a time.

93
Q

> [!TIPS]
The number of threads available in a parallel stream is proportional to the number of available CPUs in your environment.

A

parallel stream

94
Q

Creating Parallel Streams

A

For the exam, you should be familiar with two ways of creating a parallel stream.

Collection<Integer> collection = List.of(1,2);

Stream<Integer> p1 = collection.stream().parallel();
Stream<Integer> p2 = collection.parallelStream();

The first way to create a parallel stream is from an existing stream, by calling parallel(). Isn’t this cool? Any stream can be made parallel!

The second way to create a parallel stream is from a Java Collection class, by calling parallelStream(). We use both of these methods throughout this section.

95
Q

Performing a Parallel Decomposition

A

A parallel decomposition is the process of taking a task, breaking it into smaller pieces that can be performed concurrently, and then reassembling the results. The more concurrent a decomposition, the greater the performance improvement of using parallel streams.

Let’s try it out. First, let’s define a reusable function that “does work” just by waiting for five seconds.

private static int doWork(int input) {
   try {
      Thread.sleep(5000);
   } catch (InterruptedException e) {}
   return input;
}

We can pretend that in a real application, this work might involve calling a database or reading a file. Now let’s use this method with a serial stream.

10: long start = System.currentTimeMillis();
11: List.of(1,2,3,4,5)
12:    .stream()
13:    .map(w -> doWork(w))
14:    .forEach(s -> System.out.print(s + " "));
15:
16: System.out.println();
17: var timeTaken = (System.currentTimeMillis()-start)/1000;
18: System.out.println("Time: "+timeTaken+" seconds");

What do you think this code will output when executed as part of a main() method? Let’s take a look:

1 2 3 4 5
Time: 25 seconds

As you might expect, the results are ordered and predictable because we are using a serial stream. It also took around 25 seconds to process all five results, one at a time. What happens if we replace line 12 with one that uses a parallelStream()? The following is some sample output:

3 2 1 5 4
Time: 5 seconds

What about the time required? In this case, our system had enough CPUs for all of the tasks to be run concurrently. If you ran this same code on a computer with fewer processors, it might output 10 seconds, 15 seconds, or some other value. The key is that we’ve written our code to take advantage of parallel processing when available, so our job is done.

95
Q

> [!NOTE]
The Stream interface includes a method isParallel() that can be used to test whether the instance of a stream supports parallel processing.
Some operations on streams preserve the parallel attribute, while others do not.

A

isParallel()
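The following sketch calls isParallel() on a few pipelines. IsParallelDemo is a hypothetical name. Here, map() keeps the pipeline parallel, and sequential(), the counterpart of parallel(), turns it back into a serial stream.

```java
import java.util.Arrays;
import java.util.List;

class IsParallelDemo {
    static boolean[] check() {
        List<Integer> list = List.of(1, 2, 3);
        return new boolean[] {
            list.stream().isParallel(),                          // false: serial source
            list.parallelStream().isParallel(),                  // true
            list.stream().parallel().isParallel(),               // true
            list.parallelStream().map(x -> x * 2).isParallel(),  // true: map() keeps it
            list.parallelStream().sequential().isParallel()      // false: back to serial
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(check()));
        // [false, true, true, true, false]
    }
}
```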

96
Q

Ordering Results

A

If your stream operation needs to guarantee ordering and you’re not sure if it is serial or parallel, you can replace line 14 with one that uses forEachOrdered():

14: .forEachOrdered(s -> System.out.print(s + " "));

This outputs the results in the order in which they are defined in the stream:

1 2 3 4 5
Time: 5 seconds

While we’ve lost some of the performance gains of using a parallel stream, our map() operation can still take advantage of the parallel stream.
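The following sketch contrasts forEach() and forEachOrdered() on the same parallel stream, collecting into a synchronized list instead of printing. Here doWork() sleeps for 100 milliseconds rather than five seconds, an assumption made to keep the run short. Only the forEachOrdered() result is guaranteed to follow the encounter order of the source list. OrderingDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class OrderingDemo {
    // Shortened stand-in for the five-second doWork() above
    private static int doWork(int input) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input;
    }

    static List<Integer> withForEach() {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        List.of(1, 2, 3, 4, 5).parallelStream()
            .map(w -> doWork(w))
            .forEach(out::add);           // order depends on which thread finishes first
        return out;
    }

    static List<Integer> withForEachOrdered() {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        List.of(1, 2, 3, 4, 5).parallelStream()
            .map(w -> doWork(w))          // still runs in parallel
            .forEachOrdered(out::add);    // applied in encounter order
        return out;
    }

    public static void main(String[] args) {
        System.out.println(withForEach());        // order varies
        System.out.println(withForEachOrdered()); // [1, 2, 3, 4, 5]
    }
}
```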

97
Q

Processing Parallel Reductions

A

A parallel reduction is a reduction operation applied to a parallel stream.

The results for parallel reductions can differ from what you expect when working with serial streams.

98
Q

Performing Order-Based Tasks

A

Since order is not guaranteed with parallel streams, methods such as findAny() on parallel streams may result in unexpected behavior.

99
Q

Consider the following example:

System.out.print(List.of(1,2,3,4,5,6)
   .parallelStream()
   .findAny()
   .get());
A

The JVM allocates a number of threads and returns the value of the first one to return a result, which could be 4, 2, and so on. While neither the serial nor the parallel stream is guaranteed to return the first value, the serial stream often does. With a parallel stream, the results are likely to be more random.

What about operations that consider order, such as findFirst(), limit(), and skip()?
Order is still preserved, but performance may suffer on a parallel stream as a result of a parallel processing task being forced to coordinate all of its threads in a synchronized-like fashion.
On the plus side, the results of ordered operations on a parallel stream will be consistent with a serial stream. For example, calling skip(5).limit(2).findFirst() will return the same result on ordered serial and parallel streams.
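The following sketch compares findAny(), findFirst(), and the skip(5).limit(2).findFirst() chain on the six-element list used above. OrderedOpsDemo is a hypothetical name. findFirst() always returns 1, even on a parallel stream. The skip/limit chain returns Optional[6] on both serial and parallel streams. findAny() may return any element.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

class OrderedOpsDemo {
    private static final List<Integer> DATA = List.of(1, 2, 3, 4, 5, 6);

    private static Stream<Integer> source(boolean parallel) {
        return parallel ? DATA.parallelStream() : DATA.stream();
    }

    static Optional<Integer> findAnyParallel() {
        return source(true).findAny();                           // any element
    }

    static Optional<Integer> findFirstParallel() {
        return source(true).findFirst();                         // always Optional[1]
    }

    static Optional<Integer> skipLimitFirst(boolean parallel) {
        return source(parallel).skip(5).limit(2).findFirst();    // Optional[6] either way
    }

    public static void main(String[] args) {
        System.out.println(findAnyParallel());      // varies
        System.out.println(findFirstParallel());    // Optional[1]
        System.out.println(skipLimitFirst(false));  // Optional[6]
        System.out.println(skipLimitFirst(true));   // Optional[6]
    }
}
```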

100
Q

Creating Unordered Streams

A

All of the streams you have been working with are considered ordered by default. It is possible to create an unordered stream from an ordered stream, similar to how you create a parallel stream from a serial stream.

List.of(1,2,3,4,5,6).stream().unordered();

This method does not reorder the elements; it just tells the JVM that if an order-based stream operation is applied, the order can be ignored. For example, calling skip(5) on an unordered stream will skip any 5 elements, not necessarily the first 5 required on an ordered stream.
For serial streams, using an unordered version has no effect. But on parallel streams, it can greatly improve performance.

List.of(1,2,3,4,5,6).stream().unordered().parallel();

Even though unordered streams will not be on the exam, if you are developing applications with parallel streams, you should know when to apply an unordered stream to improve performance.

101
Q

Combining Results with reduce()

A

The stream operation reduce() combines a stream into a single object.

  • Recall that the first parameter to the reduce() method is called the identity,
  • the second parameter is called the accumulator,
  • and the third parameter is called the combiner.
<U> U reduce(U identity,
   BiFunction<U,? super T,U> accumulator,
   BinaryOperator<U> combiner)
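The three-argument form is needed when the result type U differs from the stream's element type T. The following sketch uses assumed example data to sum the lengths of a list of words. The identity is 0. The accumulator adds one word's length to a running Integer. The combiner adds two partial sums produced by different threads. ReduceDemo is a hypothetical name.

```java
import java.util.List;

class ReduceDemo {
    static int totalLength(List<String> words) {
        return words.parallelStream()
            .reduce(0,                                     // identity (U = Integer)
                (sum, word) -> sum + word.length(),        // accumulator: (U, T) -> U
                (left, right) -> left + right);            // combiner: (U, U) -> U
    }

    public static void main(String[] args) {
        System.out.println(totalLength(List.of("lions", "tigers", "bears"))); // 16
    }
}
```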
102
Q

> [!NOTES]
While this is not in scope for the exam, the accumulator and combiner must be associative, non-interfering, and stateless.
Don’t panic; you don’t need to know advanced math terms for the exam!

A

accumulator and combiner

102
Q

We can concatenate a list of char values using the reduce() method, as shown in the following example:

System.out.println(List.of('w', 'o', 'l', 'f')
   .parallelStream()
   .reduce("",
      (s1,c) -> s1 + c,
      (s2,s3) -> s2 + s3)); // wolf
A

The naming of the variables in this stream example is not accidental.
We used c for char, whereas s1, s2, and s3 are String values.

On parallel streams, the reduce() method works by applying the reduction to pairs of elements within the stream to create intermediate values and then combining those intermediate values to produce a final result. Put another way, in a serial stream, wolf is built one character at a time. In a parallel stream, the intermediate values wo and lf are created and then combined.

With parallel streams, we now have to be concerned about order. What if the elements of a string are combined in the wrong order to produce wlfo or flwo? The Stream API prevents this problem while still allowing streams to be processed in parallel, as long as you follow one simple rule: make sure that the accumulator and combiner produce the same result regardless of the order they are called in.

With parallel streams, the order in which these operations are applied is no longer guaranteed, so any argument that violates these rules is much more likely to produce side effects or unpredictable results.

103
Q
System.out.println(List.of(1,2,3,4,5,6)
   .parallelStream()
   .reduce(0, (a, b) -> (a - b))); // PROBLEMATIC ACCUMULATOR


A

We can omit a combiner parameter in these examples, as the accumulator can be used when the intermediate data types are the same.

It may output -21, 3, or some other value.
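The accumulator is problematic because subtraction is not associative. (1 - 2) - 3 is -4, but 1 - (2 - 3) is 2, so the result depends on how the parallel stream splits and regroups the elements. In the following sketch, the serial version always produces -21. An associative accumulator such as addition produces the same sum, 21, whether or not the stream is parallel. AssociativityDemo is a hypothetical name.

```java
import java.util.List;

class AssociativityDemo {
    static int serialSubtraction() {
        return List.of(1, 2, 3, 4, 5, 6).stream()
            .reduce(0, (a, b) -> a - b);      // 0-1-2-3-4-5-6 = -21 on every run
    }

    static int parallelAddition() {
        return List.of(1, 2, 3, 4, 5, 6).parallelStream()
            .reduce(0, Integer::sum);         // associative, so always 21
    }

    public static void main(String[] args) {
        System.out.println(serialSubtraction()); // -21
        System.out.println(parallelAddition());  // 21
    }
}
```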

104
Q

Selecting a reduce() Method

A

Although the one- and two-argument versions of reduce() support parallel processing, it is recommended that you use the three-argument version of reduce() when working with parallel streams. Providing an explicit combiner method allows the JVM to partition the operations in the stream more efficiently.

105
Q

Combining Results with collect()

A

The Stream API includes a three-argument version of collect() that takes accumulator and combiner operators along with a supplier operator instead of an identity.

<R> R collect(Supplier<R> supplier,
   BiConsumer<R, ? super T> accumulator,
   BiConsumer<R, R> combiner)

As with reduce(), the accumulator and combiner operations must be able to process results in any order.

Stream<String> stream = Stream
   .of("w", "o", "l", "f")
   .parallel();
SortedSet<String> set = stream
   .collect(ConcurrentSkipListSet::new, Set::add, Set::addAll);
System.out.println(set); // [f, l, o, w]

Elements in a ConcurrentSkipListSet are sorted according to their natural ordering.

You should use a concurrent collection to combine the results, ensuring that the results of concurrent threads do not cause a ConcurrentModificationException.

Performing parallel reductions with a collector requires additional considerations. For example, if the collection into which you are inserting is an ordered data set, such as a List, the elements in the resulting collection must be in the same order, regardless of whether you use a serial or parallel stream. This may reduce performance, though, as some operations cannot be completed in parallel.
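A short sketch of that ordering guarantee, assuming a parallel stream created with Stream.of(), which is ordered. OrderedCollectDemo and collectInOrder() are hypothetical names. Even though the stream is parallel, collecting into a List keeps the encounter order.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OrderedCollectDemo {
    // For an ordered stream, the partial lists built by different threads
    // are combined so that the final list keeps the encounter order.
    static List<String> collectInOrder(Stream<String> letters) {
        return letters.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        var letters = Stream.of("w", "o", "l", "f").parallel();
        System.out.println(collectInOrder(letters)); // [w, o, l, f]
    }
}
```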

106
Q

Performing a Parallel Reduction on a Collector

A

Every Collector instance defines a characteristics() method that returns a set of Collector.Characteristics attributes. When using a Collector to perform a parallel reduction, a number of properties must hold true. Otherwise, the collect() operation will execute in a single-threaded fashion.
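You can inspect these attributes directly. This sketch (CharacteristicsDemo and its helpers are hypothetical names) compares Collectors.toSet() with Collectors.toConcurrentMap(). Only the latter reports CONCURRENT, which is one of the requirements for a parallel reduction.

```java
import java.util.Set;
import java.util.stream.Collector.Characteristics;
import java.util.stream.Collectors;

class CharacteristicsDemo {
    // Characteristics of a plain (nonconcurrent) set collector.
    static Set<Characteristics> toSetTraits() {
        return Collectors.<String>toSet().characteristics();
    }

    // Characteristics of a concurrent map collector keyed by string length.
    static Set<Characteristics> toConcurrentMapTraits() {
        return Collectors.<String, Integer, String>toConcurrentMap(
                String::length, s -> s, (s1, s2) -> s1 + "," + s2)
            .characteristics();
    }

    public static void main(String[] args) {
        System.out.println(toSetTraits().contains(Characteristics.CONCURRENT));           // false
        System.out.println(toConcurrentMapTraits().contains(Characteristics.CONCURRENT)); // true
    }
}
```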

107
Q

Requirements for Parallel Reduction with collect()

A
  • The stream is parallel.
  • The collector passed to the collect() operation has the Characteristics.CONCURRENT characteristic.
  • Either the stream is unordered or the collector has the characteristic Characteristics.UNORDERED.

For example, while Collectors.toSet() does have the UNORDERED characteristic, it does not have the CONCURRENT characteristic. Therefore, the following is not a parallel reduction even with a parallel stream:

parallelStream.collect(Collectors.toSet()); // Not a parallel reduction

The Collectors class includes two sets of static methods for retrieving collectors, toConcurrentMap() and groupingByConcurrent(), both of which are UNORDERED and CONCURRENT.

These methods produce Collector instances capable of performing parallel reductions efficiently. Like their nonconcurrent counterparts, there are overloaded versions that take additional arguments.

Stream<String> ohMy = Stream.of("lions", "tigers", "bears").parallel();
ConcurrentMap<Integer, String> map = ohMy.collect(Collectors.toConcurrentMap(String::length, k -> k, (s1, s2) -> s1 + "," + s2));
System.out.println(map); // {5=lions,bears, 6=tigers} (lions and bears may appear in either order)
System.out.println(map.getClass()); // java.util.concurrent.ConcurrentHashMap
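The merge function in the example above is not optional decoration. Both "lions" and "bears" have length 5, and the two-argument toConcurrentMap() overload throws an IllegalStateException on a duplicate key. A minimal sketch of both overloads (DuplicateKeyDemo, rejectsDuplicates(), and merged() are hypothetical names):

```java
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DuplicateKeyDemo {
    // Without a merge function, a repeated key (5 for "lions" and "bears") is rejected.
    static boolean rejectsDuplicates() {
        try {
            Stream.of("lions", "tigers", "bears").parallel()
                .collect(Collectors.toConcurrentMap(String::length, s -> s));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // With a merge function, colliding values are combined instead.
    static ConcurrentMap<Integer, String> merged() {
        return Stream.of("lions", "tigers", "bears").parallel()
            .collect(Collectors.toConcurrentMap(
                String::length, s -> s, (s1, s2) -> s1 + "," + s2));
    }

    public static void main(String[] args) {
        System.out.println(rejectsDuplicates()); // true
        System.out.println(merged().get(6));     // tigers
    }
}
```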

groupingByConcurrent() example

var ohMy = Stream.of("lions", "tigers", "bears").parallel();
ConcurrentMap<Integer, List<String>> map = ohMy.collect(Collectors.groupingByConcurrent(String::length));
System.out.println(map); // {5=[lions, bears], 6=[tigers]} (the order inside each list may vary)
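Like groupingBy(), groupingByConcurrent() has an overload that accepts a downstream collector. This sketch assumes Collectors.counting() as the downstream collector. GroupingCountDemo and countByLength() are hypothetical names.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class GroupingCountDemo {
    // Groups words by length and counts each group, using a concurrent map.
    static ConcurrentMap<Integer, Long> countByLength(Stream<String> words) {
        return words.collect(
            Collectors.groupingByConcurrent(String::length, Collectors.counting()));
    }

    public static void main(String[] args) {
        var counts = countByLength(Stream.of("lions", "tigers", "bears").parallel());
        System.out.println(counts.get(5)); // 2
        System.out.println(counts.get(6)); // 1
    }
}
```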
108
Q

Avoiding Stateful Streams

A

Side effects can appear in parallel streams if your lambda expressions are stateful.

A stateful lambda expression is one whose result depends on any state that might change during the execution of a pipeline.

For example, the following method, which keeps only the even numbers, is stateful:

public List<Integer> addValues(IntStream source) {
    var data = Collections.synchronizedList(new ArrayList<Integer>());
    source.filter(s -> s % 2 == 0)
        .forEach(i -> { data.add(i); }); // STATEFUL: DON'T DO THIS!
    return data;
}

Let’s say this method is executed with a serial stream:

var list = addValues(IntStream.range(1, 11));
System.out.print(list); // [2, 4, 6, 8, 10]

But what if someone else passes in a parallel stream?

var list = addValues(IntStream.range(1, 11).parallel());
System.out.print(list); // e.g., [6, 8, 10, 2, 4]; the order can change from run to run

Oh, no: our results no longer match our input order! The problem is that our lambda expression is stateful and modifies a list that is outside our stream. We can fix this by rewriting our stream operation to be stateless:

public List<Integer> addValuesBetter(IntStream source) {
    return source.filter(s -> s % 2 == 0)
        .boxed()
        .collect(Collectors.toList());
}

This method processes the stream and then collects all the results into a new list. It produces the same ordered result on both serial and parallel streams. It is strongly recommended that you avoid stateful operations when using parallel streams, to remove any potential data side effects. In fact, you should avoid them even in serial streams, since using them limits the code’s ability to someday take advantage of parallelization.
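Another common stateful pattern is numbering elements with a shared counter. The sketch below is a hypothetical illustration (NumberingDemo and its methods are not from the original). It contrasts a lambda that reads a shared AtomicInteger with a stateless version whose label depends only on the element's index.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class NumberingDemo {
    static final List<String> LETTERS = List.of("w", "o", "l", "f");

    // STATEFUL: the label depends on a shared counter, so in a parallel stream
    // the number a letter receives depends on thread timing (don't do this).
    static List<String> numberStateful() {
        var counter = new AtomicInteger();
        return LETTERS.parallelStream()
            .map(s -> counter.getAndIncrement() + ":" + s)
            .collect(Collectors.toList());
    }

    // STATELESS: the label depends only on the element's index.
    static List<String> numberStateless() {
        return IntStream.range(0, LETTERS.size())
            .parallel()
            .mapToObj(i -> i + ":" + LETTERS.get(i))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(numberStateful());  // labels may not match positions
        System.out.println(numberStateless()); // [0:w, 1:o, 2:l, 3:f]
    }
}
```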

109
Q

Summary

A
  • how to create and define the thread’s work using a Runnable instance
  • how to pause and interrupt the thread.
  • When working with the Concurrency API, you should also know how to define tasks with Callable lambda expressions.
  • how to concurrently execute tasks using ExecutorService like a pro.
  • know which ExecutorService instances are available, including scheduled and pooled services.
  • Thread-safety is about protecting data from being corrupted by multiple threads modifying it at the same time.
  • Java offers many tools to keep data safe, including atomic classes, synchronized methods/blocks, the Lock framework, and CyclicBarrier.
  • Concurrency API also includes numerous collection classes that handle multithreaded access for you. You should be familiar with the concurrent collections, including the CopyOnWrite classes, which create a new underlying structure any time the underlying collection is modified.
  • potential threading issues can arise. (For the exam, you need to know only the basic theory behind these concepts.)
    • Deadlock, starvation, and livelock can result in programs that appear stuck.
    • race conditions can result in unpredictable data
  • parallel streams
    • how to use them to perform parallel decompositions and reductions.
    • can greatly improve performance
    • can also cause unexpected results since the processing is no longer ordered.
    • Remember to avoid stateful lambda expressions, especially when working with parallel streams.
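The CopyOnWrite behavior mentioned in the summary can be seen in a few lines. This sketch (CopyOnWriteDemo and growWhileIterating() are hypothetical names) adds to a CopyOnWriteArrayList while iterating over it. The loop walks a snapshot taken when iteration began, so it neither throws a ConcurrentModificationException nor visits the new elements.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteDemo {
    static List<Integer> growWhileIterating() {
        List<Integer> numbers = new CopyOnWriteArrayList<>(List.of(4, 3, 42));
        // Each add() copies the underlying array; the running iterator keeps
        // reading the original three-element snapshot, so the loop runs 3 times.
        for (var n : numbers) {
            numbers.add(n + 1);
        }
        return numbers;
    }

    public static void main(String[] args) {
        System.out.println(growWhileIterating()); // [4, 3, 42, 5, 4, 43]
    }
}
```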
110
Q

Exam Essentials

A
  • Be able to write thread-safe code. Thread-safety is about protecting shared data from concurrent access. A monitor can be used to ensure that only one thread processes a particular section of code at a time. In Java, monitors can be implemented with a synchronized block or method or using an instance of Lock. ReentrantLock has a number of advantages over using a synchronized block, including the ability to check whether a lock is available without blocking on it, as well as supporting the fair acquisition of locks. To achieve synchronization, two or more threads must coordinate on the same shared object.
  • Be able to apply the atomic classes. An atomic operation is one that occurs without interference from another thread. The Concurrency API includes a set of atomic classes that are similar to the primitive classes, except that they ensure that operations on them are performed atomically. Know the difference between an atomic variable and one marked with the volatile modifier.
  • Create concurrent tasks with a thread executor service using Runnable and Callable. An ExecutorService creates and manages a single thread or a pool of threads. Instances of Runnable and Callable can both be submitted to a thread executor and will be completed using the available threads in the service. Callable differs from Runnable in that Callable returns a generic data type and can throw a checked exception. A ScheduledExecutorService can be used to schedule tasks at a fixed rate or with a fixed interval between executions.
  • Be able to use the concurrent collection classes. The Concurrency API includes numerous collection classes that include built-in support for multithreaded processing, such as ConcurrentHashMap. It also includes a class CopyOnWriteArrayList that creates a copy of its underlying list structure every time it is modified and is useful in highly concurrent environments.
  • Identify potential threading problems. Deadlock, starvation, and livelock are three threading problems that can occur and result in threads never completing their task. Deadlock occurs when two or more threads are blocked forever, each waiting on a resource held by another. Starvation occurs when a single thread is perpetually denied access to a shared resource. Livelock is a form of starvation where two or more threads are active but conceptually blocked forever. Finally, race conditions occur when two threads access or modify shared data at the same time without proper coordination, resulting in an unexpected outcome.
  • Understand the impact of using parallel streams. The Stream API allows for the easy creation of parallel streams. Using a parallel stream can cause unexpected results, since the order of operations may no longer be predictable. Some operations, such as reduce() and collect(), require special consideration to achieve optimal performance when applied to a parallel stream.
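Several of these essentials fit in one small program. The sketch below is a hypothetical illustration (ThreadSafeCounters and its methods are not from the original). It submits Callable tasks to a fixed thread pool, increments one counter with an AtomicInteger and another under a fair ReentrantLock, and reads the locked counter with tryLock(), which returns immediately instead of blocking. The pool size of 4 is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ThreadSafeCounters {
    private final AtomicInteger atomicCount = new AtomicInteger();
    private final Lock lock = new ReentrantLock(true); // true requests fair acquisition
    private int lockedCount = 0; // only accessed while holding lock

    void incrementBoth() {
        atomicCount.incrementAndGet(); // atomic read-modify-write, no lock needed
        lock.lock();
        try {
            lockedCount++; // ++ is not atomic, so it must be guarded
        } finally {
            lock.unlock(); // always release, even if the guarded code throws
        }
    }

    int getAtomicCount() {
        return atomicCount.get();
    }

    int getLockedCount() {
        // tryLock() returns false immediately instead of blocking if the lock is held
        if (!lock.tryLock()) {
            throw new IllegalStateException("lock is held by another thread");
        }
        try {
            return lockedCount;
        } finally {
            lock.unlock();
        }
    }

    // Submits `tasks` Callables to a fixed pool; each one increments both counters.
    static ThreadSafeCounters run(int tasks, int incrementsPerTask) {
        var counters = new ThreadSafeCounters();
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                Callable<Integer> task = () -> {
                    for (int i = 0; i < incrementsPerTask; i++) {
                        counters.incrementBoth();
                    }
                    return incrementsPerTask; // unlike Runnable, a Callable returns a value
                };
                results.add(service.submit(task));
            }
            for (Future<Integer> result : results) {
                result.get(); // blocks until that task has finished
            }
        } catch (ExecutionException e) {
            throw new IllegalStateException("a counting task failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            service.shutdown();
        }
        return counters;
    }

    public static void main(String[] args) {
        var counters = run(10, 1_000);
        System.out.println(counters.getAtomicCount()); // 10000
        System.out.println(counters.getLockedCount()); // 10000
    }
}
```

Replacing the AtomicInteger with a plain int (or a volatile int) and removing the lock would make lost updates possible, because ++ reads and writes in separate steps.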