Spring Kafka Flashcards
What is NewTopic, what is its purpose, and what is it used by?
NewTopic is an Apache Kafka class (org.apache.kafka.clients.admin.NewTopic) whose purpose is to describe a new topic to be created in the Kafka cluster. NewTopic is passed to AdminClient to create the topic. In addition, if a NewTopic is declared as a Spring bean, KafkaAdmin (a wrapper for AdminClient) will use it to create the topic in the Kafka cluster if it does not already exist.
For example:
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }
What is KafkaAdmin and what is its primary purpose?
KafkaAdmin is a bean automatically created by Spring Boot. Its primary purpose is to provide a way to manage topics by delegating to Apache Kafka's AdminClient.
What is KafkaTemplate<K, V> and what is its primary purpose?
KafkaTemplate<K, V> is a bean automatically created by Spring Boot. Its primary purpose is to send messages to a topic. The template delegates to an Apache Kafka KafkaProducer<K, V>.
How do the following beans: producerFactory, producerConfigs, and kafkaTemplate relate to one another?
producerConfigs supplies the configuration map that producerFactory is built from. kafkaTemplate uses producerFactory to create the producer it sends with. For example:
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // See https://kafka.apache.org/documentation/#producerconfigs for more properties
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
What is a ProducerRecord<Key, Value>?
A ProducerRecord<Key, Value> is a Kafka class that represents the message to be sent to the Kafka cluster.
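When configuring a topic, what is the difference between CREATE_TIME and LOG_APPEND_TIME?
CREATE_TIME keeps the timestamp that the producer set on the record. LOG_APPEND_TIME replaces it with the broker's own time at the moment the broker appends the message to its log. The choice is made per topic through the message.timestamp.type property, whose values are CreateTime and LogAppendTime.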
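The difference between the two timestamp types can be sketched in plain Java with no Kafka dependency. This is only a toy model: the class, enum, and method names below are hypothetical and exist just for illustration, and a real broker does considerably more.

```java
// Toy model (not Kafka code): which timestamp ends up stored with a record.
final class TimestampTypeSketch {

    // Local enum for this sketch; it only mirrors the two names discussed above.
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /**
     * Returns the timestamp stored with the record.
     *
     * @param producerTimestamp time the producer put on the record
     * @param brokerAppendTime  broker clock when the record is appended to the log
     */
    static long storedTimestamp(TimestampType type, long producerTimestamp, long brokerAppendTime) {
        switch (type) {
            case CREATE_TIME:
                return producerTimestamp;   // producer's value is kept
            case LOG_APPEND_TIME:
                return brokerAppendTime;    // producer's value is overwritten
            default:
                throw new IllegalArgumentException("Unknown type: " + type);
        }
    }

    public static void main(String[] args) {
        long producerTs = 1_000L;
        long brokerTs = 1_250L;
        System.out.println(storedTimestamp(TimestampType.CREATE_TIME, producerTs, brokerTs));     // 1000
        System.out.println(storedTimestamp(TimestampType.LOG_APPEND_TIME, producerTs, brokerTs)); // 1250
    }
}
```

With LOG_APPEND_TIME, consumers never see the producer's clock, which can matter when producer clocks are unreliable.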
What is the default ProducerListener that a KafkaTemplate<K, V> uses and what is its behavior?
LoggingProducerListener. It logs errors when a send is unsuccessful and does nothing when a send is successful. You can configure your own ProducerListener.
For example:
    template.setProducerListener(...);
What is the purpose of a ProducerListener?
The purpose of a ProducerListener is to receive an asynchronous callback with the result of each send (success or failure), instead of waiting for a CompletableFuture<T> to complete. A KafkaTemplate<K, V> is automatically configured with a LoggingProducerListener.
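To make the listener idea concrete, here is a minimal plain-Java sketch with no Spring or Kafka dependency. SendListener, ErrorLoggingListener, and ToyTemplate are hypothetical stand-ins invented for this card, not the Spring Kafka types, and the toy completes sends inline, whereas a real send completes asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model (not Spring Kafka code): a listener sees every send result,
// even when the caller never looks at the returned future.
final class ProducerListenerSketch {

    /** Hypothetical listener: one callback per outcome, both optional. */
    interface SendListener {
        default void onSuccess(String topic, String value) { }
        default void onError(String topic, String value, Exception e) { }
    }

    /** Mimics the default behavior described above: record errors, ignore successes. */
    static final class ErrorLoggingListener implements SendListener {
        final List<String> errors = new ArrayList<>();

        @Override
        public void onError(String topic, String value, Exception e) {
            errors.add("send to " + topic + " failed: " + e.getMessage());
        }
    }

    /** Hypothetical template; brokerAccepts simulates the outcome of the send. */
    static final class ToyTemplate {
        private final SendListener listener;

        ToyTemplate(SendListener listener) {
            this.listener = listener;
        }

        CompletableFuture<String> send(String topic, String value, boolean brokerAccepts) {
            CompletableFuture<String> future = new CompletableFuture<>();
            if (brokerAccepts) {
                listener.onSuccess(topic, value);
                future.complete(value);
            } else {
                Exception failure = new IllegalStateException("broker unavailable");
                listener.onError(topic, value, failure);
                future.completeExceptionally(failure);
            }
            return future;
        }
    }

    /** Sends one successful and one failed message, ignoring both futures. */
    static List<String> demo() {
        ErrorLoggingListener listener = new ErrorLoggingListener();
        ToyTemplate template = new ToyTemplate(listener);
        template.send("topic1", "ok", true);
        template.send("topic1", "lost", false);
        return listener.errors;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The failed send is recorded even though no caller inspected its future, which is the point of registering a listener on the template.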
True or False: By default, the DefaultKafkaProducerFactory creates a singleton producer used by all clients, as recommended in the KafkaProducer JavaDocs.
True. However, if you want a separate producer per thread, set producerPerThread on the DefaultKafkaProducerFactory to true. Care must be taken when setting this property to true, because each thread then keeps its own producer open until that producer is explicitly closed.
See: https://docs.spring.io/spring-kafka/reference/kafka/sending-messages.html#producer-factory
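The singleton versus per-thread behavior can be modeled in plain Java with a ThreadLocal. This is a sketch under assumptions, not the DefaultKafkaProducerFactory implementation: ToyProducer and ToyProducerFactory are hypothetical, and the cleanup method merely borrows the name closeThreadBoundProducer() that the linked reference uses.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model (not Spring Kafka code): shared producer vs one producer per thread.
final class ProducerPerThreadSketch {

    /** Hypothetical stand-in for a producer. */
    static final class ToyProducer { }

    static final class ToyProducerFactory {
        private final AtomicInteger created = new AtomicInteger();
        private final ThreadLocal<ToyProducer> threadBound = new ThreadLocal<>();
        private final boolean producerPerThread;
        private ToyProducer shared;

        ToyProducerFactory(boolean producerPerThread) {
            this.producerPerThread = producerPerThread;
        }

        synchronized ToyProducer createProducer() {
            if (producerPerThread) {
                ToyProducer producer = threadBound.get();
                if (producer == null) {
                    producer = new ToyProducer();
                    created.incrementAndGet();
                    threadBound.set(producer);
                }
                return producer;
            }
            if (shared == null) {
                shared = new ToyProducer();
                created.incrementAndGet();
            }
            return shared;
        }

        /** Drops the calling thread's producer; otherwise it lives as long as the thread. */
        void closeThreadBoundProducer() {
            threadBound.remove();
        }

        int createdCount() {
            return created.get();
        }
    }

    /** Requests a producer twice from each of `threads` threads; returns how many were created. */
    static int producersCreated(boolean producerPerThread, int threads) {
        ToyProducerFactory factory = new ToyProducerFactory(producerPerThread);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                factory.createProducer();
                factory.createProducer();           // same thread: reuses its producer
                factory.closeThreadBoundProducer(); // the cleanup step that is easy to forget
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return factory.createdCount();
    }

    public static void main(String[] args) {
        System.out.println(producersCreated(false, 3)); // 1
        System.out.println(producersCreated(true, 3));  // 3
    }
}
```

In the per-thread case, long-lived pooled threads that skip the cleanup call would keep their producers alive indefinitely, which is one plausible reading of the "care must be taken" warning.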
What is a message listener container and what does it use to listen to messages?
A message listener container is a Spring concept (interface). Essentially, a message listener container is responsible for setting up all of the infrastructure needed to receive messages from a message provider (e.g. Kafka). A container uses message listeners to receive messages.
What are the 2 implementations of MessageListenerContainer and what are their differences?
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer is a single-threaded message listener container that uses a KafkaConsumer<K, V> to consume messages. ConcurrentMessageListenerContainer creates one or more KafkaMessageListenerContainer instances based on its concurrency setting.
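As a rough illustration of what "based on concurrency" can mean, the plain-Java sketch below splits a set of partitions across child containers. It assumes partitions are assigned to the container explicitly; when consumers instead subscribe through group management, Kafka's group assignor decides who gets which partition. The even split and the cap at one container per partition follow the behavior the Spring Kafka reference describes for explicit assignment, but the contiguous ordering and all names here are choices made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Spring Kafka code): spreading partitions over child containers.
final class ConcurrencySketch {

    /**
     * Returns one list of partition numbers per child container.
     * Never creates more child containers than there are partitions.
     */
    static List<List<Integer>> distribute(int partitionCount, int concurrency) {
        if (partitionCount < 1 || concurrency < 1) {
            throw new IllegalArgumentException("partitionCount and concurrency must be >= 1");
        }
        int children = Math.min(concurrency, partitionCount); // concurrency adjusted down
        int base = partitionCount / children;
        int extra = partitionCount % children;                // first `extra` children get one more

        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int child = 0; child < children; child++) {
            int size = base + (child < extra ? 1 : 0);
            List<Integer> assigned = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                assigned.add(next++);
            }
            result.add(assigned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(distribute(6, 3)); // [[0, 1], [2, 3], [4, 5]]
        System.out.println(distribute(5, 3)); // [[0, 1], [2, 3], [4]]
        System.out.println(distribute(2, 3)); // [[0], [1]]
    }
}
```

Each inner list corresponds to one single-threaded child container, so a concurrency higher than the partition count buys no extra parallelism.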
What is the default offset commit AckMode for a KafkaMessageListenerContainer?
BATCH. This commits the offset when all the records returned by poll() have been processed. Remember, a consumer's poll() returns a batch of ConsumerRecords rather than a single record. With syncCommits=true, the consumer blocks until an acknowledgement of the commit is returned by the broker.
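The commit timing described above can be traced with a small plain-Java model. It is a sketch, not the container's implementation: the class and method names are hypothetical, and RECORD (commit after every record) is included only as a contrast to BATCH. As in Kafka, a committed offset is the offset of the next record to read, so it is the last processed offset plus one.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Spring Kafka code): when offsets get committed under each mode.
final class AckModeSketch {

    // Local enum for this sketch; mirrors two of the AckMode names.
    enum AckMode { RECORD, BATCH }

    /**
     * @param polls each inner list holds the record offsets returned by one poll()
     * @return the offsets committed, in the order the commits happen
     */
    static List<Long> committedOffsets(AckMode mode, List<List<Long>> polls) {
        List<Long> commits = new ArrayList<>();
        for (List<Long> batch : polls) {
            for (long offset : batch) {
                // ... the listener would process the record here ...
                if (mode == AckMode.RECORD) {
                    commits.add(offset + 1);                  // commit after each record
                }
            }
            if (mode == AckMode.BATCH && !batch.isEmpty()) {
                commits.add(batch.get(batch.size() - 1) + 1); // one commit per poll()
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        List<List<Long>> polls = List.of(List.of(0L, 1L, 2L), List.of(3L, 4L));
        System.out.println(committedOffsets(AckMode.BATCH, polls));  // [3, 5]
        System.out.println(committedOffsets(AckMode.RECORD, polls)); // [1, 2, 3, 4, 5]
    }
}
```

BATCH issues fewer commits, at the cost of reprocessing up to a whole poll's worth of records if the consumer fails partway through a batch.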
Review the following code:
    @KafkaListener(id = "one", topics = "one")
    public void listen1(String in) {
        System.out.println("1: " + in);
    }

    @KafkaListener(id = "two", topics = "two",
            properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
    public void listen2(byte[] in) {
        System.out.println("2: " + new String(in));
    }
What bean is responsible for creating a message listener container when using @KafkaListener?
KafkaListenerEndpointRegistry is automatically created by Spring and is responsible for creating and managing message listener containers when @KafkaListener is used.
Review the following code:
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    ...

    this.registry.getListenerContainer("myContainer").start();

    ...