Smart patterns of decoupling with Kafka Connect — Part 2

In “Metamorphosis of an e-commerce system by Kafka” we introduced the business context: redesigning a legacy e-commerce system into a modern distributed system.

We then started tackling that task in a series of articles, the first of which described how we decoupled the ProductCatalog component from the e-commerce Web Application.

In the current article we continue by showing two different implementations of the Outbox pattern using Kafka Connect.

Scenario

After decoupling the ProductCatalog component from the main web application, we will tackle the decoupling of the Rating Service.

The business flow is pretty simple: we have an external component that exposes two endpoints (sketched after the list):

  • one endpoint that allows users to add 1–5 star ratings for a specific product;
  • one endpoint that returns, as a result, the average star rating for a specific product.
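
For illustration only, the two endpoints might look roughly like this (the paths and payload shapes are assumptions, not the actual API):

```
# add a 1-5 star rating for a product
POST /products/{productId}/ratings        body: { "stars": 4 }

# retrieve the average star rating for a product
GET  /products/{productId}/ratings/average  ->  { "averageRating": 4.2 }
```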

In the larger context of the e-commerce web application, whenever a user opens the page of a product, an HTTP request is performed in order to retrieve the average star rating for that product.

The following diagram sketches what the Rating Service looks like:

Rating Service

This sounds pretty reasonable, but imagine what happens when the Rating Service is unavailable: a timeout will occur and we won’t display any rating for our products. In the context of an e-commerce system, ratings are a critical part of the business flow, and we don’t want them to be unavailable while users are trying to buy products.

The solution is to bring all the ratings for all the products into our own service/component and simply read them from the SaleProduct database (if you don’t remember or don’t know about the SaleProduct concept, please check out the initial article, “Metamorphosis of an e-commerce system by Kafka”).

Solution

In order to bring the ratings from the Rating Service into our e-commerce Web Application, we can leverage the Outbox pattern in the following way: whenever a new rating is added for a product, a new event is published into a table called OUTBOX. The event contains the new average rating for the specified product. We then configure a Kafka Connect JDBC Source Connector, which pulls the data from the OUTBOX table and publishes it to Kafka.

Publishing the event into the OUTBOX table can be done in two different ways:

  • by using a trigger which assembles the event and publishes it into the table after a new rating is added for a product; we will call this the Trigger-based approach;
  • by programmatically creating an event from the Rating API and inserting it into the OUTBOX table as part of the transaction used to add the new rating; this one will be named the Transactional-based approach.

Since we have two possible approaches, I will discuss both of them, showing the advantages and disadvantages of each.

Trigger-based approach

This approach is very useful when we can’t change the source code of the Rating Service for various reasons: legacy code that nobody dares to touch, source code that is not available because the service is owned by another team/party while we still have access to the database, and so on. In these scenarios we can leverage such an approach and get the job done.

Still, this approach has some major disadvantages:

  • since we, as clients of this service, are interested only in the average rating for a specific product, there is business logic performed inside the Rating API, namely computing that average. In this specific case the logic is simple enough to replicate at the database level (see the sketch after this list), but this is still not a very orthodox approach; very often the business logic is more complex and we can’t reproduce it at all;
  • the degree of platform coupling will rise, since we become more tightly coupled to the specific database technology; if the Rating Database is migrated from one technology to another, we must migrate that piece of code as well.
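
To make the duplication concern concrete, here is a minimal sketch of what such a trigger could look like in PostgreSQL. The table and column names (rating, stars, outbox, product_id) are assumptions for illustration, not the actual schema:

```sql
-- Hypothetical sketch: publish a RATING_CHANGED event into OUTBOX
-- whenever a new rating row is inserted.
CREATE OR REPLACE FUNCTION publish_rating_changed() RETURNS trigger AS $$
BEGIN
  INSERT INTO outbox (event_type, aggregate_id, aggregate_type,
                      occurred_at, payload, payload_content_type)
  VALUES (
    'RATING_CHANGED',
    NEW.product_id,
    'PRODUCT',
    now(),
    -- the averaging business logic is duplicated here, at database level
    json_build_object(
      'productId',     NEW.product_id,
      'averageRating', (SELECT avg(stars) FROM rating
                        WHERE product_id = NEW.product_id)
    )::text,
    'JSON'
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER rating_changed_trigger
AFTER INSERT ON rating
FOR EACH ROW EXECUTE PROCEDURE publish_rating_changed();
```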

Transactional-based approach

You have probably already asked yourself why we insert a record (which is basically an event) into a table and let a Kafka Connect component pull it and publish it to Kafka, instead of simply publishing that event directly to Kafka. There must be some advantages to publishing directly, right? That way we would get rid of the Kafka Connect component and we wouldn’t need to add another table to our schema.

Still, the direct-publishing approach has some flaws, the major one being that it requires a distributed transaction between the database and Kafka, which is notoriously hard to get right (the classic dual-write problem).

So, to conclude: we need an additional table so that both the business data and the event to be published can be inserted into the database as part of the same local transaction.

The advantages of this approach are pretty obvious:

  • the team owning the Rating Service is fully aware of our need and can implement this new feature on their own;
  • the temporal coupling between the Rating Service and our e-commerce Web Application will be reduced;
  • by implementing this as part of the Rating API we will avoid duplicating the existing business logic;
  • to-be-published events can be serialized according to a schema specification (e.g. CloudEvents) implemented by using a schema serialization mechanism/technology (e.g. Avro, Thrift, Protobuf, etc.).

The disadvantages are not critical, but we must still take them into consideration:

  • since we change the integration between the Rating Service and the main e-commerce application from synchronous HTTP calls to asynchronous exchange of events over Kafka, the operational complexity will increase.

Since the first approach is pretty similar to what we saw in the previous article, I chose to show how to tackle this task using the second approach.

The following diagram describes what the Rating Service should look like after we implement the above-described solution:

New Rating Service

The current state of Rating API

First of all, please check out the current state of the Rating API and have a look at how it handles the business requirements at this moment. The source code is available here.

Introducing the OUTBOX Table

Let’s start redesigning the Rating Service. First, we will create the OUTBOX table, described by the following SQL CREATE statement.

DDL for the OUTBOX table

The above table is one of the main pieces of our solution: it will store all the events that we want to publish to the external world. A row inserted into this table will contain the following (a sketch of the corresponding DDL follows the list):

  • a seq value, which is just the next value generated by a sequence; it can be viewed as a simple counter;
  • the type of the event to be published (e.g. RATING_CHANGED);
  • the id of the aggregate (i.e. aggregate_id) contained by the event; it should be the id of the product for which the rating was changed, and this value should subsequently be used as the partitioning key;
  • the type of the aggregate (i.e. aggregate_type) contained by the event;
  • occurred_at, the timestamp at which the event took place;
  • the payload of the event;
  • the payload_content_type (e.g. Avro, JSON, XML);
  • payload_schema_id and payload_schema, which represent metadata about the schema used for serializing the event (e.g. the id/version of the schema and the name of the schema used in serialization);
  • metadata, the column that will contain metadata about the published event;
  • created_at, a timestamp column that records when a new event was published into the OUTBOX table.
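
In case the embedded statement is not visible, a DDL consistent with the columns described above might look roughly like this (exact names and types are assumptions):

```sql
CREATE SEQUENCE IF NOT EXISTS outbox_seq;

CREATE TABLE IF NOT EXISTS outbox (
  seq                  BIGINT      NOT NULL DEFAULT nextval('outbox_seq'),
  event_type           VARCHAR(64) NOT NULL,  -- e.g. RATING_CHANGED
  aggregate_id         VARCHAR(64) NOT NULL,  -- product id, the partitioning key
  aggregate_type       VARCHAR(64) NOT NULL,  -- e.g. PRODUCT
  occurred_at          TIMESTAMP   NOT NULL,  -- when the event took place
  payload              TEXT        NOT NULL,  -- the serialized event
  payload_content_type VARCHAR(32) NOT NULL,  -- e.g. AVRO, JSON, XML
  payload_schema_id    VARCHAR(64),           -- id/version of the schema
  payload_schema       VARCHAR(255),          -- name of the schema
  metadata             TEXT,                  -- metadata about the event
  created_at           TIMESTAMP   NOT NULL DEFAULT now(),
  PRIMARY KEY (seq)
);
```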

Code changes

We saw how to create the OUTBOX table; now let’s have a look at the code changes we must make in order to publish events into it.

In the first version of the Rating Service, there was a repository implementation which ultimately adds a new rating record to the RATING table.

For redesigning this service according to the described solution, we need to create the following (a sketch of these pieces follows the list):

  • an entity-like class that will be used to insert records into the OUTBOX table;
  • a class that will be used as the payload of the published event. This one will contain the product identifier for which the rating was added and the new average rating value of the product;
  • a new implementation of the repository interface that will allow us to add a new rating record along with the to-be-published event as part of the same transaction.
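
Here is a minimal sketch of these three pieces, assuming Spring with JdbcTemplate; the class names, SQL, and serialization details are illustrative, not the actual repository code:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

import java.sql.Timestamp;
import java.time.Instant;

// Payload of the published event: product id plus its new average rating.
record RatingChangedEvent(String productId, double averageRating) {}

// Entity-like class mapped onto the OUTBOX table.
record OutboxRecord(String eventType, String aggregateId, String aggregateType,
                    Instant occurredAt, String payload, String payloadContentType) {}

@Repository
public class TransactionalRatingRepository {

    private final JdbcTemplate jdbc;
    private final ObjectMapper mapper;

    public TransactionalRatingRepository(JdbcTemplate jdbc, ObjectMapper mapper) {
        this.jdbc = jdbc;
        this.mapper = mapper;
    }

    // The new rating and the outbox event are written in the SAME local
    // transaction, so either both become visible or neither does.
    @Transactional
    public void addRating(String productId, int stars) throws JsonProcessingException {
        jdbc.update("INSERT INTO rating (product_id, stars) VALUES (?, ?)",
                    productId, stars);

        Double average = jdbc.queryForObject(
            "SELECT AVG(stars) FROM rating WHERE product_id = ?",
            Double.class, productId);

        RatingChangedEvent event = new RatingChangedEvent(productId, average);
        OutboxRecord record = new OutboxRecord("RATING_CHANGED", productId,
            "PRODUCT", Instant.now(), mapper.writeValueAsString(event), "JSON");

        jdbc.update("""
            INSERT INTO outbox (event_type, aggregate_id, aggregate_type,
                                occurred_at, payload, payload_content_type)
            VALUES (?, ?, ?, ?, ?, ?)""",
            record.eventType(), record.aggregateId(), record.aggregateType(),
            Timestamp.from(record.occurredAt()), record.payload(),
            record.payloadContentType());
    }
}
```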

Configure the Kafka Connect JDBC Source Connector

Since we have already used a Kafka Connect JDBC Source Connector in the previous article of this series, we will reuse the same Dockerfile and build a new container from it.

To build and start the new container we will need a configuration file — the following ENV file.
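
If you don’t have the ENV file at hand, the variables it typically contains for the Confluent Kafka Connect image look roughly like this (host names and topic names here are assumptions):

```
CONNECT_BOOTSTRAP_SERVERS=kafka:9092
CONNECT_GROUP_ID=outbox-connect
CONNECT_CONFIG_STORAGE_TOPIC=outbox-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC=outbox-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC=outbox-connect-status
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_REST_ADVERTISED_HOST_NAME=connect
CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components
```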

Like in the previous article, after the connector application is started inside the Docker container, we need to instruct it which task it must perform. The configuration for our outbox event publisher is shown below.

Kafka Connector Task Configuration

If you have trouble configuring the Kafka Connect JDBC Source Connector, check the previous article.
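
For reference, such a task configuration might look roughly like this, using incrementing mode on the seq column (the connection details and topic prefix are assumptions):

```json
{
  "name": "outbox-event-publisher",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://rating-db:5432/rating",
    "connection.user": "rating",
    "connection.password": "secret",
    "mode": "incrementing",
    "incrementing.column.name": "seq",
    "table.whitelist": "outbox",
    "topic.prefix": "rating-service-",
    "poll.interval.ms": "1000",
    "tasks.max": "1"
  }
}
```

With this sketch, new rows are detected via the strictly increasing seq column and published to a rating-service-outbox topic; to honor aggregate_id as the partitioning key, the record key would additionally have to be set, for example with the ValueToKey Single Message Transform.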

A very important fact that must be mentioned here is that this time we used a different database technology: PostgreSQL. This can make a real difference because, with this kind of database, we can leverage a different source connector provided by Debezium. There are a bunch of other databases that also allow us to use Debezium for streaming changes.

Still, we used the classic Kafka Connect JDBC Source Connector, although Debezium should be used if possible. I said “if possible” because Debezium leverages the Write-Ahead Log (WAL) of the database, and this requires special privileges (e.g. superuser privileges for PostgreSQL) that probably cannot be obtained in the enterprise world, where DBAs rule everything.
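
For comparison, a Debezium-based configuration for the same PostgreSQL database might look roughly like this (names and connection details are assumptions):

```json
{
  "name": "outbox-debezium-publisher",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "rating-db",
    "database.port": "5432",
    "database.user": "rating",
    "database.password": "secret",
    "database.dbname": "rating",
    "database.server.name": "rating-service",
    "table.whitelist": "public.outbox"
  }
}
```

Note that Debezium reads changes directly from the WAL, so no incrementing column or polling interval is required.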

Conclusions

We saw how we can leverage the Outbox pattern for streaming changes from inside a service to the external world.

In the current article we described two approaches to implementing this pattern:

  • trigger-based approach
  • transactional-based approach

but only one of them was implemented, namely the transactional-based one. Keep in mind that either of them can be applied depending on the specific context, and that there are different consequences to choosing one or the other.

Don’t forget to try the Debezium connector instead of the classic Confluent Kafka Connect JDBC connector.

Notes

If you want to reproduce the example, the source code for the Rating Service is here and the connector configurations can be found here.

Further reading

Here is a long list of resources about the Outbox pattern and Change Data Capture, provided by the Debezium team.
