Designing Event Driven Systems for microservices using kafka

Previously in my blog I have talked about microservices, and in a microservice architecture service teams think of their software as being a cog in a far larger machine. So suddenly for service team your customers or business people are not the only consumer of your application but are other applications that is consuming their application and they really cared that your service was reliable and application started to become platforms and as we discussed previously microservices does not share a single database with multiple application, but most of the time for service to perform it needs to access the data in a decoupled way as the key concept to implement microservices is, strong cohesion and low coupling. One of the pattern that was suggestion was event driven design pattern. Thank you confluent for offering me this free book on event driven system, called “Designing Event-Driven Systems: Concepts and Patterns for Streaming
Services with Apache Kafka” by Ben Stopford.

The beauty of events is that it decouples which means no one orders other do things, which reduces the dependency in services. It also makes a new entity to get used to with the system without the need of modification in other service. Even there are few techniques that encourages to decouple application from the database where the database is growing huge. We can do event based approach when a service to service data sharing is needed.

To make event drived programming easier Kafka has many useful features. Kafka has Connect interface, which pulls and pushes data to wide range of interfaces and datastores. Streaming APIs can manipulate data on the fly, that makes kafka encourages request-response protocols like Enterprise Service Bus. But kafka is much higher level tool compared to ESB. ESBs focus on the integration of legacy and off-the-shelf systems, and kafka is using an ephemeral and comparably low-throughput messaging layer. Kafka cluster has a distributed system at core, that provides high availability, storage, and linear scale-out. Using Kafka, we can store and lets users define queries and execute them over the data held in the log and we can pipe into views that users can query directly. It also supports transactions, just like a database but it is said that Kafka is a database inside out, a tool for storing data, processing it in real time, and creating views. Like most streaming systems implement the same broad pattern where a set of streams is prepared, and then work is performed one event at a time. Join, Filter, Process, in Kafka Streams or KSQL it can be implemented as well.

Where does kafka sits from the architectural level? One single Kafka cluster can be placed at the center of an organization, as the architecture of kafka inherits and inspired from storage systems like HDFS, HBase, or Cassandra compared to the traditional messaging systems that implement JMS (Java Message Service) or AMQP (Advanced Message Queuing Protocol) and that makes it more scalable. The underlying abstraction of kafka is, partitioned log sequentially being appended in a distributed computer system across multiple computers with redundancy which is designed to handles case like high-throughput streaming, mission-critical scenarios, ordering needs to be preserved. Its ability to store datasets removes the queue-depth problems that plagued traditional messaging systems. Kafka messaging system sits a partitioned, replayable log. this replayable log–based approach has two primary benefits. First it makes it easy to react to events that are happening now, with a toolset specifically designed for manipulating them. Second, it provides a central repository that can push whole datasets to wherever they may be needed. When a service wants to read messages from Kafka, it “seeks” to the position of the last message it read, then scans sequentially, reading messages in order while periodically recording its new position in the log. Taking a log-structured approach has an interesting side effect. Both reads and writes are sequential operations. This makes them sympathetic to the underlying media, leveraging prefetch, the various layers of caching, and naturally batching operations together. This in turn makes them efficient. In fact, when you read messages from Kafka, the server doesn’t even import them into the JVM (Java virtual machine). Data is copied directly from the disk buffer to the network buffer (zero copy)—an opportunity afforded by the simplicity of both the contract and the underlying data structure.

To ensure high availability kafka keeps redundant copies of same logs across multiple machines (usually/ideally 3) as replica that makes kafka failure tolerant. When a machine goes down we are not losing any data, and when that machine status changes to up, it sync up with other nodes and start contributing to cluster, that makes kafka not only reliable but also scalable. With Kafka, hitting a scalability wall is virtually impossible in the context of business systems. Even though kafka keep multiple copies of same data across machines but if we need to ensure paranoid level security to ensure the persistence of our highly sensitive data, we may require that data be flushed to disk synchronously with an overhead of throughput. If we absolutely have to rely on this approach it is recommended to increase the producer batch size to increase the effectiveness of each disk flush on the machine. When keys are not provided the replications are usually spread data across the available partitions in a round-robin fashion but when a key is provided with the message, it uses a hash of the key to determine the partition number. Kafka ensures that messages with the same key are always sent to the same partition which ensures strong orders. Sometime key-based ordering isn’t enough to maintain global ordering, use a single partition topic. When we are reading and writing to a topic we are basically reading and writing to all of them. By default messages are retained for some configurable amount of time for a topic. Kafka also has special topics called compacted topics, it stores data with respect to a consistent foreign key. It work like a simple log-structure merge-trees (LSM trees). It scans through a topic for old messages that has been  superseded (based on their key) and removes them in a periodic fashion. So if you find two logs with same key, don’t open a bug report right away. Although it’s not uncommon for kafka to see retention based or compacted topics holding more than 100 TB of data yet not no confuse it with a database. Multitanancy is very common when we are dealing with multiple services but it opens up the potential for inadvertent denial-of-service attacks which causes degradation or instability of services. To solve this problem Kafka introduces a throughput control quotas, that enables us to define an amount of bandwidth that would be allocated to specific services which ensures that it operates in the boundary of  enforced service-level agreements or SLA. Client authentication is provided through either Kerberos or Transport Layer Security (TLS) client certificates, ensuring that the Kafka cluster knows who is making each request.

Now lets go back to event driven programming. As we are used to batch operations and sequential programming, if a database is connected with our software, we are used to with the architecture where we ask question and we wait for an answer but in our world where data is being populated like rabbit and maybe our database size is huge. Maybe we should not waste our time waiting for answer, maybe we should ask then then keep doing something else that needed to be done and when our database is done producing the result that we have asked for, it should tell us by issuing and event that the task is done. Now a days the application that are using event-driven architectures, Event Sourcing, and CQRS (Command Query Responsibility Segregation) is helping them as a break away from the pains of scaling database centric systems. Maybe our database size is not huge, maybe database centric approach is working wonderfully. But it works for individual applications but we live in a world of interconnected systems. We need a mechanism for sharing data from one application to other that complements this complex, interconnected world. We can constantly push data from one application into our applications using events. To be event driven an applications needs to react, complex application may need to blend multiple streams together to make something meaningful out of it, it may need to build views and changing state and move itself forward.

One of such pattern for service to service communication could be, The Event Collaboration Pattern. The Event Collaboration Pattern allows a set of services to collaborate around a single business workflow, with each service doing its bit by listening to events, then creating new ones. For example basket service publishes an event to OrderRequested topic when an order is placed. Order service is subscribed to OrderRequested topic and when it listens to that event it validates the order and when it is done validating it creates an event to  OrderValidated. Payment service is interested about order validations so it is already subscribed OrderValidated events and when it a new event is published at OrderValidated it try to process payment and publish it to OrderPaymentProcessed topic. Order service is interested about OrderPaymentProcessed events so it when a new event gets published it confirms the order by publishing events at OrderConfirmed topic. Shipping service is interested about this topic and it has lots of status, and every status change can be a new topic that would interest the Order service and finally when Order is delivered Shipping service publish its event to OrderDelivered event. Order service is subscribed to this topic and it changes status of the event to delivered. As we can see, every service is decoupled and every service is collaborating with the services by publishing events (rather than making synchronous REST calls). If a service is not scaling properly it is just going to add more unprocessed messages to its events but it won’t crush the system or make the whole system down. That is the beauty of this pattern. One thing to note here is that the more events that we create the system is getting slower, to make the system faster, we need to appreciate parallel execution otherwise we won’t be able to make our system fast enough.

Let’s talk about Shared database approach. As kafka logs can makes data available centrally as a shared source of truth but with the simplest possible contract. This keeps applications loosely coupled. Query functionality is not shared; it is private to each service, allowing teams to move quickly by retaining control of the datasets they use. For example suppose in our previous example OrderDelivered event both order and email service are interested about this event and email service is producing email based on the event but the order service is lagging behind to update the order, in that scenario the user might get an email with a link that has not been created yet or the views are not updated yet and that would make the system inconsistent which is a violation of CAP theorem. In situation like this Kafka queries can come handy. For a certain event we can query for associated other event and see if that has been fulfilled or not then a service can do what it was supposed to do, but as we have mentioned previously in this blog, maybe paranoid style of data storage is necessary in this approach.

Database has a number of core components which includes a commit log, a query engine, indexes, and caching. Instead of conflating these components inside a single black-box which is known as database, separating them using stream processing tools. These parts can exist in different places, joined together by the log. Kafka Streams can create indexes or views, and these views behave like a continuously updated cache, living inside or close to your application. So in this approach rather than letting our application ask for data, we are basically pushing data to the application and let it process the task it needs to do.

Speaking of commands and queries. Commands are actions that can expect something to happen. Events are both a fact and a notification that has no expectation of any future action. Queries are a request to look something up, that does not expect any action but it a result of some data. From the perspective of a service, events lead to less coupling than commands and queries. Command Sourcing is essentially a variant of Event Sourcing but applied to events that come into a service, rather than via the events it creates. If we are using a database with our service and it is being updated from recalculation and processing the events there are few other benefits associated with it. If they are stored, immutably, in the order they were created in, the resulting event log provides a comprehensive audit of exactly what the system did. Another amazing thing about event driven programming is that, if at certain point we lost few of our services due to our bug, those services does not lose all its data that needs to be processed by the system because the events are stored in kafka and using kafka we can store data persistently as long as it has been predefined. If we implement CQRS using kafka, imagine how powerful it is going to get, as kafka keeps all its logs in its persistent storage, heavens forbid for some reason if our database gets into some sort of trouble we are not missing any data whatsoever. After the point of rollback of a backup, we have to populate the rest of the data from kafka log. So that is pretty neat.

In-Process Views with Tables and State Stores is an Event Sourcing and CQRS technique which can be implemented using Kafka’s Streams API as it lets us implement a view natively, right inside the Kafka Streams API without needing external database! From the example above when the “OrderValidated” event returns to the orders service, the database is updated with the final state of the order, before the call returns to the user. One of the most reliable and efficient way to achieve this can be using a technique called change data capture (CDC). Most databases write every modification operation to a write-ahead log so that at if the database encounters an error, it can recover its state from there. Many also provide some mechanism for capturing modification operations that were committed. Connectors that implement CDC repurpose these, translating database operations into events that are exposed in a messaging system like Kafka. Because CDC makes use of a native “eventing” interface, which is very efficient, as the connector is monitoring a file or being triggered directly when changes occur, rather than issuing queries through the database’s main API, and also it is very accurate, as issuing queries through the database’s main API will often create an opportunity for operations to be missed if several arrive, for the same row, within a polling period. ome popular databases with CDC support in Kafka Connect are MySQL, Postgres, MongoDB, and Cassandra. There are also proprietary CDC connectors for Oracle, IBM, SQL Server, and more.

Speaking of collisions and merging, collisions occur if two services update the same entity at the same time. If we
design the system to execute serially, this won’t happen, but if we allow concurrent execution it can. There is a formal technique for merging data in this way that has guaranteed integrity; it is called a conflict-free replicated data type, or CRDT. A useful way to generalize these ideas is to isolate consistency concerns into owning services using the single writer principle. When locks are used widely  in concurrent environments, and the subsequent efficiencies that we can often gain by consolidating writes to a single thread. The idea closely relates to the Actor model. From a services perspective, it also marries with the idea that services should have a single responsibility. responsibility for propagating events of a specific type is assigned to a single service—a single writer. This single writer approach can be implemented in kafka by modifying enforcing permission of topic at kafka. In single writer principle we are basically accepting eventual consistency over a global consistency.

Kafka data transactions do remove duplicates, allow groups of messages, state stores backed by a Kafka topic. When a http calls fail (maybe due to timeout), it is very common to give it a retry but maybe our retry is generating multiple database entry. It goes tricky when it happens on a system where the payment is involved. It makes system idempotent. Transactions in Kafka allow the creation of long chains of services, where the processing of each step in the chain is wrapped in exactly-once guarantees. Kafka can do that because they are not being operating using TCP, they are operating over UDP (User Datagram Protocol) which gives them a higher level of abstraction, which handles delivery, ordering, and so on. As Kafka is a broker, there are actually two opportunities for duplication. Sending a message to Kafka might fail before an acknowledgment is sent back to the client, with a subsequent retry potentially resulting in a duplicate message. On the other side, the process reading from Kafka might fail before offsets are committed, meaning that the same message might be read a second time when the process restarts. In kafka transactions it use commit as a marker message introduces in Snapshot Marker Model. After the messages are send it sends commit signal and until commit signals are sent it is not being available for read. Commit markers are coordinator. The overhead of this commit markers can be reduced by using it wisely. For context, For a batches that commit every 100 ms, with a 1 KB message size, have a 3% overhead when compared to in-order, at-least-once delivery.

Kafka consumer and producer needs to agree on the data format and schema.  For schema management: Protobuf and JSON Schema are both popular, but most projects in the Kafka space use Avro. For central schema management and verification, Confluent has an open source Schema Registry that provides a central repository for Avro schemas. New schema updates need to be backward compatible. Sometime it is absolutely necessary to change schema, which will break backward compatibility. For cases like this creating a new topic is recommended and also we would need to push data to both topic.

There are actually two types of table in Kafka Streams: KTables and Global KTables. With just one instance of a service running, these behave equivalently. However, if we scaled our service out—so it had four instances running in paral lel—we’d see slightly different behaviors. This is because Global KTables are broadcast: each service instance gets a complete copy of the entire table. Regular KTables are partitioned: the dataset is spread over all service instances. Whether a table is broadcast or partitioned affects the way it can perform joins. With a Global KTable, because the whole table exists on every node, we can join to any attribute we wish, much like a foreign key join in a database. This is not true in a KTable. Because it is partitioned, it can be joined only by its primary key, just like you have to use the primary key when you join two streams. So to join a KTable or stream by an attribute that is not its primary key, we must perform a repartition.

Ensuring consistency in data at the time of scaling can also be challenges as data propagation across nodes takes time, to solve this we can partition topic based on product id this way, product with same product id will always go to same node, which will make this operation consistent always. Inventory product and order products are maybe spread across multiple nodes and to be able to join them we need to reshuffle everything, this process is called Rekey to Join. data arranged in this way is termed co-partitioned. Once rekeyed, the join condition can be performed without any additional network access required. But then again in our next operation we may need to join few other tables, so we need to use rekeyed to join them, so our previous rekeys get shuffled in this way, so it can become a less practical solution depending on our situation.