Questions Marked For Review Flashcards
In Kafka Streams, what value are internal topics prefixed by?
- kafka-streams-
- group.id
- tasks-
- application.id
Answer: application.id
In Kafka Streams, the application.id is also the underlying group.id for your consumers, and the prefix for all internal topics (repartition and state).
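A minimal sketch of where this is configured, assuming a word-count application and a local broker (both names are placeholders): every internal repartition and changelog topic of this application will be prefixed with word-count-app, and the consumers it creates will use word-count-app as their group.id.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

class StreamsConfigExample {
    static Properties streamsProps() {
        Properties props = new Properties();
        // Also used as the underlying consumer group.id and as the prefix of internal topics
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
}
```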
A consumer starts and has auto.offset.reset=none, and the topic partition currently has data for offsets going from 45 to 2311. The consumer group has committed the offset 10 for the topic before. Where will the consumer read from?
- offset 45
- offset 2311
- offset 10
- it will crash
Answer: it will crash.
Explanation:
auto.offset.reset=none means the consumer will crash if the offset it is recovering from has been deleted from Kafka, which is the case here: the committed offset is 10, but the earliest available offset in the partition is 45, so the committed position no longer exists and the consumer throws an exception instead of silently resetting.
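A minimal sketch of the relevant consumer configuration, with placeholder group id and broker address: with auto.offset.reset=none, a consumer whose committed offset is no longer available throws an exception instead of silently resetting to earliest or latest.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

class OffsetResetExample {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // "none": throw instead of resetting when the committed offset is no longer available
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        return props;
    }
}
```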
What is returned by a producer.send() call in the Java API?
Answer: A Future<RecordMetadata> object, since send() is asynchronous.
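A minimal sketch of using the returned Future, with a placeholder topic, key, and value: send() returns immediately, and get() blocks until the broker acknowledges the record and returns its RecordMetadata.

```java
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class SendFutureExample {
    static void sendAndWait(KafkaProducer<String, String> producer) throws Exception {
        // send() is asynchronous and returns a Future<RecordMetadata> immediately
        Future<RecordMetadata> future =
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        // get() blocks until the record is acknowledged (or throws if the send failed)
        RecordMetadata metadata = future.get();
        System.out.printf("partition=%d offset=%d%n", metadata.partition(), metadata.offset());
    }
}
```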
You are using the JDBC source connector to copy data from 3 tables to 3 Kafka topics. There is one connector created with tasks.max equal to 2, deployed on a cluster of 3 workers. How many tasks are launched?
Answer: 2
Here, we have 3 tables, but tasks.max=2 is set, so at most 2 tasks will be created.
In Java, Avro SpecificRecords classes are:
- Automatically generated from an Avro Schema
- Automatically generated from an Avro Schema + Maven / Gradle Plugin
- Written manually by the programmer
Answer: Automatically generated from an Avro Schema + Maven/Gradle Plugin.
A SpecificRecord is created from generated record classes.
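As an illustration, a short sketch of using such a class, where Customer and its fields are hypothetical and assumed to have been generated by the avro-maven-plugin (or the Gradle equivalent) from a matching schema:

```java
// Customer is a hypothetical class generated by the Avro Maven/Gradle plugin
// from a schema with fields "name" (string) and "age" (int).
Customer customer = Customer.newBuilder()
        .setName("Jane")
        .setAge(34)
        .build();
// The generated class implements org.apache.avro.specific.SpecificRecord,
// so it can be produced to Kafka with an Avro serializer.
```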
Consider the following code block:
```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}
```
What kind of delivery guarantee does this consumer offer?
Answer: At-most-once
Here the offset is committed before processing the message. If the consumer crashes before processing the message, the message will be lost when the consumer comes back up.
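For contrast, a minimal sketch of the at-least-once variant of the same loop, assuming the same consumer and log variables and a hypothetical process() step: the records are processed first and the offsets committed afterwards, so a crash leads to reprocessing rather than message loss.

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical processing step
    }
    try {
        // committing only after processing gives at-least-once semantics
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
}
```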
What data format is NOT natively available with the Confluent REST Proxy?
- Avro
- Binary
- JSON
- Protobuf
Answer: Protobuf.
Explanation:
Protocol buffers are not a natively supported type for the Confluent REST Proxy, but you may use the binary format instead.
Compaction is enabled for a topic in Kafka by setting log.cleanup.policy=compact. What is true about log compaction?
- After cleanup, only one message per key is retained with the latest value.
- Compaction changes the offset of messages.
- After cleanup, only one message per key is retained with the first value.
- Each message stored in the topic is compressed.
- Kafka automatically de-duplicates incoming messages based on key hashes.
Answer: After cleanup, only one message per key is retained with the latest value.
Log compaction retains at least the last known value for each record key for a single topic partition. All compacted log offsets remain valid, even if a record at an offset has been compacted away, as a consumer will get the next highest offset.
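As an illustration, a minimal sketch of creating a compacted topic with the Java AdminClient; the topic name, partition count, and broker address are placeholders.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

class CompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // cleanup.policy=compact keeps at least the latest value for each key
            NewTopic topic = new NewTopic("my-compacted-topic", 3, (short) 3)
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```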
You are sending messages with keys to a topic. To increase throughput, you decide to increase the number of partitions of the topic. Select all that apply.
- New records may get written to a different partition.
- New records with the same key will get written to the partition where old records with that key were written.
- All existing records will get rebalanced among the partitions to balance the load.
- Old records will stay in their partitions.
Answer: New records may get written to a different partition, and old records will stay in their partitions.
Explanation:
Increasing the number of partitions causes new messages to get hashed differently, and breaks the guarantee that the “same key goes to the same partition.”
Kafka logs are immutable and the previous messages are not re-shuffled or reordered.
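To make the hashing point concrete, here is a small sketch using the murmur2 hash that the default Java partitioner applies to keyed records; the key and partition counts are made up. The same key can land on a different partition once the partition count changes.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

class KeyPartitionExample {
    // Mirrors the default partitioner's choice for records with a non-null key.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "user-42"; // example key
        System.out.println("with 3 partitions: " + partitionFor(key, 3));
        System.out.println("with 6 partitions: " + partitionFor(key, 6));
    }
}
```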
You are building a consumer application that processes events from a Kafka topic. What is the most important metric to monitor to ensure real-time processing?
- MessagesInPerSec
- UnderReplicatedPartitions
- records-lag-max
- BytesInPerSec
Answer: records-lag-max
This metric shows the maximum lag across the partitions assigned to the consumer, i.e. how many messages the consumer is behind the latest offsets on the broker.
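A minimal sketch of reading this metric programmatically from a running consumer's metrics() map; the consumer is assumed to already exist and be polling.

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

class LagMetricExample {
    // Prints the records-lag-max metric of an already-running consumer.
    static void printMaxLag(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            if ("records-lag-max".equals(entry.getKey().name())) {
                // metricValue() returns the current value of the metric
                System.out.println("records-lag-max = " + entry.getValue().metricValue());
            }
        }
    }
}
```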
How can you make a Kafka consumer immediately stop polling data from Kafka and gracefully shut down the consumer application?
- Call consumer.poll() in another thread.
- Call consumer.wakeup() and catch a WakeupException.
- Kill the consumer thread.
Answer: Call consumer.wakeup() and catch a WakeupException.
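A minimal sketch of the usual shutdown pattern, assuming the consumer runs in its own polling thread: another thread (here a JVM shutdown hook) calls wakeup(), which makes the blocked poll() throw WakeupException so the loop exits and the consumer is closed cleanly.

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

class GracefulShutdownExample {
    static void runPollLoop(KafkaConsumer<String, String> consumer) {
        // Another thread triggers shutdown by calling consumer.wakeup()
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // process the record here
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown: wakeup() interrupts the blocked poll()
        } finally {
            consumer.close(); // leave the group cleanly
        }
    }
}
```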
A consumer wants to read messages from partitions 0 and 1 of a topic named “topic1”. What will be the result of the following code snippet:
```java
consumer.subscribe(Arrays.asList("topic1"));
List<TopicPartition> pc = new ArrayList<>();
pc.add(new TopicPartition("topic1", 0));
pc.add(new TopicPartition("topic1", 1));
consumer.assign(pc);
```
Answer: The code will throw “IllegalStateException”
Explanation: The subscribe() and assign() methods cannot be used by the same consumer: subscribe() leverages the consumer group mechanism and automatic partition assignment, while assign() manually assigns specific partitions to the consumer. Mixing the two throws an IllegalStateException.
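A minimal sketch of what manual assignment alone would look like for the two partitions (no subscribe() call, so no consumer group rebalancing is involved):

```java
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class ManualAssignExample {
    static void assignPartitions(KafkaConsumer<String, String> consumer) {
        // assign() only, no subscribe(): the consumer reads exactly these partitions
        consumer.assign(Arrays.asList(
                new TopicPartition("topic1", 0),
                new TopicPartition("topic1", 1)));
    }
}
```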
A topic has 3 replicas and you set min.insync.replicas=2. If 2 out of 3 replicas are not available, what happens when a consume request is sent to the broker?
- A new leader for partition will be elected
- NotEnoughReplicasException will be returned
- Data will be returned from the remaining in-sync replica
- An empty message will be returned
Answer: Data will be returned from the remaining in-sync replica.
With this configuration, a single in-sync replica is still readable, but not writable if the producer is using acks=all.
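For the write side, a minimal sketch of the producer settings under which the same scenario fails; the broker address, topic, key, and value are placeholders. With acks=all and only one in-sync replica against min.insync.replicas=2, sends fail with a NotEnoughReplicasException.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

class AcksAllExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"),
                    (metadata, exception) -> {
                        // with too few in-sync replicas, exception is a NotEnoughReplicasException
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                    });
        }
    }
}
```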
Consider the following Kafka Streams application:

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("word-count-input");
KTable<String, Long> wordCounts = textLines
        .mapValues(textLine -> textLine.toLowerCase())
        .flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+")))
        .selectKey((key, word) -> word)
        .groupByKey()
        .count(Materialized.as("Counts"));
wordCounts.toStream()
        .to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
builder.build();
```
What is an adequate topic configuration for the topic word-count-output?
- cleanup.policy=compact
- max.message.bytes=100000000
- compression.type=lz4
- cleanup.policy=delete
Answer: cleanup.policy=compact
Result is aggregated into a table with key as the unique word and value equal to its frequency.
We have to enable log compaction for this topic to align the topic’s cleanup policy with KTable semantics.
What Java library is KSQL based on?
- Kafka Connect
- Kafka Streams
- Schema Registry
- REST Proxy
Answer: Kafka Streams
KSQL is based on Kafka Streams and allows you to express transformations in the SQL language that get automatically converted to a Kafka Streams program in the backend.