Stream Building Flashcards
Create a Serde for a class WindTurbine
ObjectMapperSerde<WindTurbine> turbineSerde = new ObjectMapperSerde<>(WindTurbine.class);
Read the “turbines” topic as a KTable
builder.table(
"turbines",
Consumed.with(Serdes.Integer(), turbineSerde),
Materialized.<Integer, WindTurbine, KeyValueStore<Bytes, byte[]>>as("turbine-store")
.withKeySerde(Serdes.Integer())
.withValueSerde(turbineSerde)
);
Read the “turbine-generated-watts” topic as a KStream
KStream<Integer, Integer> wattsValuesStream = builder.stream(
"turbine-generated-watts",
Consumed.with(Serdes.Integer(), Serdes.Integer())
);
Map the watts stream into a new mwatts stream
wattsValuesStream.map((id, watts) -> {
// Integer division truncates; divide by 1_000_000.0 instead if MWatts accepts a double
return KeyValue.pair(id, new MWatts(id, watts / 1000000));
}).to(
"turbine-generated-mwatts",
Produced.with(Serdes.Integer(), mwattsSerde)
);
Count measurements by turbine in stream wattsValuesStream and write the results to a new topic
wattsValuesStream
.groupByKey()
.count()
.toStream()
.map((turbineId, count) -> {
WindTurbineStats stats = new WindTurbineStats(turbineId, count);
return KeyValue.pair(turbineId, stats);
})
.to(
"turbine-stats",
Produced.with(Serdes.Integer(), statsSerde)
);
Kafka properties for running a stream
quarkus.kafka-streams.application-id
quarkus.kafka-streams.bootstrap-servers
quarkus.kafka-streams.topics
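As a rough illustration, the three properties could be set in a Quarkus application.properties file along these lines. The application id and broker address are placeholder assumptions; the topic names are borrowed from the turbine cards in this deck.

```properties
# Hypothetical values, for illustration only
quarkus.kafka-streams.application-id=wind-turbine-stats
quarkus.kafka-streams.bootstrap-servers=localhost:9092
# Comma-separated topics that must exist before the topology starts
quarkus.kafka-streams.topics=turbines,turbine-generated-watts
```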
Filter a stream and write all events with an amount greater than 1000 to a new topic
stream
.filter((key, amount) -> amount > 1000)
.to("large-payments", Produced.with(keySerde, valueSerde));
Convert a stream of distances in meters to cm
stream
.map((key, value) -> {
Double cm = value * 100.0; // 1 m = 100 cm
return KeyValue.pair(key, cm);
})
.to("vehicle-feet-elevations", Produced.with(Serdes.Integer(), Serdes.Double()));
The stream has vehicle positions, and the keys are the color of the vehicle. Group these vehicles by their ID
// VehiclePosition is the assumed value type behind vehiclePositionSerde
KGroupedStream<Integer, VehiclePosition> positionsByVehicle = stream
.groupBy(
(key, value) -> value.vehicleId,
Grouped.with(Serdes.Integer(), vehiclePositionSerde)
);
What is the result of the count() operation, and how do you convert it to a stream?
A KTable; call toStream() on it
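To make the table-versus-stream distinction concrete, here is a minimal plain-Java simulation. It does not use the Kafka Streams API; the class and method names are hypothetical. The changelog list plays the role of what toStream() exposes (one update per incoming record), and replaying it yields the table view (latest count per key). In a real Kafka Streams application, record caching may coalesce some intermediate updates, so fewer of them can reach the output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation only; not Kafka Streams code.
class CountChangelogSketch {

    // Stream view: every incoming key produces an update (key, new running count).
    static List<Map.Entry<Integer, Long>> countChangelog(List<Integer> keys) {
        Map<Integer, Long> counts = new LinkedHashMap<>();
        List<Map.Entry<Integer, Long>> changelog = new ArrayList<>();
        for (int key : keys) {
            long updated = counts.merge(key, 1L, Long::sum);
            changelog.add(Map.entry(key, updated));
        }
        return changelog;
    }

    // Table view: replay the changelog and keep only the latest value per key.
    static Map<Integer, Long> latestPerKey(List<Map.Entry<Integer, Long>> changelog) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> update : changelog) {
            table.put(update.getKey(), update.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        // Three measurements: turbine 1, turbine 2, turbine 1 again
        List<Map.Entry<Integer, Long>> changelog = countChangelog(List.of(1, 2, 1));
        System.out.println(changelog);               // [1=1, 2=1, 1=2]
        System.out.println(latestPerKey(changelog)); // {1=2, 2=1}
    }
}
```

The table holds one row per turbine, while the stream carries every count update in arrival order, which is why the cards call toStream() before map() and to().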
Group users by country in a stream where the key is the language
userStreams.groupBy((key, value) -> value.country);
Count users per country in a stream where the key is the language
userStreams.groupBy((key, value) -> value.country).count();
Read from a topic keyed by language and write to a topic keyed by locationId to achieve repartitioning
stream.map(
(key, measure) -> new KeyValue<>(measure.locationId, measure)
).to(REPARTITIONED_TOPIC,
Produced.with(serde1, serde2)
);
Skeleton of a Stream processor in plain Java