Questions Marked For Review Flashcards
In Kafka Streams, what value are internal topics prefixed by?
- kafka-streams-
- group.id
- tasks-
- application.id
Answer: application.id
In Kafka Streams, the application.id is also the underlying group.id for your consumers, and the prefix for all internal topics (repartition and state).
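A minimal sketch of how this is set in a Streams application, assuming a hypothetical app name word-count-app and a broker at localhost:9092:
```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// The application.id doubles as the consumers' group.id and as the prefix for
// internal repartition and changelog topics (e.g. word-count-app-Counts-changelog).
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
```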
A consumer starts and has auto.offset.reset=none, and the topic partition currently has data for offsets going from 45 to 2311. The consumer group has committed the offset 10 for the topic before. Where will the consumer read from?
- offset 45
- offset 2311
- offset 10
- it will crash
Answer: it will crash.
Explanation:
auto.offset.reset=none means the consumer will crash if the offset it is recovering from has already been deleted from Kafka, which is the case here since 10 is less than 45.
The committed offset 10 falls outside the range of data still available (45 to 2311), so there is nothing valid to resume from and the consumer throws an exception.
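For illustration, a sketch of the consumer configuration in question, assuming a broker at localhost:9092 and a made-up group id:
```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// "none" makes the consumer throw instead of silently jumping to the earliest (45)
// or latest (2311) offset when the committed offset (10) is no longer available.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
```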
What is returned by a producer.send() call in the Java API?
A Future&lt;RecordMetadata&gt; object. producer.send() is asynchronous and returns immediately; the Future completes once the broker has responded.
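A short sketch of the asynchronous send, assuming an already-configured KafkaProducer&lt;String, String&gt; named producer; blocking on get() is only done here to show the returned metadata:
```java
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

ProducerRecord<String, String> record = new ProducerRecord<>("topic1", "key1", "value1");
// send() returns immediately with a Future<RecordMetadata>
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(); // blocking here makes the send effectively synchronous
System.out.printf("partition=%d, offset=%d%n", metadata.partition(), metadata.offset());
```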
You are using JDBC source connector to copy data from 3 tables to 3 Kafka topics. There is one connector created with max.tasks equal to 2 deployed on a cluster of 3 workers. How many tasks are launched?
Answer: 2
Here we have 3 tables, but max.tasks=2 caps the connector at 2 tasks, so only 2 tasks are launched regardless of the number of tables or workers.
In Java, Avro SpecificRecords classes are:
- Automatically generated from an Avro Schema
- Automatically generated from an Avro Schema + Maven / Gradle Plugin
- Written manually by the programmer
Answer: Automatically generated from an Avro Schema + Maven/Gradle Plugin.
A SpecificRecord is created from generated record classes.
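For illustration, usage of a hypothetical Customer class generated by the avro-maven-plugin from a customer.avsc schema (the class name and fields are assumptions, not part of the question):
```java
// Customer is a hypothetical class generated from an Avro schema;
// generated SpecificRecord classes expose newBuilder() with typed setters.
Customer customer = Customer.newBuilder()
        .setFirstName("John")
        .setLastName("Doe")
        .setAge(26)
        .build();
```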
Consider the following code block:
```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value = %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}
```
What kind of delivery guarantee does this consumer offer?
Answer: At-most-once
Here the offset is committed before processing the message. If the consumer crashes before processing the message, the message will be lost when the consumer comes back up.
What data format is NOT natively available with the Confluent REST Proxy?
- Avro
- Binary
- JSON
- Protobuf
Answer: Protobuf.
Explanation:
Protocol buffers are not a natively supported type for the Confluent REST Proxy, but you may use the binary format instead.
Compaction is enabled for a topic in Kafka by setting log.cleanup.policy=compact. What is true about log compaction?
- After cleanup, only one message per key is retained with the latest value.
- Compaction changes the offset of messages.
- After cleanup, only one message per key is retained with the first value.
- Each message stored in the topic is compressed.
- Kafka automatically de-duplicates incoming messages based on key hashes.
Answer: After cleanup, only one message per key is retained with the latest value.
Log compaction retains at least the last known value for each record key for a single topic partition. All compacted log offsets remain valid, even if a record at an offset has been compacted away, as a consumer will get the next highest offset.
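A sketch of creating a compacted topic with the AdminClient, assuming a broker at localhost:9092 and a made-up topic name:
```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // cleanup.policy=compact keeps at least the latest value per key after cleanup
    NewTopic topic = new NewTopic("user-profiles", 3, (short) 3)
            .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
    admin.createTopics(Collections.singletonList(topic)).all().get();
}
```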
You are sending messages with keys to a topic. To increase throughput, you decide to increase the number of partitions of the topic. Select all that apply.
- New records may get written to a different partition.
- New records with the same key will get written to the partition where old records with that key were written.
- All existing records will get rebalanced among the partitions to balance the load.
- Old records will stay in their partitions.
Answer: New records may get written to a different partition, and old records will stay in their partitions.
Explanation:
Increasing the number of partitions causes new messages to get hashed differently, and breaks the guarantee that the “same key goes to the same partition.”
Kafka logs are immutable and the previous messages are not re-shuffled or reordered.
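For intuition, a rough sketch of how the default partitioner maps a record key to a partition (murmur2 hash modulo the partition count); the key and partition counts here are made up:
```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

byte[] keyBytes = "customer-42".getBytes(StandardCharsets.UTF_8);
// Same key, different partition counts: the target partition can change.
int before = Utils.toPositive(Utils.murmur2(keyBytes)) % 3; // e.g. 3 partitions originally
int after  = Utils.toPositive(Utils.murmur2(keyBytes)) % 6; // after increasing to 6
System.out.println("partition before = " + before + ", after = " + after);
```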
You are building a consumer application that processes events from a Kafka topic. What is the most important metric to monitor to ensure real-time processing?
- MessagesInPerSec
- UnderReplicatedPartitions
- records-lag-max
- BytesInPerSec
Answer: records-lag-max
This metric shows the maximum lag across the partitions assigned to the consumer, i.e. how many messages the consumer is behind the latest offset on the broker. A growing lag means the consumer is not keeping up in real time.
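The metric is usually watched through JMX, but as a sketch it can also be read from the consumer's metrics map (assuming an existing KafkaConsumer named consumer):
```java
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
    if ("records-lag-max".equals(entry.getKey().name())) {
        System.out.println("records-lag-max = " + entry.getValue().metricValue());
    }
}
```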
How can you gracefully make a Kafka consumer immediately stop polling data from Kafka, and gracefully shut down a consumer application?
- Call consumer.poll() in another thread.
- Call consumer.wakeup() and catch a WakeupException.
- Kill the consumer thread.
Answer: Call consumer.wakeup() and catch a WakeupException.
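A minimal sketch of the shutdown pattern, assuming an existing KafkaConsumer&lt;String, String&gt; named consumer:
```java
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

final Thread mainThread = Thread.currentThread();
// wakeup() is the only consumer method that is safe to call from another thread
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try {
        mainThread.join(); // wait for the poll loop to exit cleanly
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    consumer.subscribe(Arrays.asList("topic1"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // process records...
    }
} catch (WakeupException e) {
    // expected during shutdown, nothing to do
} finally {
    consumer.close(); // leaves the group cleanly and commits offsets if auto-commit is on
}
```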
A consumer wants to read messages from partitions 0 and 1 of a topic named “topic1”. What will be the result of the following code snippet:
```java
consumer.subscribe(Arrays.asList("topic1"));
List<TopicPartition> pc = new ArrayList<>();
pc.add(new TopicPartition("topic1", 0));
pc.add(new TopicPartition("topic1", 1));
consumer.assign(pc);
```
Answer: The code will throw “IllegalStateException”
Explanation: The subscribe() and assign() methods cannot both be used by the same consumer: subscribe() leverages the consumer group mechanism, while assign() manually assigns specific partitions to the consumer. Mixing them throws an IllegalStateException.
A topic has 3 replicas and you set min.insync.replicas=2. If 2 out of 3 replicas are not available, what happens when a consume request is sent to the broker?
- A new leader for partition will be elected
- NotEnoughReplicasException will be returned
- Data will be returned from the remaining in-sync replica
- An empty message will be returned
Answer: Data will be returned from the remaining in-sync replica.
With this configuration, a single in-sync replica is still readable, but not writable if the producer is using acks=all.
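For illustration, the producer-side setting that makes writes fail in this situation (a sketch, assuming an otherwise-configured producer):
```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// With acks=all and the topic's min.insync.replicas=2, produce requests fail with
// NotEnoughReplicasException while only one replica is in sync; consume requests still succeed.
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
```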
Consider the following Kafka Streams application:
```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("word-count-input");
KTable<String, Long> wordCounts = textLines
        .mapValues(textLine -> textLine.toLowerCase())
        .flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+")))
        .selectKey((key, word) -> word)
        .groupByKey()
        .count(Materialized.as("Counts"));
wordCounts.toStream()
        .to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
builder.build();
```
What is an adequate topic configuration for the topic word-count-output?
- cleanup.policy=compact
- max.messages.bytes=100000000
- compression.type=lz4
- cleanup.policy=delete
Answer: cleanup.policy=compact
Result is aggregated into a table with key as the unique word and value equal to its frequency.
We have to enable log compaction for this topic to align the topic’s cleanup policy with KTable semantics.
What Java library is KSQL based on?
- Kafka Connect
- Kafka Streams
- Schema Registry
- REST Proxy
Answer: Kafka Streams
KSQL is based on Kafka Streams and allows you to express transformations in the SQL language that get automatically converted to a Kafka Streams program in the backend.
A consumer sends a request to commit offset 2000. There is a temporary communication problem, so the broker never gets the request and therefore never responds. Meanwhile, the consumer processed another batch and successfully committed offset 3000.
What should you do?
- Use the kafka-consumer-group command to manually commit the offsets 2000 for the consumer group.
- Nothing.
- Restart the consumer.
- Add a new consumer to the group.
Answer: Nothing.
In this case, because the offset 3000 has been committed and all the messages between 0 and 3000 have all been processed, it is okay not to have committed offset 2000. Therefore, you don’t have to do anything - this behavior is acceptable.
In Avro, removing a field that does not have a default is what kind of schema evolution?
Answer: Backward.
Clients with new schema will be able to read records saved with old schema.
When auto.create.topics.enable=true in Kafka configuration, what are the circumstances where a Kafka broker automatically creates a topic?
A Kafka broker automatically creates a topic under the following circumstances:
- When a producer starts writing messages to the topic.
- When a consumer starts reading messages from the topic.
- When any client requests metadata for the topic.
To allow consumers in a group to resume at the previously committed offset, I need to set the proper value for…
- group.id
- auto.offset.reset
- value.deserializer
- enable.auto.commit
Answer: group.id
Setting a group.id that is consistent across restarts will allow your consumers that are part of the same group to resume reading from where offsets were last committed for that group.
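A short sketch of the relevant consumer settings, with a made-up group name:
```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Reusing the same group.id across restarts lets the group resume from its
// last committed offsets instead of relying on auto.offset.reset.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-service");
```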
We have a store selling shoes. What datasets are good candidates to be modeled as a KTable in Kafka Streams? Choose all that apply.
- The transaction stream
- Items returned
- Money made until now
- Inventory contents right now
Answer: Money made until now, and inventory contents right now.
Aggregations and current state ("money made until now", "inventory contents right now") are naturally stored in tables, whereas the transaction stream and returned items are event streams and must be modeled as KStreams to avoid data explosion.
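A small sketch of how the two kinds of datasets would be declared in Kafka Streams (topic names are made up):
```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
// Latest-value-per-key datasets (current inventory, running totals) fit a KTable
KTable<String, String> inventory = builder.table("shoe-inventory");
// Event-by-event datasets (transactions, returns) fit a KStream
KStream<String, String> transactions = builder.stream("shoe-transactions");
```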
When using plain JSON data with Connect, you see the following error message: org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires “schema” and “payload” fields and may not contain additional fields.
How will you fix the error?
- Set key.converter, value.converter to JsonConverter and the schema registry url
- Use Single Message Transformation to add schema and payload fields in the message.
- Set key.converter, value.converter to AvroConverter and the schema registry url
- Set key.converter.schemas.enable and value.converter.schemas.enable to false
Answer: Set key.converter.schemas.enable and value.converter.schemas.enable to false.
You will need to set the schemas.enable parameters for the converter to false when dealing with plain JSON data that carries no schema.
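As a sketch, the relevant converter settings in a Connect worker (or connector) configuration would look like this:
```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```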
When using the Confluent Kafka Distribution, where does the schema registry reside?
- As an in-memory plugin on your Kafka Brokers
- As a separate JVM component
- As an in-memory plugin on your Kafka Connect Workers
- As an in-memory plugin on your Zookeeper cluster
Answer: As a separate JVM component.
Schema registry is a separate application that provides a RESTful interface for storing and retrieving Avro schemas.
The kafka-console-consumer CLI, when used with the default options…
- uses a random group id
- always uses the same group id
- does not use a group id
Answer: uses a random group id.
If a group is not specified, the kafka-console-consumer generates a random consumer group.
When is the onCompletion() method called in the following code:
```java
private class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record = new ProducerRecord<>("topic1", "key1", "value1");
producer.send(record, new ProducerCallback());
```
The callback is invoked when a broker response is received.