Stream Building Flashcards

1
Q

Create a Serde for the class WindTurbine

A

ObjectMapperSerde<WindTurbine> turbineSerde = new ObjectMapperSerde<>(WindTurbine.class);

2
Q

Read the “turbines” topic as a KTable

A

builder.table(
    "turbines",
    Consumed.with(Serdes.Integer(), turbineSerde),
    Materialized.<Integer, WindTurbine, KeyValueStore<Bytes, byte[]>>as("turbine-store")
        .withKeySerde(Serdes.Integer())
        .withValueSerde(turbineSerde)
);

3
Q

Read the “turbine-generated-watts” topic as a KStream

A

KStream<Integer, Integer> wattsValuesStream = builder.stream(
    "turbine-generated-watts",
    Consumed.with(Serdes.Integer(), Serdes.Integer())
);

4
Q

Map the watts stream into a new megawatts stream

A

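wattsValuesStream.map((id, watts) -> {
    // Note: watts / 1000000 is integer division; cast to double first
    // if MWatts is meant to hold fractional megawatts.
    return KeyValue.pair(id, new MWatts(id, watts / 1000000));
}).to(
    "turbine-generated-mwatts",
    Produced.with(Serdes.Integer(), mwattsSerde)
);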

5
Q

Count measurements by turbine in the stream wattsValuesStream and write the results to a new topic

A

wattsValuesStream
    .groupByKey()
    .count()
    .toStream()
    .map((turbineId, count) -> {
        WindTurbineStats stats = new WindTurbineStats(turbineId, count);
        return KeyValue.pair(turbineId, stats);
    })
    .to(
        "turbine-stats",
        Produced.with(Serdes.Integer(), statsSerde)
    );

6
Q

Quarkus configuration properties for running a Kafka Streams application

A

quarkus.kafka-streams.application-id
quarkus.kafka-streams.bootstrap-servers
quarkus.kafka-streams.topics
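
As a rough illustration, these three properties might appear in application.properties as follows. The application id and broker address are hypothetical placeholders. The topic names are borrowed from the earlier cards, on the assumption that the application reads those topics.

```properties
# Hypothetical values; adjust to your environment.
# Identifies this Kafka Streams application (also used as the consumer group id).
quarkus.kafka-streams.application-id=wind-turbines
# Comma-separated list of Kafka brokers to connect to.
quarkus.kafka-streams.bootstrap-servers=localhost:9092
# Topics that must exist before the stream processing starts.
quarkus.kafka-streams.topics=turbines,turbine-generated-watts
```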

7
Q

Filter the stream and write all events with an amount greater than 1000 to a topic

A

stream
    .filter((key, amount) -> amount > 1000)
    .to("large-payments", Produced.with(keySerde, valueSerde));

8
Q

Convert a stream of distances in meters to cm

A

stream
    .map((key, value) -> {
        // 1 meter = 100 centimeters
        Double cm = value * 100.0;
        return KeyValue.pair(key, cm);
    })
    .to("vehicle-feet-elevations", Produced.with(Serdes.Integer(), Serdes.Double()));

9
Q

A stream holds vehicle positions, keyed by the color of the vehicle. Group these positions by vehicle ID.

A

KGroupedStream<Integer, VehiclePosition> positionsByVehicle = stream
    .groupBy(
        (key, value) -> value.vehicleId,
        Grouped.with(Serdes.Integer(), vehiclePositionSerde)
    );

10
Q

What is the result of the count() operation, and how do you convert it to a stream?

A

A KTable; call toStream() on it.
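
To make the table/stream distinction concrete, here is a plain-Java simulation with no Kafka dependency. The class name CountSimulation and its methods are hypothetical. The map plays the role of the KTable (latest count per key), and the list plays the role of what toStream() emits (one record per table update). Real Kafka Streams may merge consecutive updates when record caching is enabled, so treat this only as a sketch of the semantics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only: no Kafka classes are involved.
class CountSimulation {

    // Plays the role of the KTable: the latest count per key.
    private final Map<Integer, Long> table = new LinkedHashMap<>();

    // Plays the role of toStream(): every table update as a "key=count" record.
    private final List<String> changelog = new ArrayList<>();

    // Handle one incoming record; only the key matters for counting.
    void process(int turbineId) {
        long updated = table.merge(turbineId, 1L, Long::sum);
        changelog.add(turbineId + "=" + updated);
    }

    Map<Integer, Long> table() { return table; }

    List<String> changelog() { return changelog; }

    public static void main(String[] args) {
        CountSimulation sim = new CountSimulation();
        for (int turbineId : new int[] {1, 2, 1, 1}) {
            sim.process(turbineId);
        }
        System.out.println(sim.table());     // {1=3, 2=1}
        System.out.println(sim.changelog()); // [1=1, 2=1, 1=2, 1=3]
    }
}
```

The table holds only the current state, while the stream carries the history of updates that produced it.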

11
Q

Group users by country in a stream where the key is the language

A

userStreams.groupBy((key, value) -> value.country);

12
Q

Count users per country in a stream where the key is the language

A

userStreams.groupBy((key, value) -> value.country).count();

13
Q

Read from topic keyed by language and write to topic keyed by locationId to achieve repartitioning

A

stream.map(
    (key, measure) -> new KeyValue<>(measure.locationId, measure)
).to(
    REPARTITIONED_TOPIC,
    Produced.with(serde1, serde2)
);
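
Why does changing the key repartition the data? A record's partition is derived from its key, so records that share a locationId but differ in language can sit in different partitions until they are re-keyed. The plain-Java sketch below illustrates this. The class name, partition count, and sample values are hypothetical, and String.hashCode() is only a stand-in for Kafka's default partitioner, which hashes the serialized key bytes.

```java
// Plain-Java illustration only: no Kafka classes are involved.
class RepartitionSketch {

    // Stand-in for a partitioner: Kafka's default hashes the serialized key
    // bytes (murmur2); String.hashCode() is used here only for illustration.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // hypothetical partition count
        // Each row is {language, locationId}; the values are made up.
        String[][] measures = {
            {"en", "loc-1"}, {"es", "loc-1"}, {"en", "loc-2"}
        };
        for (String[] m : measures) {
            System.out.printf(
                "language=%s -> partition %d, locationId=%s -> partition %d%n",
                m[0], partitionFor(m[0], numPartitions),
                m[1], partitionFor(m[1], numPartitions));
        }
    }
}
```

Once the records are written with locationId as the key, every record for a given location maps to the same partition, which is what later per-location grouping or joining relies on.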

14
Q

Skeleton of a Stream processor in plain Java

A
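
A minimal sketch of such a skeleton, assuming the kafka-streams library is on the classpath and a broker is reachable. The class name, application id, broker address, and topic names are hypothetical placeholders, and the pass-through topology stands in for real processing logic.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class StreamProcessorSkeleton {

    public static void main(String[] args) {
        // 1. Configuration (hypothetical values).
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // 2. Describe the processing logic with a StreamsBuilder.
        StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("input-topic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
            // ... transformations (map, filter, groupByKey, ...) go here ...
            .to("output-topic", Produced.with(Serdes.Integer(), Serdes.Integer()));

        // 3. Build the topology and create the KafkaStreams instance.
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);

        // 4. Close the processor cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        // 5. Start processing.
        streams.start();
    }
}
```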