Chapter 13 Concurrency Notes Flashcards
Introducing Threads
* thread
* process
* single-threaded process
* multithreaded process
* shared environment
* task
* A thread can complete multiple independent tasks but only one task at a time.
* shared memory
,
- A
thread
is the smallest unit of execution that can be scheduled by the operating system. - A
process
is a group of associated threads that execute in the same shared environment. - A
task
is a single unit of work performed by a thread. -
shared environment
, we mean that the threads in the same process share the same memory space and can communicate directly with one another. -
shared memory
, we are generally referring to static variables as well as instance and local variables passed to a thread.
Understanding Thread Concurrency
- The property of executing multiple threads and processes at the same time is referred to as concurrency.
- Operating systems use a thread scheduler to determine which threads should be currently executing
- A context switch is the process of storing a thread’s current state and later restoring the state of the thread to continue execution.
- Finally, a thread can interrupt or supersede another thread if it has a higher thread priority than the other thread. A thread priority is a numeric value associated with a thread that is taken into consideration by the thread scheduler when determining which threads should currently be executing. In Java, thread priorities are specified as integer values.
How does the system decide what to execute when there are more threads available than CPUs?
Operating systems use a thread scheduler
to determine which threads should be currently executing, as shown in Figure 13.1. For example, a thread scheduler may employ a round-robin schedule
in which each available thread receives an equal number of CPU cycles with which to execute, with threads visited in a circular order.
context switch
A context switch
is the process of storing a thread’s current state and later restoring the state of the thread to continue execution.
thread priority
A thread priority
is a numeric value
associated with a thread that is taken into consideration by the thread scheduler when determining which threads should currently be executing. In Java, thread priorities are specified as integer
values.
Runnable
@FunctionalInterface public interface Runnable { void run(); }
- One of the most common ways to define a task for a thread is by using the
Runnable
instance. - Runnable is a functional interface that takes no arguments and returns no data.
Creating a Thread
new Thread(() -> System.out.print("Hello")).start(); System.out.print("World");
- Provide a
Runnable
object orlambda expression
to the Thread constructor. start()
start new thread
Remember that order of thread execution is not often guaranteed. The exam commonly presents questions in which multiple tasks are started at the same time, and you must determine the result.
Calling run() Instead of start()
System.out.println("begin"); new Thread(printInventory).run(); new Thread(printRecords).run(); new Thread(printInventory).run(); System.out.println("end");
Calling run()
on a Thread
or a Runnable
does not start a new thread.
we can create a Thread and its associated task one of two ways in Java:
More generally, we can create a Thread and its associated task one of two ways in Java:
- Provide a
Runnable
object or lambda expression to theThread constructor
. - Create a class that
extends Thread
and overrides therun()
method.
Creating a class that extends Thread is relatively uncommon and should only be done under certain circumstances, such as if you need to overwrite other thread methods.
Distinguishing Thread Types
* System thread
* User-defined thread
* Daemon thread
-
System thread
, created by the(JVM) and runs in the background of the application. For example, garbage collection is managed by a system thread created by the JVM. -
User-defined thread
, created by the application developer to accomplish a specific task. -
Daemon threads
-
System and user-defined threads can both be created as
daemon threads
. - by default,
user-defined threads
are notdaemons
- A Java application terminates when the only threads that are running are daemon threads.
-
set
thread
asdaemon
, beforestart()
job.setDaemon(true);
-
System and user-defined threads can both be created as
Managing a Thread’s Life Cycle
- NEW (thread created but no started )
-
RUNNABLE (running or able to be run)
- BLOCKED (waiting to enter synchronized block)
- WAITING (waiting indefinitely to be notified)
- TIMED_WAITING (waiting a specified time)
- TERMINATED (task complete)
create thread -> NEW - start()
-> RUNNABLE - run()
completes-> TERMINATED
RUNNABLE -resource requested -> BLOCKED
RUNNABLE <-resource granted- BLOCKED
RUNNABLE -wait() -> WAITING
RUNNABLE <-notify() - WAITING
RUNNABLE -sleep() -> TIMED_WAITING
RUNNABLE <-time elapsed - TIMED_WAITING
Polling with Sleep
Thread.sleep(1_000); // 1 SECOND
Even though multithreaded programming allows you to execute multiple tasks at the same time, one thread often needs to wait for the results of another thread to proceed. One solution is to use polling. Polling is the process of intermittently checking data at some fixed interval.
We can improve this result by using the Thread.sleep() method to implement polling and sleep for 1,000 milliseconds, aka 1 second:
public class CheckResultsWithSleep { private static int counter = 0; public static void main(String[] a) { new Thread(() -> { for(int i = 0; i < 1_000_000; i++) counter++; }).start(); while(counter < 1_000_000) { System.out.println("Not reached yet"); try { Thread.sleep(1_000); // 1 SECOND } catch (InterruptedException e) { System.out.println("Interrupted!"); } } System.out.println("Reached: "+counter); } }
Polling
Polling
is the process of intermittently checking data at some fixed interval.
Interrupting a Thread
mainThread.interrupt();
One way to improve this program is to allow the thread to interrupt the main() thread when it’s done:
public class CheckResultsWithSleepAndInterrupt { private static int counter = 0; public static void main(String[] a) { final var mainThread = Thread.currentThread(); new Thread(() -> { for(int i = 0; i < 1_000_000; i++) counter++; mainThread.interrupt(); }).start(); while(counter < 1_000_000) { System.out.println("Not reached yet"); try { Thread.sleep(1_000); // 1 SECOND } catch (InterruptedException e) { System.out.println("Interrupted!"); } } System.out.println("Reached: "+counter); } }
final var mainThread = Thread.currentThread();
mainThread.interrupt();
Calling interrupt() on a thread in the TIMED_WAITING or WAITING state causes the main() thread to become RUNNABLE again, triggering an InterruptedException. The thread may also move to a BLOCKED state if it needs to reacquire resources when it wakes up.
> [!NOTE]
Calling interrupt() on a thread already in a RUNNABLE state doesn’t change the state.
In fact, it only changes the behavior if the thread is periodically checking the Thread.isInterrupted() value state.
interrupt()
Thread.isInterrupted()
Thread methods
-
public void run()
If this thread was constructed using a separate Runnable run object, then that Runnable object’s run method is called; otherwise, this method does nothing and returns.
Subclasses of Thread should override this method. -
public void start()
Causes this thread to begin execution; the Java Virtual Machine calls the run method of this thread.
The result is that two threads are running concurrently: the current thread (which returns from the call to the start method) and the other thread (which executes its run method).
It is never legal to start a thread more than once. In particular, a thread may not be restarted once it has completed execution. -
public final void join() throws InterruptedException
Waits for this thread to die.
An invocation of this metho behaves in exactly the same way as the invocation -
public static void sleep(long millis) throws InterruptedException
Causes the currently executing thread to sleep (temporarily cease execution) for the specified number of milliseconds, subject to the precision and accuracy of system timers and schedulers. The thread does not lose ownership of any monitors. -
public static Thread currentThread()
Returns a reference to the currently executing thread object. -
public void interrupt()
Interrupts this thread. -
public static boolean interrupted()
Tests whether the current thread has been interrupted. The interrupted status of the thread is cleared by this method. In other words, if this method were to be called twice in succession, the second call would return false (unless the current thread were interrupted again, after the first call had cleared its interrupted status and before the second call had examined it).
A thread interruption ignored because a thread was not alive at the time of the interrupt will be reflected by this method returning false. -
public boolean isInterrupted()
Tests whether this thread has been interrupted. The interrupted status of the thread is unaffected by this method.
A thread interruption ignored because a thread was not alive at the time of the interrupt will be reflected by this method returning false. -
public Thread.State getState()
Returns the state of this thread. This method is designed for use in monitoring of the system state, not for synchronization control. -
public final void setDaemon(boolean on)
Marks this thread as either a daemon thread or a user thread. The Java Virtual Machine exits when the only threads running are all daemon threads.
This method must be invoked before the thread is started.
Creating Threads with the Concurrency API
- java.util.concurrent
- public interface Executor
- public class Executors extends Object
- public interface ExecutorService extends Executor
-
java.util.concurrent
package - The Concurrency API includes the
ExecutorService interface
, which defines services that create and manage threads.- You first obtain an instance of an
ExecutorService
interface, - and then you send the service tasks to be processed.
- The framework includes numerous useful features, such as thread pooling and scheduling.
- You first obtain an instance of an
- It is recommended that you use this framework any time you need to create and execute a separate task, even if you need only a single thread.
Introducing the Single-Thread Executor
ExecutorService service = Executors.newSingleThreadExecutor(); try { System.out.println("begin"); service.execute(printInventory); service.execute(printRecords); service.execute(printInventory); System.out.println("end"); } finally { service.shutdown(); }
Possible output:
begin
Printing zoo inventory
Printing record: 0
Printing record: 1end
Printing record: 2
Printing zoo inventory
ExecutorService service = Executors.newSingleThreadExecutor();
- The Concurrency API includes the
Executors
factory class that can be used to create instances of the ExecutorService interface. - we use the newSingleThreadExecutor() method to create the service.
- With a single-thread executor, tasks are guaranteed to be executed
sequentially
. - Notice that the
end
text is output while our thread executor tasks are still running. - This is because the main() method is still an independent thread from the ExecutorService.
Shutting Down a Thread Executor
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
boolean isTerminated()
boolean isShutdown()
void shutdown()
List<Runnable> shutdownNow()
- A thread executor creates a
non-daemon
thread on the first task that is executed, so failing to callshutdown()
will result in your applicationnever terminating
. - The shutdown process for a thread executor involves
-
first
rejecting any new tasks submitted
to the thread executor - while
continuing to execute any previously submitted tasks
. - During this time, calling
isShutdown()
will returntrue
, whileisTerminated()
will return false. - If a new task is submitted to the thread executor while it is shutting down, a
RejectedExecutionException
will be thrown. - Once all active tasks have been completed,
isShutdown()
andisTerminated()
will both returntrue
.
-
first
- Figure 13.3 shows the life cycle of an ExecutorService object.
For the exam, you should be aware that shutdown()
does not stop any tasks that have already been submitted to the thread executor.
What if you want to cancel all running and upcoming tasks?
- The ExecutorService provides a method called
shutdownNow()
, - which attempts to stop all running tasks and discards any that have not been started yet.
- It is not guaranteed to succeed
- because it is possible to create a thread that will never terminate, so any attempt to interrupt it may be ignored.
List<Runnable> shutdownNow()
* Attempts to stop all actively executing tasks,
* halts the processing of waiting tasks,
* and returns a list of the tasks that were awaiting execution.
* This method does not wait for actively executing tasks to terminate.
* Use awaitTermination to do that.
* There are no guarantees beyond best-effort attempts to stop processing actively executing tasks.
* For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate.
Submitting TasksExecutorService
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
Future<?> submit(Runnable task)
<T> Future<T> submit(Runnable task, T result)
<T> Future<T> submit(Callable<T> task)
Executor
void execute(Runnable command)
You can submit tasks to an ExecutorService instance multiple ways.
1. void execute(Runnable command)
Executes Runnable task at some point in future.
2. Future<?> submit(Runnable task)
Executes Runnable task at some point in future and returns Future representing task.
3. <T> Future<T> submit(Callable<T> task)
Executes Callable task at some point in future and returns Future representing pending results of task.
4. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
Executes given tasks and waits for all tasks to complete. Returns List of Future instances in same order in which they were in original collection.
5. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
Executes given tasks and waits for at least one to complete.
Submitting Tasks: execute() vs. submit()
The submit()
method advantage over execute()
method is a return object that can be used to track the result.
For the exam, you need to be familiar with both execute() and submit(), but in your own code we recommend submit() over execute() whenever possible.
How do we know when a task submitted to an ExecutorService is complete?
submit()
method returns a Future<V>
instance that can be used to determine this result.
Future<?> future = service.submit(() -> System.out.println("Hello"));
- The
Future
type is actually aninterface
. -
Future
instance is returned by various API methods.
Future methods
-
boolean isDone()
Returns true if task was completed, threw exception, or was cancelled. -
boolean isCancelled()
Returns true if task was cancelled before it completed normally. -
boolean cancel(boolean mayInterruptIfRunning)
Attempts to cancel execution of task and returns true if it was successfully cancelled or false if it could not be cancelled or is complete. -
V get()
Retrieves result of task, waiting endlessly if it is not yet available. -
V get(long timeout, TimeUnit unit)
Retrieves result of task, waiting specified amount of time. If result is not ready by time timeout is reached, checked TimeoutException will be thrown.
The following is an updated version of our earlier polling example CheckResults class, which uses a Future instance to wait for the results:
import java.util.concurrent.*; public class CheckResults { private static int counter = 0; public static void main(String[] unused) throws Exception { ExecutorService service = Executors.newSingleThreadExecutor(); try { Future<?> result = service.submit(() -> { for(int i = 0; i < 1_000_000; i++) counter++; }); result.get(10, TimeUnit.SECONDS); // Returns null for Runnable System.out.println("Reached!"); } catch (TimeoutException e) { System.out.println("Not reached in time"); } finally { service.shutdown(); } } }
import java.util.concurrent.*;
ExecutorService service = Executors.newSingleThreadExecutor();
* Obtain ExecutorService instance using a factory method in the Executors class
* Creates an Executor that uses a single worker thread operating off an unbounded queue.
Future<?> result = service.submit(() -> { for(int i = 0; i < 1_000_000; i++) counter++; });
* Since the return type of Runnable run() is void, the get()
method always returns null
when working with Runnable expressions.
result.get(10, TimeUnit.SECONDS); // Returns null for Runnable
* It also waits at most 10 seconds, throwing a TimeoutException
on the call to result.get()
if the task is not done.
<T> Future<T> submit(Runnable task, T result)
* Submits a Runnable task for execution and returns a Future representing that task. The Future’s get method will return the given result upon successful completion.
java.util.concurrent.TimeUnit
Concurrency API use this enum
TimeUnit enum
-
TimeUnit.NANOSECONDS
Time in one-billionths of a second (1/1,000,000,000) -
TimeUnit.MICROSECONDS
Time in one-millionths of a second (1/1,000,000) -
TimeUnit.MILLISECONDS
Time in one-thousandths of a second (1/1,000) -
TimeUnit.SECONDS
Time in seconds -
TimeUnit.MINUTES
Time in minutes -
TimeUnit.HOURS
Time in hours -
TimeUnit.DAYS
Time in days
Introducing Callable
@FunctionalInterface public interface Callable { V call() throws Exception; }
java.util.concurrent.Callable functional interface
-
call()
methodreturns a value
and can throw a checked exception.
~~~
@FunctionalInterface public interface Callable {
V call() throws Exception;
}
~~~ - The Callable interface is often preferable over Runnable, since it allows more details to be retrieved easily from the task after it is completed.
Example using Callable:
var service = Executors.newSingleThreadExecutor(); try { Future result = service.submit(() -> 30 + 11); System.out.println(result.get()); // 41 } finally { service.shutdown(); }
Callable
Waiting for All Tasks to Finish
ExecutorService service = Executors.newSingleThreadExecutor(); try { // Add tasks to the thread executor … } finally { service.shutdown(); } service.awaitTermination(1, TimeUnit.MINUTES); // Check whether all tasks are finished if(service.isTerminated()) System.out.println("Finished!"); else System.out.println("At least one task is still running");
If we don’t need the results of the tasks and are finished using our thread executor, there is a simpler approach.
- First, we shut down the thread executor using the
shutdown()
method. - Next, we use the
awaitTermination()
method available for all thread executors. The method waits the specified time to complete all tasks, returning sooner if all tasks finish or anInterruptedException
is detected. - call
isTerminated()
after theawaitTermination()
method finishes to confirm that all tasks are finished.
Scheduling Tasks
Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService methods
-
schedule(Callable callable, long delay, TimeUnit unit)
Creates and executes Callable task after given delay -
schedule(Runnable command, long delay, TimeUnit unit)
Creates and executes Runnable task after given delay -
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
Creates and executes Runnable task after given initial delay, creating new task every period value that passes -
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
Creates and executes Runnable task after given initial delay and subsequently with given delay between termination of one execution and commencement of next
scheduleWithFixedDelay()
scheduleWithFixedDelay()
method creates a new task only after the previous task has finished.
For example, if a task runs at 12:00 and takes five minutes to finish, with a period between executions of two minutes, the next task will start at 12:07.
service.scheduleWithFixedDelay(task1, 0, 2, TimeUnit.MINUTES);
Increasing Concurrency with Pools
A thread pool
is a group of pre-instantiated reusable threads that are available to perform a set of arbitrary tasks.
Executors factory methods
-
ExecutorService newSingleThreadExecutor()
Creates single-threaded executor that uses single worker thread operating off unbounded queue. Results are processed sequentially in order in which they are submitted. -
ScheduledExecutorService newSingleThreadScheduledExecutor()
Creates single-threaded executor that can schedule commands to run after given delay or to execute periodically. -
ExecutorService newCachedThreadPool()
Creates thread pool that creates new threads as needed but reuses previously constructed threads when they are available. -
ExecutorService newFixedThreadPool(int)
Creates thread pool that reuses fixed number of threads operating off shared unbounded queue. -
ScheduledExecutorService newScheduledThreadPool(int)
Creates thread pool that can schedule commands to run after given delay or execute periodically.
Writing Thread-Safe Code
-
Thread-safety
is the property of an object that guarantees safe execution by multiple threads at the same time. - Since threads run in a shared environment and memory space, how do we prevent two threads from interfering with each other?
We must organize access to data so that we don’t end up with invalid or unexpected results. - how to use a variety of techniques to protect data, including
-
atomic classes
, -
synchronized blocks
, - the
Lock framework
, - and
cyclic barriers
.
-
Understanding Thread-Safety
1: import java.util.concurrent.*; 2: public class SheepManager { 3: private int sheepCount = 0; 4: private void incrementAndReport() { 5: System.out.print((++sheepCount)+" "); 6: } 7: public static void main(String[] args) { 8: ExecutorService service = Executors.newFixedThreadPool(20); 9: try { 10: SheepManager manager = new SheepManager(); 11: for(int i = 0; i < 10; i++) 12: service.submit(() -> manager.incrementAndReport()); 13: } finally { 14: service.shutdown(); 15: } } }
It may output in a different order. Worse yet, it may print some numbers twice and not print some numbers at all! The following are possible outputs of this program:
1 2 3 4 5 6 7 8 9 10
1 9 8 7 3 6 6 2 4 5
1 8 7 3 2 6 5 4 2 9
race condition
the unexpected result of two tasks executing at the same time is referred to as a race condition.
Accessing Data with volatile
The volatile
keyword is used to guarantee that access to data within memory is consistent.
The volatile
attribute ensures that only one thread is modifying a variable at one time and that data read among multiple threads is consistent.
3: private volatile int sheepCount = 0; 4: private void incrementAndReport() { 5: System.out.print((++sheepCount)+" "); 6: }
Unfortunately, this code is not thread-safe and could still result in numbers being missed:
2 6 1 7 5 3 2 9 4 8
The reason this code is not thread-safe is that ++sheepCount
is still two distinct operations.
Put another way, if the increment operator represents the expression sheepCount = sheepCount + 1
, then each read and write operation is thread-safe, but the combined operation is not. Referring back to our sheep example, we don’t interrupt the employee while running, but we could still have multiple people in the field at the same time.
> [!NOTE]
In practice, volatile is rarely used.
We only cover it because it has been known to show up on the exam from time to time.
volatile
Protecting Data with Atomic Classes
Atomic is the property of an operation to be carried out as a single unit of execution without any interference from another thread.
A thread-safe atomic version of the increment operator would perform the read and write of the variable as a single operation, not allowing any other threads to access the variable during the operation.
Figure 13.5 shows the result of making the sheepCount variable atomic.
Atomic classes
-
AtomicBoolean
A boolean value that may be updated atomically -
AtomicInteger
An int value that may be updated atomically -
AtomicLong
A long value that may be updated atomically
3: private AtomicInteger sheepCount = new AtomicInteger(0); 4: private void incrementAndReport() { 5: System.out.print(sheepCount.incrementAndGet()+" "); 6: }
2 3 1 4 5 6 7 8 9 10 1 4 3 2 5 6 7 8 9 10 1 4 3 5 6 2 7 8 10 9