Troubleshooting Flashcards
Initialize transaction
producer = new KafkaProducer<>(conf);
producer.initTransactions();
Start transaction
producer.beginTransaction();
Wrap up transaction
producer.sendOffsetsToTransaction(getConsumedOffsets(consumer), consumer.groupMetadata());
producer.commitTransaction();
Wait for async calculations to complete
Print Stream in one instruction
stream.print(Printed.toSysOut());
Consume events sent to the topic. Create a tumbling window with a size of 10 seconds and no grace period. Count the records and print the resulting KTable to standard output.
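To make the windowing step concrete, here is a minimal plain-Java sketch of how a 10-second tumbling window buckets record timestamps and counts them. It is not the Kafka Streams API: the class and method names (TumblingWindowCount, windowStart, countPerWindow) are hypothetical, per-key grouping is left out, and timestamps are assumed to be non-negative epoch milliseconds. In the Streams DSL the equivalent window is usually declared with TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)) (Kafka 3.0 and later), and the counted KTable is typically turned back into a stream with toStream() before printing.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowCount {

    // Start of the tumbling window that contains the timestamp.
    // Windows are aligned to the epoch, so the start is the timestamp
    // rounded down to a multiple of the window size.
    // Assumes timestampMs >= 0 and sizeMs > 0.
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts records per window, keyed by window start (sorted ascending).
    public static Map<Long, Long> countPerWindow(List<Long> timestampsMs, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long sizeMs = 10_000L; // 10-second window, as on the card
        List<Long> timestamps = List.of(1_000L, 9_999L, 10_000L, 25_000L);
        // Print each window as [start, end) with its record count.
        countPerWindow(timestamps, sizeMs).forEach((start, count) ->
                System.out.println("[" + start + ", " + (start + sizeMs) + ") -> " + count));
    }
}
```

A record stamped exactly 10000 ms lands in the second window, because each window includes its start and excludes its end.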
Configure timeouts
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 150);
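When a producer refuses to start after timeouts are changed, a common cause is that delivery.timeout.ms is smaller than request.timeout.ms plus linger.ms. The sketch below checks that relationship with plain java.util.Properties and the string keys behind the ProducerConfig constants. The class name ProducerTimeoutCheck and its methods are hypothetical, and the fallback defaults (120000, 30000 and 0 ms) are assumptions that should be checked against the client version in use.

```java
import java.util.Properties;

public class ProducerTimeoutCheck {

    // Reads a millisecond setting stored as a number or a string;
    // returns the given default when the key is absent.
    public static long readMs(Properties props, String key, long defaultMs) {
        Object value = props.get(key);
        return value == null ? defaultMs : Long.parseLong(value.toString());
    }

    // True when delivery.timeout.ms >= request.timeout.ms + linger.ms.
    // The defaults below are assumptions, not read from the Kafka client.
    public static boolean deliveryTimeoutIsConsistent(Properties props) {
        long delivery = readMs(props, "delivery.timeout.ms", 120_000L);
        long request = readMs(props, "request.timeout.ms", 30_000L);
        long linger = readMs(props, "linger.ms", 0L);
        return delivery >= request + linger;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("delivery.timeout.ms", 60000); // values from the card
        props.put("request.timeout.ms", 150);
        System.out.println(deliveryTimeoutIsConsistent(props));
    }
}
```

With the values from the card the check passes, since 60000 is well above 150 plus the assumed linger time.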
Configure idempotence
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
Configure retries
props.put(ProducerConfig.RETRIES_CONFIG, 0);
Initialize topology test driver
testDriver = new TopologyTestDriver(tracker.buildTopology());
Create input test topic
vehicleRegisteredTopic = testDriver.createInputTopic(topic, serializer1, serializer2);
Create test output topic
vehicleStatusTopic = testDriver.createOutputTopic(topic, deserializer1, deserializer2);
Create test store
vehiclesStore = testDriver.getKeyValueStore("vehicles-store");
Wrap up test with topology test driver
@AfterEach
public void tearDown() {
    testDriver.close();
}
Test that VehicleRegistered events create vehicles in the store