Chapter 7: Concurrency Flashcards
Thread types
System thread: Created by the JVM and runs in the background of the application. If a system thread hits a problem it cannot recover from, such as running out of memory, it generates an Error.
User-defined thread: Created by the developer
Daemon thread: Background thread which won’t stop the program from exiting e.g. garbage collection
Thread scheduler
If more threads are created than CPU processors are available, a thread scheduler is used to distribute the threads across the processors.
Can execute in a round-robin fashion, where each thread receives an equal number of CPU cycles in which to execute (e.g. a thread gets an allotted time to execute; if it doesn't finish in that time, its state is saved and a context switch to another thread is performed).
Can use Thread priority constants to determine which Thread should be run.
e.g. MIN_PRIORITY = 1
NORM_PRIORITY = 5
MAX_PRIORITY = 10
Runnable
A functional interface that declares a single method, void run()
Creating a Thread
Note: Java doesn't guarantee when the thread will actually run. You can either pass a Runnable object or lambda expression to the Thread constructor, or extend Thread and override void run(), e.g. (new Thread(() -> ___)).start(); and (new OverriddenThread()).start();
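A minimal sketch of both ways of creating a thread. The class names ThreadCreationDemo and OverriddenThread and the runs counter are made up for illustration; join() is used here only so the result can be read once both threads have finished.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ThreadCreationDemo {
    // Hypothetical counter so the example has an observable effect.
    static final AtomicInteger runs = new AtomicInteger();

    // Option 2: extend Thread and override run().
    static class OverriddenThread extends Thread {
        @Override
        public void run() {
            runs.incrementAndGet();
        }
    }

    static int startBoth() throws InterruptedException {
        runs.set(0);
        // Option 1: pass a Runnable (here a lambda) to the Thread constructor.
        Thread fromLambda = new Thread(() -> runs.incrementAndGet());
        Thread fromSubclass = new OverriddenThread();

        // start() asks the scheduler to run the thread; when it runs is not guaranteed.
        fromLambda.start();
        fromSubclass.start();

        // join() blocks until each thread has finished.
        fromLambda.join();
        fromSubclass.join();
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Threads completed: " + startBoth());
    }
}
```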
Polling with sleep
Can use Thread.sleep() to let the current Thread sleep while we wait for another Thread to complete.
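A rough sketch of polling with sleep, assuming a single worker thread that counts to 500. PollingDemo and counter are illustrative names; the field is marked volatile (an addition to the notes) so the polling thread is guaranteed to see the worker's updates.

```java
class PollingDemo {
    // volatile: writes by the worker are visible to the polling thread.
    static volatile int counter = 0;

    static int waitForWorker() throws InterruptedException {
        counter = 0;
        // Only this one worker thread writes to counter.
        new Thread(() -> {
            for (int i = 0; i < 500; i++) counter++;
        }).start();

        // Poll: check the result, then sleep instead of spinning on the CPU.
        while (counter < 500) {
            Thread.sleep(10); // 10 milliseconds
        }
        return counter;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Reached " + waitForWorker());
    }
}
```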
Executor Service
If more tasks are submitted than threads are available, the extra tasks are added to an internal queue.
Executors.newSingleThreadExecutor();
Returns a FinalizableDelegatedExecutorService (a Java 8 implementation detail; code against the ExecutorService interface).
Tasks using this service will be performed in order (but still run independently of the main program thread)
Shutting down a Thread Executor
Must call .shutdown() otherwise program will never end - watch out for questions where executor is not shut down.
Once you've shut down the executor, any new tasks submitted will be rejected with a RejectedExecutionException.
.shutdownNow() attempts to immediately stop all running tasks and discards the queued ones (returns a List<Runnable> of the tasks that were submitted but never started).
Good practice to put in a try finally block and shutdown in a finally block.
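A minimal sketch of the shutdown pattern with a single-thread executor. SingleThreadExecutorDemo and its method names are illustrative; awaitTermination() is not covered in these notes and is used here only so the results can be read after the tasks finish.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class SingleThreadExecutorDemo {
    static List<Integer> runInOrder() throws InterruptedException {
        List<Integer> results = new CopyOnWriteArrayList<>();
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            for (int i = 1; i <= 3; i++) {
                final int id = i;
                service.execute(() -> results.add(id)); // queued, run one at a time
            }
        } finally {
            service.shutdown(); // already-submitted tasks still complete
        }
        service.awaitTermination(5, TimeUnit.SECONDS);
        return results;
    }

    static boolean rejectsAfterShutdown() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.shutdown();
        try {
            service.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true; // new tasks are rejected once shut down
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder());
        System.out.println("Rejected after shutdown: " + rejectsAfterShutdown());
    }
}
```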
Executor Service methods
void execute(Runnable command): fire and forget.
Future<?> submit(Runnable task): returns a Future representing the task.
<T> Future<T> submit(Callable<T> task): returns a Future representing the pending result.
invokeAll(Collection of Callables): executes synchronously and returns a List of Futures in the original order. Waits indefinitely for all tasks to complete.
invokeAny(Collection of Callables): executes synchronously and returns the result of one of the finished tasks, cancelling unfinished tasks.
Future methods
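boolean isDone() --> true if the task completed, was cancelled or threw an exception
boolean isCancelled() --> true if the task was cancelled before it completed normally
boolean cancel(boolean mayInterruptIfRunning) --> attempts to cancel the task
V get() --> gets the result (waits endlessly if not yet available)
V get(long timeout, TimeUnit unit) --> gets the result, waiting at most the given time. Can throw TimeoutException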
Callable
V call() throws Exception
Calling .get() on the resulting Future returns an object of type V (or null)
Exceptions in Runnable and Callable
If the lambda expression passed to ExecutorService.submit() returns a value, the compiler treats it as a Callable, so its body may throw a checked exception. If it returns nothing, it is treated as a Runnable and checked exceptions are not supported.
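A sketch of submit(), invokeAll() and Future.get() together, assuming a small fixed pool. FutureDemo and sumResults are made-up names, and the values are arbitrary. Note how the first lambda calls Thread.sleep(), which throws a checked exception: that compiles only because the lambda returns a value and is therefore a Callable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FutureDemo {
    static int sumResults() throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            // Returns a value, so this lambda is a Callable<Integer>.
            Future<Integer> viaCallable = service.submit(() -> {
                Thread.sleep(10); // checked InterruptedException is allowed in call()
                return 40;
            });
            // Returns nothing, so this lambda is a Runnable.
            Future<?> viaRunnable = service.submit(() -> System.out.println("no result"));

            List<Callable<Integer>> tasks = new ArrayList<>();
            tasks.add(() -> 1);
            tasks.add(() -> 2);

            int total = viaCallable.get(5, TimeUnit.SECONDS); // may throw TimeoutException
            if (viaRunnable.get() != null) {
                throw new IllegalStateException("a Runnable's Future yields null");
            }
            // invokeAll() blocks until every task is done; futures come back in list order.
            for (Future<Integer> future : service.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Total: " + sumResults());
    }
}
```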
Scheduling Tasks
ScheduledExecutorService
methods: schedule(Callable, long delay, TimeUnit)
schedule(Runnable, long delay, TimeUnit)
both of these return a ScheduledFuture
scheduleAtFixedRate(Runnable, long initialDelay, long period, TimeUnit): schedules a new run every period, regardless of whether the previous run has finished
scheduleWithFixedDelay(Runnable, long initialDelay, long delay, TimeUnit): waits for the given delay between the termination of one run and the start of the next
These two repeat indefinitely and would generate endless Future objects, so they take a Runnable and can't be used with a Callable.
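A short sketch of a one-off scheduled Callable and a repeating Runnable. SchedulingDemo, the delays and the 200 ms observation window are arbitrary choices for illustration, so the exact tick count will vary from run to run.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SchedulingDemo {
    static String scheduleOnce() throws Exception {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        try {
            // Callable version: runs once after the delay and yields a result.
            ScheduledFuture<String> result =
                    service.schedule(() -> "done", 50, TimeUnit.MILLISECONDS);
            return result.get();
        } finally {
            service.shutdown();
        }
    }

    static int repeatAtFixedRate() throws InterruptedException {
        AtomicInteger ticks = new AtomicInteger();
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        try {
            // Runnable only: first run immediately, then every 20 ms.
            ScheduledFuture<?> handle = service.scheduleAtFixedRate(
                    ticks::incrementAndGet, 0, 20, TimeUnit.MILLISECONDS);
            Thread.sleep(200);    // let it tick for a while
            handle.cancel(false); // stop the repetition
        } finally {
            service.shutdown();
        }
        return ticks.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(scheduleOnce());
        System.out.println("Ticks: " + repeatAtFixedRate());
    }
}
```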
Increasing Concurrency with Pools
newSingleThreadExecutor() --> 1 thread; results processed in the order submitted
newSingleThreadScheduledExecutor() --> 1 thread; can delay commands to run after a given delay
newCachedThreadPool() --> thread pool that creates new threads as needed (will re-use previously constructed threads when they are available). Discouraged for long-lived tasks as it can take over the application
newFixedThreadPool(int nThreads) --> fixed no. of threads operating on a shared unbounded queue
newScheduledThreadPool(int nThreads) --> fixed no. of threads that can run commands after a delay or periodically
Synchronising data access
Can use Atomic classes to protect data, which means an operation can be carried out as a single unit of execution without interference from another thread.
e.g. AtomicBoolean
AtomicLong
AtomicLongArray
AtomicInteger
AtomicIntegerArray
AtomicReference –> Reference to an object which can be updated atomically
Note: if several threads update a plain variable without an Atomic class (or other synchronization), you can't be sure of the result at runtime
Methods:
.get()
.set()
.getAndSet() sets new value and returns old value
.incrementAndGet() - equivalent to ++value
.decrementAndGet() - equivalent to --value
.getAndDecrement() - equivalent to value--
With Atomic classes, each operation is consistent across threads (no updates are lost), but the order in which the threads perform their operations is not guaranteed
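A minimal sketch of an AtomicInteger shared by several pool threads. AtomicCounterDemo, sheepCount and the pool size of 4 are illustrative assumptions. Every increment is counted, although the order in which the threads get there is not fixed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {
    static int countWithAtomic(int tasks) throws InterruptedException {
        AtomicInteger sheepCount = new AtomicInteger(0);
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < tasks; i++) {
                // Atomic ++value: no increment can be lost between threads.
                service.execute(sheepCount::incrementAndGet);
            }
        } finally {
            service.shutdown();
        }
        service.awaitTermination(5, TimeUnit.SECONDS);
        return sheepCount.get();
    }

    static int swap() {
        AtomicInteger value = new AtomicInteger(7);
        int old = value.getAndSet(9); // sets 9, hands back the previous 7
        return old + value.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Count: " + countWithAtomic(1000));
    }
}
```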
Data access in order
Use a monitor (or a lock)
synchronized block.
The first thread to arrive acquires the lock; any subsequent threads need to wait for the lock to become available.
To synchronize an instance method, either wrap its body in synchronized(this) or add the synchronized keyword to the method signature.
To synchronize a static method, either use synchronized(MyClass.class), where the class object is the monitor, or add static synchronized to the method signature.
Remember, to make a class thread-safe, every method that reads or writes the shared data must be synchronized on the same monitor!!
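A small sketch of the four synchronized forms on one class. SynchronizedCounter and its fields are hypothetical names; the helper at the bottom runs two threads against one counter to show that no increments are lost.

```java
class SynchronizedCounter {
    private int count = 0;
    private static int instancesRecorded = 0;

    // synchronized in the signature: the monitor is this instance.
    public synchronized void increment() {
        count++;
    }

    // Equivalent block form, locking on the same monitor (this).
    public int get() {
        synchronized (this) {
            return count;
        }
    }

    // static synchronized: the monitor is SynchronizedCounter.class.
    public static synchronized void recordInstance() {
        instancesRecorded++;
    }

    // Equivalent block form for static state.
    public static int instancesRecorded() {
        synchronized (SynchronizedCounter.class) {
            return instancesRecorded;
        }
    }

    static int countFromTwoThreads(int perThread) throws InterruptedException {
        SynchronizedCounter counter = new SynchronizedCounter();
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) counter.increment();
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Count: " + countFromTwoThreads(10_000));
    }
}
```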
Concurrent Collections
Class                  Ordered  Sorted  Blocking
ConcurrentHashMap      No       No      No
ConcurrentLinkedDeque  Yes      No      No
ConcurrentLinkedQueue  Yes      No      No
ConcurrentSkipListMap  Yes      Yes     No
ConcurrentSkipListSet  Yes      Yes     No
CopyOnWriteArrayList   Yes      No      No
CopyOnWriteArraySet    No       No      No
LinkedBlockingDeque    Yes      No      Yes
LinkedBlockingQueue    Yes      No      Yes
Blocking Queues
Will try to add immediately but if queue is full already, will wait until there is space. e.g. Producer/Consumer problem where the Consumers aren’t consuming fast enough.
Queue:
.offer(E e, long timeout, TimeUnit unit)
returns false if time elapses before space free
.poll(long timeout, TimeUnit unit)
returns null if time elapses before item is available.
Deque:
.offerFirst(E e, long timeout, TimeUnit unit)
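adds to the front of the queue; returns false if time elapses before space is available
.offerLast(E e, long timeout, TimeUnit unit)
adds to the back of the queue; returns false if time elapses before space is available
.pollFirst(long timeout, TimeUnit unit)
.pollLast(long timeout, TimeUnit unit)
These timed Queue and Deque methods all throw InterruptedException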
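A minimal sketch of the timed offer/poll methods, assuming a queue with a capacity of 1 so that the second offer times out. BlockingQueueDemo and the 100 ms timeouts are illustrative.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueDemo {
    static String queueDemo() throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1); // capacity of 1
        boolean first = queue.offer(1, 100, TimeUnit.MILLISECONDS);  // space available
        boolean second = queue.offer(2, 100, TimeUnit.MILLISECONDS); // full: times out
        Integer taken = queue.poll(100, TimeUnit.MILLISECONDS);      // item available
        Integer none = queue.poll(100, TimeUnit.MILLISECONDS);       // empty: times out
        return first + "," + second + "," + taken + "," + none;
    }

    static String dequeDemo() throws InterruptedException {
        BlockingDeque<String> deque = new LinkedBlockingDeque<>();
        deque.offerLast("b", 100, TimeUnit.MILLISECONDS);  // back of the deque
        deque.offerFirst("a", 100, TimeUnit.MILLISECONDS); // front of the deque
        return deque.pollFirst(100, TimeUnit.MILLISECONDS)
                + deque.pollLast(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(queueDemo()); // true,false,1,null
        System.out.println(dequeDemo()); // ab
    }
}
```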
SkipList collections
These are the concurrent versions of TreeSet and TreeMap
i.e. they are sorted.
Recommended to assign them to an interface reference e.g. SortedMap or NavigableSet.
Allows modifications while iterating: watch out for infinite loop
CopyOnWrite collections
Copy all elements to a new underlying structure any time an element is added, modified or removed.
Note: an iterator only sees the elements that were present when it was created (not ones added during the iteration)
so avoids infinite loops.
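A short sketch of adding to a CopyOnWriteArrayList while iterating over it. CopyOnWriteDemo and the sample values are made up. The loop visits only the three original elements, even though the list ends up with six.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteDemo {
    // Returns { elements visited by the loop, final size of the list }.
    static int[] iterateWhileAdding() {
        List<Integer> list = new CopyOnWriteArrayList<>(Arrays.asList(4, 3, 52));
        int visited = 0;
        for (Integer item : list) {
            list.add(9); // goes into a fresh copy; the iterator keeps its snapshot
            visited++;
        }
        return new int[] { visited, list.size() };
    }

    public static void main(String[] args) {
        int[] result = iterateWhileAdding();
        System.out.println("Visited " + result[0] + ", size " + result[1]); // Visited 3, size 6
    }
}
```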
Synchronised collections
Collections methods: synchronizedCollection(), synchronizedList(), synchronizedMap(), synchronizedNavigableMap(), synchronizedNavigableSet(), synchronizedSet(), synchronizedSortedMap(), synchronizedSortedSet()
Use these to wrap an existing collection in a synchronized object
These methods synchronize the get and set methods, but do not synchronize iterators.
You need to synchronize iteration by hand. Unlike the concurrent collections, they also throw a ConcurrentModificationException if elements are removed while iterating, even by a single thread
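A sketch of synchronizing iteration by hand and of the exception thrown when a synchronized list is modified inside its own loop. SynchronizedListDemo and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;

class SynchronizedListDemo {
    static List<Integer> newSyncList() {
        return Collections.synchronizedList(new ArrayList<Integer>(Arrays.asList(1, 2, 3)));
    }

    static int sumSafely(List<Integer> syncList) {
        int sum = 0;
        // Iteration is not synchronized for us: lock on the wrapper itself.
        synchronized (syncList) {
            for (int n : syncList) sum += n;
        }
        return sum;
    }

    static boolean failsWhenModifiedDuringIteration() {
        List<Integer> syncList = newSyncList();
        try {
            for (Integer n : syncList) {
                syncList.remove(n); // removal inside the loop, from a single thread
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + sumSafely(newSyncList()));
        System.out.println("Throws: " + failsWhenModifiedDuringIteration());
    }
}
```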
Parallel streams
Call .parallel() on an existing stream (or an existing parallel stream)
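or call .parallelStream() on a Collection
Note: you can process results in encounter order using .forEachOrdered() (at some cost to performance)
Use concurrent collections with parallel streams to avoid race conditions.
Note: if you have a stream of parallel streams and you flatten it with flatMap(), the inner streams' parallel setting is ignored. The resulting stream is not parallel unless the outer stream is, so by default it executes in a single-threaded fashion.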
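A minimal sketch of a parallel stream writing into a concurrent collection with forEachOrdered(). ParallelStreamDemo and the sample numbers are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ParallelStreamDemo {
    static List<Integer> orderedResults() {
        // Concurrent collection, so it is safe to write to from several threads.
        List<Integer> out = new CopyOnWriteArrayList<>();
        Arrays.asList(1, 2, 3, 4, 5, 6)
                .parallelStream()         // map() may run on several threads
                .map(n -> n * 10)
                .forEachOrdered(out::add); // but results arrive in encounter order
        return out;
    }

    static boolean isParallel() {
        // .parallel() on an existing sequential stream.
        return Arrays.asList(1, 2, 3).stream().parallel().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(orderedResults()); // [10, 20, 30, 40, 50, 60]
        System.out.println(isParallel());     // true
    }
}
```

With forEach() instead of forEachOrdered(), the same elements would be added but in an unpredictable order.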
Parallel reductions
.findAny() will be unpredictable
Stream operations based on order may perform more slowly (e.g. findFirst(), limit(), skip())
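so you can call .unordered() on the stream before making it parallel, e.g. stream.unordered().parallel(), to improve performance by letting the JVM ignore order for these operations
reduce() on parallel streams can reduce in order, but its arguments must adhere to:
identity defined such that combiner.apply(identity, u) is equal to u for all u
accumulator must be associative and stateless so that (a op b) op c is equal to a op (b op c).
combiner must be associative and stateless and compatible with the identity and the accumulator, so that combiner.apply(u, accumulator.apply(identity, t)) is equal to accumulator.apply(u, t).
To ensure an efficient parallel reduction with collect(): the stream is parallel, the Collector passed to collect() has Collector.Characteristics.CONCURRENT, and either the stream is unordered or the collector has Collector.Characteristics.UNORDERED.
Can reduce concurrently using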
Collectors.toConcurrentMap()
or Collectors.groupingByConcurrent()
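A sketch of an ordered parallel reduce() and a concurrent grouping collector. ParallelReductionDemo and the sample words are made up. String concatenation with "" as the identity is associative, so the parallel result matches the sequential one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ParallelReductionDemo {
    static String concat() {
        return Arrays.asList("w", "o", "l", "f")
                .parallelStream()
                .reduce("",                     // identity: "" + s is s
                        (acc, s) -> acc + s,    // accumulator: associative, stateless
                        (a, b) -> a + b);       // combiner: joins partial results
    }

    static ConcurrentMap<Integer, List<String>> byLength() {
        return Stream.of("lions", "tigers", "bears")
                .parallel()
                // Threads add to one shared ConcurrentMap; order within a group may vary.
                .collect(Collectors.groupingByConcurrent(String::length));
    }

    public static void main(String[] args) {
        System.out.println(concat()); // wolf
        System.out.println(byLength());
    }
}
```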
Cyclic Barrier
Used when we need to ensure certain tasks are performed after all Threads have finished a previous task.
The constructor takes the number of threads to wait for. Each thread calls .await(), and the barrier releases them all once that many threads are waiting.
Make sure the no. of threads you are using is at least as big as the limit of the Cyclic Barrier or else there will be deadlock!
If you have more Threads than Cyclic Barrier limit, then Barrier will be activated more than once.
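A minimal sketch of a CyclicBarrier separating two phases of work. CyclicBarrierDemo, the log list and the phase labels are illustrative; the pool is sized to match the barrier limit so every worker can reach await().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CyclicBarrierDemo {
    static List<String> run(int workers) throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        // Optional barrier action: runs once, when the last thread arrives.
        CyclicBarrier barrier = new CyclicBarrier(workers, () -> log.add("barrier tripped"));
        // Pool size equals the barrier limit; a smaller pool would never trip the barrier.
        ExecutorService service = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < workers; i++) {
                service.submit(() -> {
                    log.add("phase 1");
                    barrier.await(); // wait here until all workers finish phase 1
                    log.add("phase 2");
                    return null;     // Callable, so await()'s checked exceptions are allowed
                });
            }
        } finally {
            service.shutdown();
        }
        service.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3));
    }
}
```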
Fork/Join framework
How does it work? It uses the work-stealing algorithm. This is managed by the ForkJoinPool (which implements ExecutorService). Each thread has a deque of tasks. Each thread completes tasks from the head of its own deque. Once it has completed all of its own tasks, it steals a task from the tail of one of the other deques.
Can get access to a pool using ForkJoinPool.commonPool().
Aim is to start with 1 task and split it up recursively into smaller tasks
Need to extend one of:
RecursiveAction: compute() returns void
RecursiveTask<T>: compute() returns the generic type T
Need to override the protected compute() method. This defines a recursive function which has a base case and a recursive case. In the recursive case, call invokeAll(split1, split2) to run the subtasks
Once we’ve created a Task we create a ForkJoinPool which will use the number of processors to determine how many threads to create
Fork/Join issues
compute() is declared protected in the parent class, so the override can't be less visible (not private or package-private)!
Calling compute() inside a compute() method causes the current thread to wait for the result of that subtask
so fork the other subtask first: fork() --> compute() --> join()
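A sketch of a RecursiveTask that sums an array using the fork() --> compute() --> join() order. SumTask, THRESHOLD and the helper sum() are hypothetical names, and the threshold of 1,000 elements is an arbitrary choice.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cut-off for the base case
    private final int[] data;
    private final int start;
    private final int end;

    SumTask(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // Base case: small enough to sum directly.
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) sum += data[i];
            return sum;
        }
        // Recursive case: split in two.
        int mid = start + (end - start) / 2;
        SumTask left = new SumTask(data, start, mid);
        SumTask right = new SumTask(data, mid, end);
        left.fork();                     // queue the left half for another thread
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // then wait for the left half
    }

    static long sum(int[] data) {
        ForkJoinPool pool = new ForkJoinPool(); // defaults to the number of processors
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        java.util.Arrays.fill(data, 1);
        System.out.println("Sum: " + sum(data)); // Sum: 10000
    }
}
```

Calling join() straight after fork(), before compute(), would make the current thread wait and the work would effectively run single-threaded.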
Threading problems
Liveness issues: ability of app to execute in a timely manner
Deadlock: 2 or more threads are blocked forever each waiting on the other
Starvation: A thread is denied access to a shared resource or lock
Livelock:
Two or more threads are actively trying to acquire locks but are unable to do so, so they keep restarting the process (the threads stay busy but make no progress)
Race condition:
Two tasks meant to be completed sequentially are performed at the same time.