Heap/Priority Queue Flashcards
Kth Largest Element in a Stream
You are part of a university admissions office and need to keep track of the kth highest test score from applicants in real-time. This helps to determine cut-off marks for interviews and admissions dynamically as new applicants submit their scores.
You are tasked to implement a class which, for a given integer k, maintains a stream of test scores and continuously returns the kth highest test score after a new score has been submitted. More specifically, we are looking for the kth highest score in the sorted list of all scores.
Implement the KthLargest class:
KthLargest(int k, int[] nums) Initializes the object with the integer k and the stream of test scores nums.
int add(int val) Adds a new test score val to the stream and returns the element representing the kth largest element in the pool of test scores so far.
Example 1:
Input:
["KthLargest", "add", "add", "add", "add", "add"]
[[3, [4, 5, 8, 2]], [3], [5], [10], [9], [4]]
Output: [null, 4, 5, 5, 8, 8]
Explanation:
KthLargest kthLargest = new KthLargest(3, [4, 5, 8, 2]);
kthLargest.add(3); // return 4
kthLargest.add(5); // return 5
kthLargest.add(10); // return 5
kthLargest.add(9); // return 8
kthLargest.add(4); // return 8
'''
import heapq
from typing import List

class KthLargest:
    def __init__(self, k: int, nums: List[int]):
        self.k = k
        self.min_heap = []
        for num in nums:
            heapq.heappush(self.min_heap, num)

    def add(self, val: int) -> int:
        heapq.heappush(self.min_heap, val)
        # Drop the smallest scores until only the k largest remain.
        while len(self.min_heap) > self.k:
            heapq.heappop(self.min_heap)
        return self.min_heap[0]
'''
1. We store k and an empty min_heap on the object, then push every element of nums into min_heap.
2. In add, we first push the new element. Because we want the kth largest, we pop from min_heap until its size equals k.
3. We then return the element on top: the smallest of the k largest values, which is the kth largest.
Last Stone Weight
You are given an array of integers stones where stones[i] is the weight of the ith stone.
We are playing a game with the stones. On each turn, we choose the heaviest two stones and smash them together. Suppose the heaviest two stones have weights x and y with x <= y. The result of this smash is:
If x == y, both stones are destroyed, and
If x != y, the stone of weight x is destroyed, and the stone of weight y has new weight y - x.
At the end of the game, there is at most one stone left.
Return the weight of the last remaining stone. If there are no stones left, return 0.
Example 1:
Input: stones = [2,7,4,1,8,1]
Output: 1
Explanation:
We combine 7 and 8 to get 1 so the array converts to [2,4,1,1,1] then,
we combine 2 and 4 to get 2 so the array converts to [2,1,1,1] then,
we combine 2 and 1 to get 1 so the array converts to [1,1,1] then,
we combine 1 and 1 to get 0 so the array converts to [1] then that’s the value of the last stone.
'''
import heapq
from typing import List

class Maxheap:
    def __init__(self):
        self.heap = []

    def push(self, val):
        # Store the negated value so heapq's min heap behaves like a max heap.
        heapq.heappush(self.heap, -val)

    def pop(self):
        return -heapq.heappop(self.heap)

    def len(self):
        return len(self.heap)

    def top(self):
        return -self.heap[0]

class Solution:
    def lastStoneWeight(self, stones: List[int]) -> int:
        max_heap = Maxheap()
        for stone in stones:
            max_heap.push(stone)
        while max_heap.len() > 1:
            stone1 = max_heap.pop()
            stone2 = max_heap.pop()
            diff = abs(stone1 - stone2)
            if diff != 0:
                max_heap.push(diff)
        if max_heap.len() == 0:
            return 0
        else:
            return max_heap.top()
'''
Logic - Implement a max heap. Pop the 2 largest stones and take the absolute difference of their weights.
If diff != 0, push diff back into max_heap, as it is the residual stone weight.
HOW TO IMPLEMENT MAX HEAP:
Implement a class Maxheap that wraps a min heap and stores the negative of each number.
We cannot use a plain min heap because heapq only exposes the smallest element; there is no popright()-style operation to pop the largest.
Kth largest
Kth smallest
Kth largest - min heap of size k
Kth smallest - max heap of size k
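The two rules above can be sketched as a pair of small functions. This is a minimal illustration, assuming k is between 1 and len(nums); the names kth_largest and kth_smallest are made up for this card and are not from any library.

```python
import heapq
from typing import List


def kth_largest(nums: List[int], k: int) -> int:
    """Kth largest: keep a min heap of size k; its top is the answer."""
    heap: List[int] = []
    for num in nums:
        heapq.heappush(heap, num)
        if len(heap) > k:
            heapq.heappop(heap)  # discard the smallest, it cannot be in the top k
    return heap[0]


def kth_smallest(nums: List[int], k: int) -> int:
    """Kth smallest: keep a max heap of size k (negated values); its top is the answer."""
    heap: List[int] = []
    for num in nums:
        heapq.heappush(heap, -num)
        if len(heap) > k:
            heapq.heappop(heap)  # discard the largest, it cannot be in the bottom k
    return -heap[0]
```

Either way the heap never grows past k + 1 elements, so each push/pop costs O(log k).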
K Closest Points to Origin
Given an array of points where points[i] = [xi, yi] represents a point on the X-Y plane and an integer k, return the k closest points to the origin (0, 0).
The distance between two points on the X-Y plane is the Euclidean distance (i.e., √((x1 - x2)^2 + (y1 - y2)^2)).
Example 1:
Input: points = [[1,3],[-2,2]], k = 1
Output: [[-2,2]]
Explanation:
The distance between (1, 3) and the origin is sqrt(10).
The distance between (-2, 2) and the origin is sqrt(8).
Since sqrt(8) < sqrt(10), (-2, 2) is closer to the origin.
We only want the closest k = 1 points from the origin, so the answer is just [[-2,2]].
'''
import heapq
import math
from typing import List

class Maxheap():
    def __init__(self):
        self.heap = []

    def push(self, dis, x, y):
        # Negate the distance so the farthest point sits on top.
        heapq.heappush(self.heap, (-dis, x, y))

    def pop(self):
        return heapq.heappop(self.heap)

    def len(self):
        return len(self.heap)

class Solution:
    def kClosest(self, points: List[List[int]], k: int) -> List[List[int]]:
        max_heap = Maxheap()
        for coord in points:
            x = coord[0]
            y = coord[1]
            dis = math.sqrt((x**2) + (y**2))
            max_heap.push(dis, x, y)
        while max_heap.len() > k:
            max_heap.pop()
        op = []
        while max_heap.len() != 0:
            ele = max_heap.pop()
            op.append([ele[1], ele[2]])
        return op
'''
As we want the k smallest distances, we use a max_heap. We go through the points list and compute each point's distance from (0,0). We then push (dis, x, y).
** The heap orders tuples by their 1st element, so the order is dictated by dis; x and y only break ties between equal distances and otherwise ride along.
Then pop from max_heap until its length == k.
Then pop the remaining elements from the heap and store the coordinates only.
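A shorter variant, for comparison. It swaps the hand-rolled max heap for the standard-library heapq.nsmallest and compares squared distances, which gives the same ordering without calling sqrt. The function name k_closest is made up for this card, and the sketch assumes any order among the k returned points is acceptable.

```python
import heapq
from typing import List


def k_closest(points: List[List[int]], k: int) -> List[List[int]]:
    # Squared distance preserves the ordering of the true Euclidean distance.
    return heapq.nsmallest(k, points, key=lambda p: p[0] ** 2 + p[1] ** 2)
```

nsmallest returns the points from closest to farthest, whereas the max heap version above returns them farthest first.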
Task Scheduler
You are given an array of CPU tasks, each labeled with a letter from A to Z, and a number n. Each CPU interval can be idle or allow the completion of one task. Tasks can be completed in any order, but there’s a constraint: there has to be a gap of at least n intervals between two tasks with the same label.
Return the minimum number of CPU intervals required to complete all tasks.
Example 1:
Input: tasks = ["A","A","A","B","B","B"], n = 2
Output: 8
Explanation: A possible sequence is: A -> B -> idle -> A -> B -> idle -> A -> B.
After completing task A, you must wait two intervals before doing A again. The same applies to task B. In the 3rd interval, neither A nor B can be done, so you idle. By the 4th interval, you can do A again as 2 intervals have passed.
Example 2:
Input: tasks = ["A","C","A","B","D","B"], n = 1
Output: 6
Explanation: A possible sequence is: A -> B -> C -> D -> A -> B.
With a cooling interval of 1, you can repeat a task after just one other task.
'''
import heapq
from collections import deque, Counter
from typing import List

class MaxHeap:
    def __init__(self):
        self.heap = []

    def push(self, val):
        heapq.heappush(self.heap, -val)

    def pop(self):
        return -heapq.heappop(self.heap)

    def len(self):
        return len(self.heap)

class Solution:
    def leastInterval(self, tasks: List[str], n: int) -> int:
        # get freq of types of job
        task_counter = Counter(tasks)
        max_heap = MaxHeap()  # to keep track of most freq event
        for val in task_counter.values():
            max_heap.push(val)
        cooldown_queue = deque()  # to keep track of (time_to_reenter, task_count)
        time = 0
        while max_heap.len() > 0 or len(cooldown_queue) > 0:
            time += 1
            if max_heap.len() > 0:
                task_count = max_heap.pop() - 1
                if task_count != 0:
                    cooldown_queue.append((time + n, task_count))
            if cooldown_queue and cooldown_queue[0][0] == time:
                time_to_reenter, task_count = cooldown_queue.popleft()
                max_heap.push(task_count)
        return time
'''
1. Get a Counter of the frequency of each task type.
Logic being, at any given time we want to run the task type with the highest remaining count.
2. We use a max_heap to get the task type with the highest remaining count at any given time.
3. Each time unit we pop the highest task_count from max_heap and reduce it by 1, since one instance of that task has just run.
4. If task_count != 0, we add it to a queue as (time_to_reenter, task_count).
time_to_reenter = time + n, where n is the cooldown.
5. If the element at the front of the queue (FIFO) has time_to_reenter == time, we popleft it from the queue and push its task_count back onto max_heap.
Design Twitter
Design a simplified version of Twitter where users can post tweets, follow/unfollow another user, and see the 10 most recent tweets in their news feed.
Implement the Twitter class:
Twitter() Initializes your twitter object.
void postTweet(int userId, int tweetId) Composes a new tweet with ID tweetId by the user userId. Each call to this function will be made with a unique tweetId.
List<Integer> getNewsFeed(int userId) Retrieves the 10 most recent tweet IDs in the user's news feed. Each item in the news feed must be posted by users who the user followed or by the user themself. Tweets must be ordered from most recent to least recent.
void follow(int followerId, int followeeId) The user with ID followerId started following the user with ID followeeId.
void unfollow(int followerId, int followeeId) The user with ID followerId started unfollowing the user with ID followeeId.
Example 1:
Input
["Twitter", "postTweet", "getNewsFeed", "follow", "postTweet", "getNewsFeed", "unfollow", "getNewsFeed"]
[[], [1, 5], [1], [1, 2], [2, 6], [1], [1, 2], [1]]
Output
[null, null, [5], null, null, [6, 5], null, [5]]
Explanation
Twitter twitter = new Twitter();
twitter.postTweet(1, 5); // User 1 posts a new tweet (id = 5).
twitter.getNewsFeed(1); // User 1’s news feed should return a list with 1 tweet id -> [5]. return [5]
twitter.follow(1, 2); // User 1 follows user 2.
twitter.postTweet(2, 6); // User 2 posts a new tweet (id = 6).
twitter.getNewsFeed(1); // User 1’s news feed should return a list with 2 tweet ids -> [6, 5]. Tweet id 6 should precede tweet id 5 because it is posted after tweet id 5.
twitter.unfollow(1, 2); // User 1 unfollows user 2.
twitter.getNewsFeed(1); // User 1’s news feed should return a list with 1 tweet id -> [5], since user 1 is no longer following user 2.
'''
import heapq
from collections import defaultdict, deque
from typing import List

class Twitter:
    def __init__(self):
        self.followees = defaultdict(set)  # tracks the people the user follows
        self.tweets = defaultdict(deque)  # tracks (timestamp, tweetID)
        # we advance time only when a post is made, because we only need
        # the relative order of tweets to get the n latest
        self.timestamp = 0

    def postTweet(self, userId: int, tweetId: int) -> None:
        if len(self.tweets[userId]) == 10:
            self.tweets[userId].popleft()  # remove the oldest tweet
        self.tweets[userId].append((self.timestamp, tweetId))
        self.timestamp += 1

    def getNewsFeed(self, userId: int) -> List[int]:
        # top 10 of (tweets from user + tweets of the people user follows)
        tweets_to_consider = list(self.tweets[userId])
        for followee in self.followees[userId]:
            tweets_to_consider.extend(self.tweets[followee])
        top_tweets = heapq.nlargest(10, tweets_to_consider, key=lambda x: x[0])
        tt = []
        for timestamp, tweetid in top_tweets:
            tt.append(tweetid)
        return tt

    def follow(self, followerId: int, followeeId: int) -> None:
        if followerId != followeeId:
            self.followees[followerId].add(followeeId)

    def unfollow(self, followerId: int, followeeId: int) -> None:
        if followeeId in self.followees[followerId]:
            self.followees[followerId].remove(followeeId)
'''
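1. defaultdict(set) of followees - the people that each user follows.
2. defaultdict(deque) of tweets - stores (timestamp, tweetID) per user, capped at that user's 10 most recent tweets.
3. getNewsFeed gathers the user's own tweets plus every followee's tweets and takes heapq.nlargest(10, ...) by timestamp.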
Find Median from Data Stream
The median is the middle value in an ordered integer list. If the size of the list is even, there is no middle value, and the median is the mean of the two middle values.
For example, for arr = [2,3,4], the median is 3.
For example, for arr = [2,3], the median is (2 + 3) / 2 = 2.5.
Implement the MedianFinder class:
MedianFinder() initializes the MedianFinder object.
void addNum(int num) adds the integer num from the data stream to the data structure.
double findMedian() returns the median of all elements so far. Answers within 10^-5 of the actual answer will be accepted.
'''
import heapq

class MaxHeap:
    def __init__(self):
        self.heap = []

    def push(self, val):
        heapq.heappush(self.heap, -val)

    def pop(self):
        return -heapq.heappop(self.heap)

    def top(self):
        return -self.heap[0]

    def __len__(self):
        return len(self.heap)

class MedianFinder:
    def __init__(self):
        self.min_heap = []
        self.max_heap = MaxHeap()

    def addNum(self, num: int) -> None:
        self.max_heap.push(num)
        if len(self.max_heap) > 0 and len(self.min_heap) > 0 and self.max_heap.top() > self.min_heap[0]:
            heapq.heappush(self.min_heap, self.max_heap.pop())
        # balancing
        if len(self.max_heap) > len(self.min_heap) + 1:
            heapq.heappush(self.min_heap, self.max_heap.pop())
        elif len(self.min_heap) > len(self.max_heap):
            self.max_heap.push(heapq.heappop(self.min_heap))

    def findMedian(self) -> float:
        if len(self.max_heap) > len(self.min_heap):
            return self.max_heap.top()
        elif len(self.max_heap) == len(self.min_heap):
            return (self.max_heap.top() + self.min_heap[0]) / 2
'''
The naive approach to this problem is to keep a list and, as and when a number is added, sort it and read off the middle element(s), AKA the median. That costs O(N log N) per insertion.
Instead imagine the numbers as 3 parts - left, middle (consists of 1 or 2 numbers), right.
middle is the median.
left - holds the numbers less than middle, and we need quick access to its largest
right - holds the numbers greater than middle, and we need quick access to its smallest
left & right - should be of pretty much the same size; at most one can hold 1 more element than the other.
With this intuition -
Max heap for left - does not fully sort the left part, but exposes the largest of the small numbers in O(1). That is, the number immediately next to middle
Min heap for right - does not fully sort the right part, but exposes the smallest of the large numbers in O(1). That is, the number immediately next to middle
The number entering the stream, if:
1. smaller than max heap top value - then goes to max heap
2. larger than min heap top value - then goes to min heap
This comparison is done like this in code:
push every new element into max_heap first, then check that neither heap is empty and that max_heap_top > min_heap_top. If so, the top max heap element moves to min heap.
Balancing - we need to keep the lengths within a difference of 1, with max_heap allowed to hold the extra element:
if max_heap is longer than min_heap by 2 or more, its top is popped and added to min_heap; if min_heap is longer than max_heap at all, its top is popped and added to max_heap.
finding median - At any moment we check:
if the length of max heap > length of min heap: then the median is the top of max_heap
if they are equal - then we average the tops of both heaps
** Once addNum finishes, min heap is never longer than max_heap. Any new number comes into max heap and MAY move to min heap, and the balancing step moves one element back whenever min heap ends up longer.
WHY DO WE CHECK IF len of min heap > len of max heap in addNum():
Example Walkthrough of Why This Condition is Necessary
Suppose we add the following numbers in sequence: [10, 20, 30, 40].
Step-by-Step:
Add 10:
We add 10 to max_heap as -10 to simulate max behavior.
Heaps:
max_heap = [-10]
min_heap = []
No need to rebalance, as max_heap has one extra element.
Median: 10.
Add 20:
Add 20 to max_heap as -20.
min_heap is empty, so the ordering check is skipped. max_heap now holds 2 elements against 0 in min_heap, so the balancing step moves the root of max_heap (which is 20) to min_heap.
Heaps after balancing:
max_heap = [-10]
min_heap = [20]
Now both heaps are of equal size.
Median: (10 + 20) / 2 = 15.
Add 30:
Add 30 to max_heap as -30.
Since 30 is larger than the top of min_heap (20), it moves to min_heap.
We now have an imbalance: min_heap has more elements than max_heap.
Heaps before rebalance:
max_heap = [-10]
min_heap = [20, 30]
Rebalance Step: Move the smallest element from min_heap (20) back to max_heap.
Heaps after rebalance:
max_heap = [-20, -10]
min_heap = [30]
Median: 20 (root of max_heap since it has more elements).
Add 40:
Add 40 to max_heap as -40.
Since 40 is greater than the top of min_heap (30), it moves to min_heap.
Heaps:
max_heap = [-20, -10]
min_heap = [30, 40]
Both heaps now have equal sizes again.
Median: (20 + 30) / 2 = 25
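The walkthrough above can be replayed with a compact sketch of the same two-heap logic. The class name TwoHeapMedian and the attribute names left and right are made up for this card; the steps mirror addNum and findMedian, with the max heap stored as negated values in a plain list.

```python
import heapq


class TwoHeapMedian:
    def __init__(self):
        self.left = []   # max heap of the smaller half, stored as negated values
        self.right = []  # min heap of the larger half

    def add(self, num):
        # Every new number enters the left (max) heap first.
        heapq.heappush(self.left, -num)
        # Ordering check: the largest of left must not exceed the smallest of right.
        if self.right and -self.left[0] > self.right[0]:
            heapq.heappush(self.right, -heapq.heappop(self.left))
        # Balancing: left may hold at most one more element than right.
        if len(self.left) > len(self.right) + 1:
            heapq.heappush(self.right, -heapq.heappop(self.left))
        elif len(self.right) > len(self.left):
            heapq.heappush(self.left, -heapq.heappop(self.right))

    def median(self):
        if len(self.left) > len(self.right):
            return -self.left[0]
        return (-self.left[0] + self.right[0]) / 2


finder = TwoHeapMedian()
medians = []
for num in [10, 20, 30, 40]:
    finder.add(num)
    medians.append(finder.median())
print(medians)  # [10, 15.0, 20, 25.0]
```

The elif branch is the one that fires on "Add 30": without it, right would hold 2 elements against 1 in left and median() would average the wrong pair.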
Merge k Sorted Lists
You are given an array of k linked-lists lists, each linked-list is sorted in ascending order.
Merge all the linked-lists into one sorted linked-list and return it.
Example 1:
Input: lists = [[1,4,5],[1,3,4],[2,6]]
Output: [1,1,2,3,4,4,5,6]
Explanation: The linked-lists are:
[
1->4->5,
1->3->4,
2->6
]
merging them into one sorted list:
1->1->2->3->4->4->5->6
HEAP METHOD:
We want to merge in ascending order, so we use a min heap.
We go through 'lists' and, for every non-empty list, store (lists[i].val, i, lists[i]) in min_heap, which translates to
(value of node, index (which linked list it belongs to), head of the LL).
index - is used as a tie breaker. The min heap orders primarily on the 1st value, that is lists[i].val; when two values are equal it falls through to the index, so the node objects themselves are never compared.
We create a dummy node and assign a variable current to the dummy node.
While min_heap:
we pop the val, index, node from min_heap.
we make current.next = node
advance current by current = current.next
if node.next exists then we push (node.next.val, index, node.next)
## we use the same index to signify this node belongs to that LL; it helps in tiebreak
return dummy.next
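The steps above can be sketched as follows. The notes do not include code for this card, so this is one possible reading of them: ListNode is an assumed LeetCode-style node class, and build and to_list are hypothetical helpers added only so the example can be run.

```python
import heapq
from typing import List, Optional


class ListNode:
    # Assumed LeetCode-style node definition; not given in the notes.
    def __init__(self, val: int = 0, next: "Optional[ListNode]" = None):
        self.val = val
        self.next = next


def merge_k_lists(lists: List[Optional[ListNode]]) -> Optional[ListNode]:
    min_heap = []
    for i, head in enumerate(lists):
        if head:  # skip empty lists, they have no .val
            heapq.heappush(min_heap, (head.val, i, head))

    dummy = ListNode()
    current = dummy
    while min_heap:
        val, index, node = heapq.heappop(min_heap)
        current.next = node
        current = current.next
        if node.next:
            # Same index: at most one node per list is in the heap at a time,
            # so (val, index) is always unique and nodes are never compared.
            heapq.heappush(min_heap, (node.next.val, index, node.next))
    return dummy.next


def build(values):
    """Hypothetical helper: Python list -> linked list."""
    dummy = ListNode()
    tail = dummy
    for v in values:
        tail.next = ListNode(v)
        tail = tail.next
    return dummy.next


def to_list(node):
    """Hypothetical helper: linked list -> Python list."""
    out = []
    while node:
        out.append(node.val)
        node = node.next
    return out


merged = merge_k_lists([build([1, 4, 5]), build([1, 3, 4]), build([2, 6])])
print(to_list(merged))  # [1, 1, 2, 3, 4, 4, 5, 6]
```

With N total nodes across k lists, the heap holds at most k entries, so the merge runs in O(N log k).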