Chapter 6, Stream-Processing Patterns


What do event-driven architecture patterns focus on compared to stream-processing patterns?


Event-driven architecture patterns revolve around event delivery and orchestration, whereas stream-processing patterns focus on how such events can be processed on the fly to extract meaningful information and take actions in real time.
Page 330

How can messages be transformed in a system?


Messages can be transformed in code written in a general-purpose programming language, or by specialized applications that perform data mapping. These applications include service buses and stream-processing systems that can run in the cloud, such as Apache Camel, KSQL, Amazon Kinesis, and Azure Stream Analytics.
Page 333
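
A code-based transformation can be sketched as a small stateless function. This is a minimal illustration only: the field names (`dev`, `temp_f`, `device_id`, `temperature_c`) and the Fahrenheit-to-Celsius conversion are assumptions made for the example and do not come from the book.

```python
import json


def transform(raw: str) -> str:
    """Deserialize a partner message, map its fields, and reserialize it.

    The source and target field names are hypothetical.
    """
    source = json.loads(raw)
    target = {
        "device_id": source["dev"],
        # Convert Fahrenheit to Celsius as part of the data mapping.
        "temperature_c": round((source["temp_f"] - 32) * 5 / 9, 2),
    }
    return json.dumps(target, sort_keys=True)
```

Because the function keeps no state between calls, many copies of it can run side by side.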

What is protocol switching and when is it necessary?


Protocol switching is needed when different teams use different, incompatible message brokers. For example, one team might use Kafka for its message processing, while another uses Apache ActiveMQ. An intermediate application consumes events from ActiveMQ over AMQP, deserializes them, reserializes them as Kafka events, and publishes them to Kafka.
Page 335

Protocol switching: When working with partners and third-party teams, sometimes different teams will use different, noncompatible message brokers. One team might use Kafka for its message processing, while another uses Apache ActiveMQ, for instance. We cannot simply send events from one to another without some kind of conversion. Here, we use an intermediate application that consumes events from AMQP and deserializes them. Then it serializes those events as Kafka events and publishes them to Kafka.

Can protocol switching be implemented without data mapping?


Yes, protocol switching alone does not require data mapping, so it can be implemented via a simple cloud native application by using the appropriate protocol libraries for both event consumption and publishing.
Page 335

When is the Transformation Pattern especially useful?


The Transformation Pattern is especially useful when working with applications managed by partner teams, and transformations are needed to allow cloud native applications to interoperate.
Page 335

How can stateless transformations be scaled in cloud native applications?


Stateless transformations can be scaled horizontally without issues, and serverless compute options such as AWS Lambda or Azure Functions can be used for these use cases.
Page 335

What pattern should be used for stateful transformations like calculating the average temperature over the last hour?


For stateful transformations, such as calculating the average temperature over the last hour, systems cannot be simply scaled horizontally. The Sequential Convoy pattern should be used to partition and scale these applications.
Page 335

How should events be filtered by category in an e-commerce platform?


Use subscription filters provided by message brokers to filter only the relevant type of data for processing. If that is not possible, implement an intermediate microservice or serverless function to filter and publish only the relevant events. This improves security and eliminates potential misuse of data.
Page 337

Why is it essential to filter only the most critical data based on a threshold in some scenarios?


It is essential to filter only the most critical data based on a threshold when processing everything at all times is not computationally feasible. For example, in a banking use case with hundreds of transactions performed every minute, performing human verification on all events to detect fraud is not possible.
Page 337
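
A threshold filter of this kind can be sketched as a stateless generator. The event shape (a dict with `id` and `amount`) and the threshold value used in the usage note are assumptions for illustration.

```python
from typing import Iterable, Iterator


def filter_above_threshold(events: Iterable[dict], threshold: float) -> Iterator[dict]:
    """Pass along only the events whose amount meets the threshold.

    Everything below the threshold is dropped, so downstream
    verification only sees the most critical transactions.
    """
    for event in events:
        if event["amount"] >= threshold:
            yield event
```

For example, `list(filter_above_threshold(transactions, 10000.0))` would keep only transactions of 10,000 or more for manual review.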

What benefits does the Filters and Thresholds Pattern provide to cloud native applications?


The Filters and Thresholds Pattern allows cloud native applications to extract relevant events for processing and reduces their load by dropping irrelevant or lower-priority events.
Page 338

How can modern message brokers like Kafka support the Filters and Thresholds Pattern?


Modern message brokers like Kafka natively support subscription to topics with a filter condition, allowing cloud native applications to avoid running additional containers just for filtering.
Page 338

How can filters be implemented and deployed in cloud native applications?


Filters can be implemented as stateless microservices and deployed in front of any other cloud native application to filter and pass only the relevant events. Serverless compute options such as AWS Lambda and Azure Functions can also be used to implement the Filters and Thresholds Pattern.
Page 338

What are the types of windowed aggregation operations?


The types of windowed aggregation operations are length sliding, length batch, time sliding, and time batch. Aggregation operations are performed on top of these windows, and the aggregation output is emitted as a stream for further processing.
Page 339
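
The difference between a sliding and a batch window can be sketched with the two length-based variants. The function names are hypothetical, and the average is just one possible aggregation. The time-based variants would follow the same idea but evict or flush by event timestamp instead of by count.

```python
from collections import deque
from typing import Iterable, Iterator


def length_sliding_avg(values: Iterable[float], length: int) -> Iterator[float]:
    """Emit the average of the last `length` values after every event."""
    window = deque(maxlen=length)  # the oldest value is evicted automatically
    for value in values:
        window.append(value)
        yield sum(window) / len(window)


def length_batch_avg(values: Iterable[float], length: int) -> Iterator[float]:
    """Emit one average per full batch of `length` values, then start over."""
    batch = []
    for value in values:
        batch.append(value)
        if len(batch) == length:
            yield sum(batch) / length
            batch.clear()
```

With a length of 2, the sliding variant emits one output per input event, while the batch variant emits one output for every two events and holds back an incomplete final batch.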

What pattern should be applied to ensure accurate aggregation calculations during system downtime?


The Two-Node Failover pattern should be applied to ensure accurate aggregation calculations are continuously emitted for decision making during system downtime.
Page 344

When is it not necessary to use reliability patterns for aggregation services?


If the service is not critical for the business, system downtime may not cause a business impact. In this case, there is no need to worry about preserving the window state, and therefore no need to use reliability patterns.
Page 344

Aggregate events over length: Sometimes the number of events is an important aspect of the aggregation, and those cannot be modeled with time.

Why is the Windowed Aggregation Pattern considered stateful and what are the implications during system failures?


The Windowed Aggregation Pattern is stateful because windows rely on multiple events, and a system failure or restart can cause those events to get lost, resulting in inconsistent aggregation results. When aggregations are critical, reliability patterns are needed to rebuild or recover the state after a failure or restart.
Page 345

Why can’t all types of aggregations be implemented with high accuracy and efficiency?


Not all types of aggregations can be implemented with high accuracy and efficiency because some, like calculating the median, require iterating through all events in a window, adding latency and requiring more space. In contrast, calculating the mean only needs the sum and count of events, enabling rapid computation without iterating through all events.
Page 345
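
The contrast can be sketched with two small aggregators. The class names are hypothetical. The mean keeps only two numbers no matter how many events arrive, whereas this simple median keeps every value in the window.

```python
import statistics


class RunningMean:
    """Constant-space mean: only the sum and the count are stored."""

    def __init__(self) -> None:
        self.total = 0.0
        self.count = 0

    def add(self, value: float) -> None:
        self.total += value
        self.count += 1

    def value(self) -> float:
        # Raises ZeroDivisionError if called before any event was added.
        return self.total / self.count


class WindowMedian:
    """Median over a window: every value must be retained and sorted."""

    def __init__(self) -> None:
        self.values = []

    def add(self, value: float) -> None:
        self.values.append(value)

    def value(self) -> float:
        return statistics.median(self.values)
```

The same constant-space property is what makes the mean suitable for aggregations that have no window at all.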

How should systems be designed to withstand high load and scale on demand when using windowed aggregations?


Systems should be designed to withstand high load and scale on demand by sharding the collections of events in windows, allowing effective scaling of these operators.
Page 345

What is the Scatter and Gather pattern and what is a common use case for it?


The Scatter and Gather pattern involves processing the same event in parallel with different operations and then combining the results to emit a single event. A common use case is a loan application process where operations like credit check, address verification, and identity verification are performed in parallel and then combined for a loan decision.
Page 349

Scatter and gather: In scatter and gather, we process the same event in parallel, performing different operations, and finally combine the results so all event outputs can be emitted as a single event. This is one of the most common scenarios for using the Stream Join Pattern.

What is the Stream Join pattern used for?


The Stream Join pattern is used to join various types of events based on a defined condition and a window.
Page 350
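
A join on a condition and a window can be sketched as follows. This is a simplified illustration under several assumptions: events arrive ordered by timestamp, there are exactly two streams named `left` and `right`, the join condition is key equality, and at most one pending event per key is kept on each side.

```python
from typing import Iterable, Iterator, Tuple


def join_streams(
    events: Iterable[Tuple[float, str, str, str]], window_seconds: float
) -> Iterator[Tuple[str, str, str]]:
    """Join 'left' and 'right' events that share a key within a time window.

    Each input is (timestamp, stream_name, key, payload).
    Each output is (key, left_payload, right_payload).
    """
    pending = {"left": {}, "right": {}}  # state: events waiting for a match
    for ts, stream, key, payload in events:
        other = "right" if stream == "left" else "left"
        match = pending[other].pop(key, None)
        if match is not None and ts - match[0] <= window_seconds:
            left, right = (payload, match[1]) if stream == "left" else (match[1], payload)
            yield key, left, right
        else:
            # No partner yet, or the partner fell outside the window and was dropped.
            pending[stream][key] = (ts, payload)
```

The `pending` dictionaries are the state that makes the join stateful: if the process restarts, any events waiting there are lost unless a reliability pattern preserves them.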

Why is the Join operation considered stateful and what should be done when event loss cannot be tolerated?


The Join operation is stateful because it needs to wait for all matching events to arrive before making a valid join. When event loss cannot be tolerated, reliability patterns should be used to ensure events are preserved across system failures and restarts.
Page 351

How can simple scatter and gather scenarios avoid event loss upon system failure or restart?


In simple scatter and gather scenarios, events can be read directly from message brokers, and acknowledgment can be deferred until events are successfully joined. This ensures events are not lost upon system failure or restart, as the message broker will republish those events.
Page 351

What challenges arise from joining many events during a long time period and how can these be addressed?


Joining many events during a long time period can increase space requirements and processing times. The Sequential Convoy pattern can be used to shard events based on joining attributes, parallelizing the joining process and ensuring related events fall into the same shard for successful joining.
Page 351

What is a common use of the Temporal Event Ordering Pattern?


A common use of the Temporal Event Ordering Pattern is to identify an incident from a sequence of events that occur in a prescribed order.
Page 354

How can the Temporal Event Ordering Pattern be used to detect the nonoccurrence of an expected event?


The Temporal Event Ordering Pattern can detect the nonoccurrence of an expected event to identify an erroneous situation. For example, it can notify a homeowner if their garage door is still open one minute after the car drives out.
Page 355
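
The garage door case can be sketched as a tiny state machine driven by event time. The event names (`car_left`, `door_closed`) are hypothetical, and the sketch assumes that some later event always arrives to advance time; a real implementation would more likely use a timer.

```python
from typing import Iterable, Iterator, Tuple


def detect_door_left_open(
    events: Iterable[Tuple[float, str]], timeout_seconds: float = 60
) -> Iterator[float]:
    """Yield the time the car left whenever no 'door_closed' follows in time.

    Each input is (timestamp, kind), ordered by timestamp.
    """
    waiting_since = None  # state: set while we expect a 'door_closed' event
    for ts, kind in events:
        # Any incoming event advances time, so check the deadline first.
        if waiting_since is not None and ts - waiting_since > timeout_seconds:
            yield waiting_since
            waiting_since = None
        if kind == "car_left":
            waiting_since = ts
        elif kind == "door_closed":
            waiting_since = None
```

The `waiting_since` variable is the in-memory state that would need to be persisted for the detection to survive a restart.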

Detect nonoccurrence of event: Now, let’s say we want to identify an incident by an expected event not occurring. These are commonly used for detecting erroneous situations, such as notifying a homeowner that their garage door has been left open.

What considerations are needed for state machines in cloud native applications using the Temporal Event Ordering Pattern?


State machines are inherently stateful, requiring applications to rely on reliability patterns to persist their states across system failures and restarts. Each application should have enough in-memory space to maintain the state machines, and the Sequential Convoy pattern should be applied to distribute events to various nodes for scalable and parallelized sequence matching.
Page 356

How can event ordering be guaranteed in the Temporal Event Ordering Pattern?


Event ordering can be guaranteed using the Buffered Event Ordering pattern to overcome out-of-order events that occur during transmission. Correlating and ordering events based on event-generation time can help manage relative ordering.
Page 356

How are prebuilt machine learning models generated and used?


Prebuilt machine learning models are generated by data scientists using data processing tools and machine learning frameworks like Apache Spark, TensorFlow, or Python. These models can be imported into running applications via technologies such as Predictive Model Markup Language (PMML) and queried on the fly to generate predictions. They can also run as separate cloud native applications and be called via APIs.
Page 358

How do online machine learning models differ from prebuilt models?


Online machine learning models tune themselves based on the information they receive as they produce predictions and may require a feedback loop with the results from their previous predictions. These models can be embedded into applications or run as separate microservices.
Page 358

When is using a prebuilt machine learning model ideal?


Using a prebuilt machine learning model is ideal when there is abundant historical data and the prediction pattern does not change with new events. One example is automating the detection of defective parts in a controlled production line environment.
Page 359

Predict based on prebuilt machine learning models: Using a prebuilt machine learning model is ideal when we have abundant historical data, and when the prediction pattern does not change with new events. Let’s consider an example of automating detection of defective parts in a production line. Detecting defects early in a production line can reduce cost.

When should continuously learning online machine learning models be used?


Continuously learning online machine learning models should be used if it is expected that the models will need to learn new behaviors as they receive new data in the future.
Page 360

What are the advantages and disadvantages of prebuilt and online machine learning models?


Prebuilt machine learning models are simpler to implement and generally have high accuracy with enough data but cannot adapt to new trends. Online machine learning algorithms perform better in adapting to new data but can fluctuate in accuracy due to memory constraints and data limitations. Combining both types of models can provide higher accuracy by overriding prebuilt model predictions with more accurate online model predictions.
Page 361

Why is it important to update prebuilt machine learning models periodically?


It is important to update prebuilt machine learning models periodically because their accuracy can degrade over time due to changes in circumstances. In cloud native applications, these models can be embedded, making it easy to update the application version along with a newer model.
Page 361

How should online machine learning models handle state recovery in cloud native applications?


Online machine learning models store what they’ve learned in memory, so it is recommended to use reliability patterns on those cloud native applications to ensure they can recover state across system failures and restarts.
Page 361

Is it possible to perform aggregations without a window and what considerations must be taken into account?


Yes, it is possible to perform aggregations without a window, where the window spans from the service startup to the current time. These aggregations must operate with constant space complexity to avoid running out of memory.
Page 346

How does the Sequential Convoy Pattern process events?


The Sequential Convoy Pattern groups events into categories based on their characteristics and processes them in parallel. For example, an ecommerce application can categorize purchase events by customer type and order size to provide various delivery guarantees.
Page 364

How it works: As the name suggests, the Sequential Convoy Pattern sees events as items moving along a conveyor belt. It groups the events into categories based on their characteristics and processes them in parallel. One example is an ecommerce application that provides different product delivery time guarantees based on type of customer and order size. This application can categorize purchase events by the type of customer (such as premium, gold, or not a member) and can categorize gold member purchases by their order size (such as $50 or less as small, more than $50 and up to $500 as medium, and more than $500 as large). This allows events to be partitioned and processed in parallel to provide various types of delivery guarantees, such as the same day, in two days, and in one week.


How can event order be maintained in the Sequential Convoy Pattern?


Each event can be labeled with a sequence number before separation, allowing substreams to maintain event order during processing. This sequencing can also be used to join parallel streams later based on the original event order using a merge sort.
Page 365
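
Labeling, partitioning, and rejoining can be sketched in a few lines. The function names and the `customer` attribute are assumptions for the example. Because each substream stays sorted by sequence number, the standard library's `heapq.merge` can restore the original order with a merge of already-sorted inputs.

```python
import heapq
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple


def partition(events: Iterable[dict], key_fn: Callable[[dict], str]) -> Dict[str, List[Tuple[int, dict]]]:
    """Label each event with a sequence number, then split by category."""
    substreams = defaultdict(list)
    for seq, event in enumerate(events):
        substreams[key_fn(event)].append((seq, event))
    return substreams


def merge(substreams: Iterable[List[Tuple[int, dict]]]) -> List[dict]:
    """Rejoin substreams into the original order using the sequence numbers."""
    merged = heapq.merge(*substreams, key=lambda pair: pair[0])
    return [event for _, event in merged]
```

In a real deployment each substream would be consumed by a separate worker; here the lists simply stand in for those parallel streams.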

Each event also can be labeled with a sequence number before separation. This allows each substream to maintain the event order during processing by applications that require guaranteed ordering, such as when using patterns like Windowed Aggregations or Temporal Event Ordering.

How does the Sequential Convoy Pattern help overcome cloud native stream-processing application limitations?


The Sequential Convoy Pattern helps overcome limitations such as CPU, memory, and bandwidth by partitioning events into multiple substreams and processing them in parallel. This avoids the added latency and queue backlog that a single slow consumer would cause.
Page 367

Scale stream-processing applications: The Sequential Convoy pattern helps overcome cloud native stream-processing application limitations such as CPU, memory, and bandwidth, and allows us to process events with high throughput and low latency. For example, say we’re processing an event stream that transfers large amounts of confidential data. The events are encrypted and compressed to transfer the data much faster. Performing CPU-intensive operations such as uncompressing, decrypting, and transforming the data in real time is time-consuming, and performing such complex operations not only slows events, but also adds latency for the following events in the stream. This can lead to backlog buildup in the queues and cause bottlenecks for the whole system.

How can the Sequential Convoy Pattern partition different event types into parallel streams?


The Sequential Convoy Pattern enables partitioning different event types into parallel streams to execute different use cases against the same event stream, such as categorizing customers and providing different discounts in an ecommerce platform.
Page 368

What should be considered when categorizing events for meaningful stream processing?


The ability to categorize events based on attributes like customer ID or product ID is crucial. Sometimes it may require combining various attributes to better process related events together and in parallel. Also, be mindful of noncontiguous sequence numbers when regrouping events.
Page 369

Considerations: The ability to categorize events based on event attributes is crucial for meaningful stream processing. Sometimes events can be categorized based on a single attribute such as customer ID or product ID, but in other cases we might need to combine various attributes such as order value or place of order to better process related events together and in parallel.

What is an effective way to regroup events and handle noncontiguous sequence numbers?


An effective way to regroup events is by using an end-of-sequence message emitted periodically with the last processed message ID. This helps identify dropped messages and unblock the processing of later events.
Page 369

What should be done when grouping based on sequence numbers is not possible?


When grouping based on sequence numbers is not possible, collect events and publish them to a single topic. Use the Buffered Event Ordering pattern to buffer and sort events based on sequence numbers or event timestamps.
Page 369

How can stream-processing microservices be scaled when they become a bottleneck?


To scale stream-processing microservices when they become a bottleneck, reshard by altering the relevant categories to divide the saturated substream into multiple substreams. Migrate the application state across all substreams to ensure efficient scaling.
Page 369

What should be evaluated when using the Sequential Convoy Pattern for high throughput and low latency?


Evaluate whether adding sequence numbers to events and rejoining them based on those numbers can create a bottleneck. Reevaluate if ordering is essential for use cases that require extremely high throughput and low latency.
Page 370

How can events generated by distributed sources be ordered?


Events generated by distributed sources can be ordered by their timestamps. Use the Buffered Event Ordering pattern to buffer and reorder events based on those timestamps. This gives accurate ordering only if the sources have synchronized clocks.
Page 372
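
Timestamp-based buffering can be sketched with a min-heap. The class name and the `max_delay` parameter are assumptions for illustration: an event is released only once a newer event shows that at least `max_delay` seconds of event time have passed, which is where the added latency comes from.

```python
import heapq
from typing import Any, List, Tuple


class ReorderBuffer:
    """Buffer events and release them in timestamp order after a fixed delay."""

    def __init__(self, max_delay: float) -> None:
        self.max_delay = max_delay
        self._heap = []  # state: events waiting for possibly older ones
        self._latest = float("-inf")
        self._counter = 0  # tie-breaker so event payloads are never compared

    def add(self, timestamp: float, event: Any) -> List[Tuple[float, Any]]:
        """Buffer one event and return the events that are now safe to emit."""
        heapq.heappush(self._heap, (timestamp, self._counter, event))
        self._counter += 1
        self._latest = max(self._latest, timestamp)
        ready = []
        while self._heap and self._heap[0][0] <= self._latest - self.max_delay:
            ts, _, ev = heapq.heappop(self._heap)
            ready.append((ts, ev))
        return ready
```

An event that arrives more than `max_delay` late is emitted immediately and therefore out of order in this sketch; whether to forward or drop such events depends on the use case.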

Order events generated on distributed event sources: Events generated by distributed sources usually become out of order because of data transmission latency added by network and intermediate systems. Consider an example of distributed surveillance sensors emitting events. These events can reach the processing system at various times because of transmission latency, so they will be out of order when we combine all of them into a single stream for processing.

How can events from the same event sources be reordered after parallel processing?


Events from the same event sources can be reordered after parallel processing by adding sequence numbers along with the timestamp. This allows the events to be grouped back together efficiently.
Page 373

Reorder events generated from the same event sources: Often we need to parallelize event processing to achieve performance, and then reorder events into their original sequence for further processing. For example, say we want to parallel-process user interaction on a browser and merge those events back in order.

When is the Buffered Event Ordering Pattern useful and what are its limitations?


The Buffered Event Ordering Pattern is useful for aggregating events over time or detecting a sequence of actions. However, it can add latency or introduce a bottleneck and should not be used when events are generated from distributed sources with unsynchronized timestamps.
Page 373

Considerations: The Buffered Event Ordering Pattern is useful when we need to aggregate events over time or when a sequence of actions needs to be detected. In all other cases, we do not recommend using the Buffered Event Ordering Pattern, as it can add latency or introduce a bottleneck to the system.

Why should sequence numbers be added to events generated from a single source?


Sequence numbers should be added to events generated from a single source because ordering events based on sequence numbers is more efficient and does not add as much latency as ordering by timestamp.
Page 373

What decision should be made when a late-arriving event is detected and what factors influence this decision?


The decision to send the out-of-order event forward for processing or drop it depends on the use case. For example, dropping an old event may be acceptable for reporting current status but not for tracking credit card transactions to monitor fraud.
Page 373

Why must the Buffered Event Ordering Pattern’s microservice employ reliability patterns?


The Buffered Event Ordering Pattern’s microservice must employ reliability patterns because it stores events in the buffer while waiting for older events, meaning it has state that needs to be recovered across failures and restarts.
Page 373

What is necessary for the Course Correction Pattern to work in downstream applications?


For the Course Correction Pattern to work, downstream applications should know that events can be partial updates and adapt based on more-accurate updates arriving later.
Page 375

How is the Course Correction Pattern used to update results with new information?


The Course Correction Pattern holds events for longer and alters earlier decisions when late events arrive, which is useful when displaying real-time results on a screen. For example, it can create a bucket for each minute to count orders and send an updated count whenever a late event arrives for an earlier minute.
Page 376
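
The per-minute bucket example can be sketched as follows. The class name and the `result`/`correction` labels are assumptions. An early result is emitted as soon as an event for a newer minute arrives, and a correction is emitted whenever a late event changes an earlier bucket.

```python
from collections import defaultdict
from typing import List, Tuple


class CourseCorrectingCounter:
    """Count orders per minute, emitting early results and later corrections."""

    def __init__(self) -> None:
        self.counts = defaultdict(int)  # state: one bucket per minute
        self.open_minute = None

    def add(self, timestamp_seconds: float) -> List[Tuple[str, int, int]]:
        minute = int(timestamp_seconds // 60)
        self.counts[minute] += 1
        updates = []
        if self.open_minute is None:
            self.open_minute = minute
        elif minute > self.open_minute:
            # A newer minute started: publish the early result for the old one.
            updates.append(("result", self.open_minute, self.counts[self.open_minute]))
            self.open_minute = minute
        elif minute < self.open_minute:
            # Late arrival: publish a corrected count for the earlier minute.
            updates.append(("correction", minute, self.counts[minute]))
        return updates
```

Buckets are never discarded in this sketch, so a real implementation would also need a retention limit to bound memory use.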

Update results with new information: The Course Correction Pattern is commonly used when users are eager to obtain aggregated results quickly. This can especially be useful when we are displaying results in real time on a screen. In these cases, we simply need to hold the events for more time, and alter the decision based on late event arrivals.

How does the Course Correction Pattern handle early decisions and corrective actions?


The Course Correction Pattern allows early decisions to be made and sends compensation events so that corrective actions can be taken when the situation changes. For example, a ride request can be broadcast to all taxis, and if multiple taxis accept the ride, a correction event cancels the other taxi assignments.
Page 377

Correct previous decisions: Sometimes we need to make an early decision, and when the situation changes, we send compensation events so that corrective actions can be taken. Let’s say we want to dispatch a taxi as soon as a user requests one.

When should the Course Correction Pattern be used and what are its limitations?


The Course Correction Pattern should be used only when early estimates are useful and the use case allows for compensation based on a later update. Use cases that do not support course correction must delay the decision instead, for example by using the Buffered Event Ordering pattern. Holding state while waiting for late events can also require significant memory.
Page 378

Considerations: The Course Correction Pattern can be used only when early estimates are useful and the use case allows for compensation or course correction based on an update. For use cases that do not support course correction, we have to delay the decision making by using patterns like Buffered Event Ordering.

How can watermarks be used to synchronize events generated from time-synchronized sources?


Watermarks can be used to generate synchronized event groups that produce accurate aggregation results by emitting watermark events at given intervals. This ensures accurate aggregation unaffected by network delays or other external factors.
Page 381

How can events from nonsynchronized sources be synchronized using watermarks?


Each sensor client can periodically fetch a sequence number from a global counter on a central server and inject it with the sensor reading. Events can be synchronized based on the sequence number of the watermark events, using the timestamps of those watermark events to determine the relative time of events.
Page 382

What are the considerations for using the Watermark Pattern?


The Watermark Pattern is useful when event arrival times differ significantly because of network latency or because the input systems do not have synchronized clocks. Because of its complexity, it should not be used unless necessary. It can reduce time synchronization issues but cannot guarantee accurate results for all use cases.
Page 383

The Watermark Pattern is useful only when we know that significant differences exist in event arrival times because of network latency or when the input systems do not have their times in sync. In other cases, the Watermark Pattern does not bring us many advantages. For example, when systems are already synchronized on time, we can use the Buffered Event Ordering pattern to sort events by their timestamps.

Why is reliability key for cloud native stream-processing applications?


Reliability is key because most cloud native stream-processing applications store their state in memory for low latency and high throughput. Reliability patterns ensure the state of applications is preserved across failures and restarts.
Page 386

How do reliability patterns support event delivery and processing in cloud native applications?


Reliability patterns help achieve at-least-once event delivery and allow exactly-once event processing, ensuring events are processed once and only once, even during system failures.
Page 386

How can events be replayed when the system state is not persisted?


Events can be replayed by retrieving events from a durable topic subscription and deferring acknowledgment until successful processing. This ensures events are deleted from the message broker only when successfully processed and output is produced.
Page 388

How can events be replayed when the system persists its state?


Events can be replayed by storing the aggregation state to a data store periodically. During failure, the last state can be retrieved from the data store, and all events from that point can be replayed.
Page 388
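
Snapshot-based recovery with replay can be sketched as follows. Everything here is a stand-in chosen for illustration: a plain dict plays the role of the durable data store, a list plays the role of the durable topic, and a running sum is the aggregation state.

```python
class SummingService:
    """Keep a running sum in memory and snapshot it every few events."""

    def __init__(self, store: dict, snapshot_every: int = 3) -> None:
        self.store = store  # hypothetical stand-in for a durable data store
        self.snapshot_every = snapshot_every
        # On startup, restore the last persisted state if there is one.
        snapshot = store.get("snapshot", {"offset": 0, "total": 0})
        self.offset = snapshot["offset"]
        self.total = snapshot["total"]

    def process(self, log: list) -> int:
        """Replay the log from the restored offset and return the sum."""
        for value in log[self.offset:]:
            self.total += value
            self.offset += 1
            if self.offset % self.snapshot_every == 0:
                # Persist the state together with the position it reflects.
                self.store["snapshot"] = {"offset": self.offset, "total": self.total}
        return self.total
```

After a crash, a new instance created with the same store resumes from the snapshot offset and replays only the events that came after it, arriving at the same total.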

What are the considerations when using the Replay Pattern to recreate application state?


The Replay Pattern can produce duplicate events, requiring dependent systems to be idempotent. It should not be used if duplicate events can cause confusion. Buffer events for a longer period to prevent loss during extended failures, and use it with Periodic Snapshot State Persistence for critical applications.
Page 389
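
One way a dependent system can be made idempotent is to remember which event IDs it has already applied. This is a minimal sketch that assumes every event carries a unique ID; the class and attribute names are hypothetical, and the set of seen IDs is kept in memory only.

```python
class IdempotentConsumer:
    """Apply each event at most once, even if a replay delivers it again."""

    def __init__(self) -> None:
        self.seen_ids = set()  # in a real system this would need to be durable
        self.balance = 0

    def handle(self, event_id: str, amount: int) -> bool:
        """Return True if the event was applied, False if it was a duplicate."""
        if event_id in self.seen_ids:
            return False
        self.seen_ids.add(event_id)
        self.balance += amount
        return True
```

With this check in place, replaying the same events after a failure leaves the balance unchanged.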

Considerations: Though the Replay Pattern helps re-create the application state, it can produce duplicate events as a result of the replay. In this case, the dependent systems should be idempotent. We should not use the Replay Pattern when duplicate events can cause confusion on the part of dependent systems.

How should snapshots be persisted in a data store to avoid blocking event processing?


Use threads or similar methods to persist state, ensuring that the snapshot process does not block the processing of more events.
Page 396

How can the Sequential Convoy pattern help scale microservices in the Periodic Snapshot State Persistence Pattern?


To achieve scalability, replace each microservice in the Sequential Convoy pattern with a primary and secondary pair, ensuring that the secondary microservice is used only as a failover and not for processing more data.
Page 400

When should stream-processing logic be implemented within a microservice versus using a stream-processing system?


Simple patterns such as Transformation and Filters and Thresholds can be implemented within a microservice. More complex patterns should use a stream-processing technology for better outcomes.
Page 402

What key aspects need to be tested in stream-processing applications?


Key aspects include testing the ability to handle state, running chaos testing for reliability patterns, and using the Watermark pattern for deterministic state assertions. For time-bounded patterns, process events based on event timestamps so that network and application latency do not affect the test results.
Page 407

In this section, we cover the most important aspects of testing stream-processing cloud native applications. When testing stream-processing applications, we need to follow conventional approaches of writing unit and integration tests. Because stream-processing applications are asynchronous, we recommend you follow all testing suggestions provided for event-driven architecture in “Testing”, along with the suggestions provided here.
One of the key aspects that we need to test in stream-processing applications is their ability to handle state. When we use reliability patterns, we should run chaos testing to test whether the application is continuously producing correct results despite system failures. To assert the application state deterministically, we can use the Watermark pattern to publish watermark events at the end of each test for the application to persist its state and to ensure that it has completed event processing.

How can security be enforced for cloud native stream-processing applications?


Security can be enforced by connecting to message brokers and other systems via secured protocols, encrypting data in transit and at rest, and using a bounded context fronted by an API or secured message broker.
Page 408

How can we enforce security for cloud native stream-processing applications? As discussed in Chapter 5, stream-processing applications can enforce security by connecting to message brokers and other systems via secured protocols, and by encrypting data in transit and at rest.

Why is monitoring memory consumption critical for stateful stream-processing applications?


Monitoring memory consumption is critical because storing events in memory during event spikes can cause the system to run out of memory and fail. Monitoring and load-shedding mechanisms, along with partitioning or remodeling the stream-processing pipeline, can mitigate risks.
Page 409

How can the Periodic Snapshot State Persistence pattern help stateful stream-processing systems recover after failure?


This pattern allows stateful stream-processing systems to recover their state after a failure, but only if snapshots are properly saved and are garbage-collected once outdated. Monitoring snapshot size, write time, network bandwidth, CPU, and memory is therefore crucial for reliable state storage and restoration.
Page 409

Why is monitoring event timestamps important in stream-processing applications?


Monitoring event timestamps helps identify discrepancies caused by network latency and complex stream-processing logic, so that they can be addressed with patterns such as Buffered Event Ordering, Course Correction, or Watermark.
Page 409

What is the first step in applying DevOps to stateful stream-processing applications?


The first step is selecting the appropriate reliability pattern based on the system’s availability and scalability requirements to achieve reliable stream processing.
Page 410

How should persistence stores be selected for stream-processing applications in DevOps?


Select persistence stores that enable rapid state persistence and restoration, store state durably, and empirically determine the optimal snapshot size and frequency to minimize latency. Monitor and garbage-collect redundant snapshots, and ensure snapshots are encrypted for security.
Page 410

What is recommended for dealing with sensitive data in stream-processing applications?


Encrypt events, purge events and snapshots as soon as they are no longer needed, and use a bounded context to protect applications from external threats via an API or a message broker.
Page 410

Why is observability and monitoring important for asynchronous applications in DevOps?


Observability and monitoring using distributed tracing, logging, and monitoring systems are important due to the difficulty in troubleshooting failures in asynchronous applications. Continuous delivery and backward compatibility of event schema and snapshots ensure smooth deployments.
Page 410

