Smart patterns of decoupling with Kafka Connect — Part 1

We’ve just published an article about how an e-commerce system can be reshaped by using the Kafka ecosystem.

The “metamorphosis” of that e-commerce system started with decoupling the main/central web application, through which customers interact with the system, from all the external components to which it was synchronously coupled. This decoupling was powered by a set of patterns implemented with Kafka Connect components.

We will continue by showing, one by one, how the patterns described there can actually be implemented.

Scenario

The basics of this scenario were covered in the “Metamorphosis of an e-commerce system by Kafka” article; here I am talking specifically about the ProductCatalog Database.

In the system’s current state, when a user wants to view a product page, she/he must go directly to the ProductCatalog Service and request that product. The service takes the incoming request and asks the ProductCatalog Database to perform a query in which multiple tables are joined together to assemble the product. This state is described in the following diagram.

Product Catalog Service

This solution seems fine at first, but imagine what happens when more than four tables are joined together,
or when there are hundreds of thousands of products. I will not dwell further on the disadvantages of this
solution, since they were discussed in the previous article.

The solution that we want to adopt instead is to pull the result of this join and push it to a Kafka topic
as soon as possible after new products are inserted into the database or existing products are updated. The result of the
join can be wrapped into a JSON/XML document; moreover, we can serialize that result according to an Avro schema, which works
very well within the Kafka ecosystem.

Solution

Capture the changes

We said that we want to publish the result of the above-described aggregation to Kafka as soon as possible after new products are
inserted into the database or existing products are updated. This sounds very close to what Change Data Capture (CDC) is about. So, in the end, we need to somehow capture the changes that are made in all those tables that are joined.

In order to capture the changes, we need to create some additional database resources:

  • a table to record when the result of the join (we will call it a ProductAggregate) is invalidated by executing an INSERT or UPDATE SQL statement.
    This table will be called AGGREGATE_INVALIDATIONS and can be created by running the following SQL command (a rough sketch of both the table and its triggers is shown right after this list).

    Create statement for AGGREGATE_INVALIDATIONS table

  • two triggers for each table, which will insert invalidations into the above-created table whenever a change occurs. The triggers can be created by using statements like the following one.

    Create triggers for PRODUCT table
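
As a rough sketch, assuming hypothetical table, column, and trigger names based on the description that follows (the real ones may differ), these resources could look like this:

    -- Minimal sketch of the invalidations table; column names and types are assumptions.
    CREATE TABLE AGGREGATE_INVALIDATIONS (
        NAME          VARCHAR2(100)                   NOT NULL, -- the event that has happened, e.g. PRODUCT_UPDATED
        PRODUCT_ID    NUMBER                          NOT NULL, -- id of the newly added or updated product
        OCCURRED_AT   TIMESTAMP DEFAULT SYSTIMESTAMP  NOT NULL, -- when the event took place
        SOURCE_TABLE  VARCHAR2(100)                   NOT NULL  -- the table where the change occurred
    );

    -- Example of an after-insert trigger on the (hypothetical) PRODUCT table;
    -- an analogous after-update trigger would record a PRODUCT_UPDATED event.
    CREATE OR REPLACE TRIGGER PRODUCT_AI_TRG
    AFTER INSERT ON PRODUCT
    FOR EACH ROW
    BEGIN
        INSERT INTO AGGREGATE_INVALIDATIONS (NAME, PRODUCT_ID, OCCURRED_AT, SOURCE_TABLE)
        VALUES ('PRODUCT_INSERTED', :NEW.ID, SYSTIMESTAMP, 'PRODUCT');
    END;
    /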

Basically, in this way we can capture all the changes that take place in the database as events stored in the AGGREGATE_INVALIDATIONS table.
A row inserted into this table will contain:

  • a name, which is basically the event that has happened;
  • the id of the newly added or updated product;
  • an occurred_at timestamp at which the event took place;
  • the source of the change, which is the table where the change occurred.

Join tables

Now that we have added a new table to our schema, we need to add it to the existing query so that it is joined along with the other tables.
The query will return only the invalidated aggregates and will provide various data elements that let us serialize each of them
as a Kafka event:

  • Key — the product’s id, which will be used as the partitioning key within Kafka;
  • EventId — a newly generated UUID;
  • AggregateId and AggregateType — metadata-like data elements;
  • EventType — populated from the name column of the AGGREGATE_INVALIDATIONS table;
  • PayloadType — the type of the payload, which in our case will be “application/xml”;
  • Payload — the product aggregate as an XML document;
  • LastModifiedDate — populated from the occurred_at column of the AGGREGATE_INVALIDATIONS table.

Since the database used in this example is Oracle 18c, we will use the Oracle package that allows us to work with XML.
Most databases have similar packages that allow us to work with XML or JSON documents.

The following diagram shows which tables are involved in the new query.

PRODUCT_CATALOG Database

Have a look at how the query is expressed in SQL syntax.

SQL Query

Make aggregates available via PRODUCT_AGGREGATE_VW

As we have already stated, we will use a Kafka Connect JDBC Source Connector to pull the data from the ProductCatalog Database and publish
it to a Kafka topic. The connector polls for new data by executing the above query at a specified time interval. Since the above query does not
look very friendly, we will “hide” it behind a SQL view. Let’s call this view PRODUCT_AGGREGATE_VW and create it by using the following SQL statement.

PRODUCT_AGGREGATE_VW
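
As a much simplified and purely illustrative sketch of such a view, wrapping the join query described in the previous section (PRODUCT, CATEGORY, BRAND and their columns are placeholders, and the real query joins more tables and builds a richer payload), it could look like this:

    -- Simplified sketch only; all table and column names are assumptions.
    CREATE OR REPLACE VIEW PRODUCT_AGGREGATE_VW AS
    SELECT
        p.ID                           AS EVENT_KEY,          -- the product's id, used as partitioning key in Kafka
        RAWTOHEX(SYS_GUID())           AS EVENT_ID,           -- a newly generated UUID
        p.ID                           AS AGGREGATE_ID,
        'Product'                      AS AGGREGATE_TYPE,
        ai.NAME                        AS EVENT_TYPE,         -- taken from AGGREGATE_INVALIDATIONS.NAME
        'application/xml'              AS PAYLOAD_TYPE,
        XMLELEMENT("product",
            XMLFOREST(p.ID    AS "id",
                      p.NAME  AS "name",
                      c.NAME  AS "category",
                      b.NAME  AS "brand")).getClobVal()
                                       AS PAYLOAD,            -- the product aggregate as an XML document
        ai.OCCURRED_AT                 AS LAST_MODIFIED_DATE  -- taken from AGGREGATE_INVALIDATIONS.OCCURRED_AT
    FROM AGGREGATE_INVALIDATIONS ai
    JOIN PRODUCT  p ON p.ID = ai.PRODUCT_ID
    JOIN CATEGORY c ON c.ID = p.CATEGORY_ID
    JOIN BRAND    b ON b.ID = p.BRAND_ID;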

Configure the Kafka Connect JDBC Source Connector

First of all, we must create a Dockerfile for our Kafka Connect JDBC Source Connector, build an image from it, and run it as a container.
The Dockerfile used in this example can be found here.

The connector needs quite a few configuration values that must be provided when starting it.
Those configurations can be found here as an ENV file.
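
Just to give an idea of its shape, a hypothetical excerpt of such an ENV file (following the CONNECT_* naming convention used by the Confluent Kafka Connect Docker images, with placeholder values) might look like this:

    # Hypothetical excerpt; the real values belong in the ENV file linked above.
    CONNECT_BOOTSTRAP_SERVERS=<kafka-bootstrap-servers>
    CONNECT_GROUP_ID=product-catalog-connect
    CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC=_connect-status
    CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=<schema-registry-url>
    CONNECT_REST_PORT=8083
    CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components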

After the connector application has started inside the container, it must be told what work to perform. A connector can have one
or more tasks that perform independent work. In our case, there will be a single task that polls the database, queries the above-created
view, and publishes the resultset of the query as Kafka events to a Kafka topic.

In order to create the task, we need to use the Kafka Connect REST Interface,
which is exposed by our connector application running inside the Docker container.

Usually, the Kafka Connect REST Interface is exposed on port 8083, and this default was preserved in our example as well.
We will need to submit an HTTP POST request to the /connectors endpoint with the following request body, and a new connector task will be created.

Kafka Connector Task configuration
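
As an illustrative sketch only (the connector name, connection details, and topic prefix are made up, and the exact set of properties may differ from the embedded configuration), a JDBC Source Connector request body for this scenario could look roughly like this:

    {
      "name": "product-aggregate-source",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:oracle:thin:@//<db-host>:1521/<service-name>",
        "connection.user": "<db-user>",
        "connection.password": "<db-password>",
        "table.whitelist": "PRODUCT_AGGREGATE_VW",
        "table.types": "VIEW",
        "mode": "timestamp",
        "timestamp.column.name": "LAST_MODIFIED_DATE",
        "validate.non.null": "false",
        "poll.interval.ms": "10000",
        "topic.prefix": "product-aggregates-"
      }
    }

The body can then be submitted with a plain curl call such as curl -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors, after which the connector and its task will be created.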

Since this article is not about explaining Kafka Connect in detail, for a better understanding of how it works I recommend following the
Confluent documentation and also
one of the great articles from Robin Moffatt, who is a Kafka Connect expert.

Initial publishing of aggregates to Kafka aka Initial load

We have almost all our pieces in place, but we still need to do some kind of initial publishing, or initial loading,
of all the already existing products. In order to achieve this, we can simply use an already existing LAST_MODIFIED_AT-like column,
or create one for our tables, and update it with the current timestamp provided by the database.
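
As a rough illustration, assuming a LAST_MODIFIED_AT column on a PRODUCT table (both names are placeholders), the initial load can be as simple as:

    -- Touch every product so that the after-update triggers fire and invalidate
    -- all ProductAggregates (table and column names are assumptions).
    UPDATE PRODUCT
       SET LAST_MODIFIED_AT = SYSTIMESTAMP;
    COMMIT;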

In this way, the after-update triggers will invalidate all the ProductAggregates.

The connector will then query the view and retrieve the complete resultset, which will be published to Kafka.

Our job is now done!

Conclusions

As we can see, there is a lot of work that must be done to accomplish such a task.
There are many advantages to doing this, but there are also disadvantages; to enumerate only a few:

  • development that must be done at the database level
  • increased coupling to a specific database technology
  • most APIs exposed on top of a database perform some business logic; with this solution, that business logic is taken out of the picture and must be replicated wherever the data will be used
  • operational complexity will increase since we will add more components to our system

Of course, the issues enumerated above are only a few of them; this is not a bulletproof solution, and it works for specific use cases.
Sometimes this approach is easier to implement than making code changes directly in a legacy component, finding and fixing the bottlenecks
inside it, or porting the entire application from an old infrastructure (e.g. Java application servers such as Oracle WebLogic,
IBM WebSphere, Red Hat JBoss) to a cloud-native one (e.g. Kubernetes).

Notes

If you want to reproduce the example, you can check out the code from here.

For running an Oracle database as a Docker container on your local machine, you can find more info here.

If you don’t have an already existing Kafka cluster, then you can create a free trial account on Confluent and create a cluster there.
