Run Amazon Kinesis Client Library (1.x) on LocalStack

Daham Positha Pathiraja
Sysco LABS Sri Lanka
9 min read · Apr 26, 2022


Nowadays, AWS Kinesis Data Streams is widely used for collecting and processing large streams of data records in real time. Data from these streams is processed by data-processing applications called Kinesis Data Streams applications, which read data from the streams in the form of data records. To make data consumption efficient and scalable, AWS has introduced a support library called the Kinesis Client Library, which can be used to build data-processing applications capable of running as multiple instances in a distributed fashion.

LocalStack provides an easy-to-use test/mocking framework for developing Cloud applications. It spins up a testing environment on your local machine that provides the same functionality and APIs as the real AWS cloud environment.

Therefore, by using LocalStack we can create efficient dev/test and deployment pipelines for the AWS Kinesis data stream-based applications we build.

The following diagram illustrates the kind of CI/CD pipeline we can use.

Figure 1: CI/CD pipeline with LocalStack
  • The application can be fully developed on the local machine, since LocalStack provisions all the required AWS cloud resources in a local container.

Note: For simplicity I have depicted an application that depends only on AWS Kinesis data streams. Hence it only requires the AWS Kinesis Data Streams, AWS DynamoDB and AWS CloudWatch services to be mocked within the LocalStack container.

  • When changes are frequently pushed to the continuous integration environment (Jenkins, Concourse), unit tests and integration tests run on top of the mocked AWS services provided by LocalStack.
  • Once the build is properly verified, it can be seamlessly deployed to the real AWS cloud environment.

Problem Definition

If you work on AWS service-based applications and try to rely fully on integration tests that can run offline, you have probably already had a taste of LocalStack. You may also have written many integration tests for your code that leverage LocalStack.

However, when it comes to AWS Kinesis data streams, things are a bit different, since streams are usually not processed via AWS SDK interfaces or direct API calls to AWS. Instead, 99% of the time Kinesis streams are processed via another layer implemented with the help of the AWS Kinesis Client Library (KCL). This layer is essential for overcoming the distributed-computing challenges that arise when a Kinesis stream is consumed by multiple compute instances.

Because of this, leveraging LocalStack when Kinesis data streams are consumed through the AWS Kinesis Client Library is something that has not been discussed sufficiently.

In this article I will explain how I overcame the problem stated above while introducing only minimal changes to the KCL source code.

I hope to give you an extensive overview of the different aspects and terminology involved in consuming AWS Kinesis streams, both with and without the KCL.

Finally, I will explain how a LocalStack container can easily be set up locally via the Testcontainers LocalStack module, and how the AWS Kinesis Client Library can be exercised solely against LocalStack-provided services.

Consuming Kinesis Streams without Kinesis Client Library

It’s always better to study something from its roots; the understanding then becomes easier and more solid.

Suppose we do not have the Kinesis Client Library and we have to consume records from a given Kinesis stream. In this case, the AmazonKinesis interface provided by the AWS SDK can be used.

For a moment, let’s forget about the challenging scenarios that may arise, from a distributed-computing perspective, when a Kinesis stream is consumed by multiple consumer programs.

Key Terminologies of a Kinesis Data Stream

Before describing how to consume a stream, let’s first see how things are organized within a Kinesis data stream and the key terminology involved.

Figure 2: Kinesis stream and shard attributes
  • A Kinesis data stream can be depicted as a collection of shards (figure 2); the shard is its main throughput unit.

One shard provides a capacity of 1MB/sec data input and 2MB/sec data output. One shard can support up to 1000 PUT records per second. You will specify the number of shards needed when you create a data stream. For example, you can create a data stream with two shards. This data stream has a throughput of 2MB/sec data input and 4MB/sec data output and allows up to 2000 PUT records per second.

  • Records are published into a shard based on a partition key, which has to be specified by the publisher. In addition to the partition key, each record carries a sequence number that is assigned by the Kinesis data stream itself. The rest of the record is the data blob, i.e. the data of interest added to the stream by the producer (figure 2), as sketched in the snippet below.
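To make the partition key, sequence number and data blob concrete, here is a minimal producer sketch using the AWS SDK for Java 1.x; the stream name, partition key and payload are hypothetical:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;

public class ProducerExample {

    public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        PutRecordRequest request = new PutRecordRequest()
                .withStreamName("orders-stream")   // hypothetical stream name
                .withPartitionKey("customer-42")   // decides which shard receives the record
                .withData(ByteBuffer.wrap("{\"orderId\":1}".getBytes(StandardCharsets.UTF_8)));

        // The sequence number is assigned by the stream, not by the producer.
        PutRecordResult result = kinesis.putRecord(request);
        System.out.println("Published to " + result.getShardId()
                + " with sequence number " + result.getSequenceNumber());
    }
}
```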

Kinesis Stream Consumption through AmazonKinesis Interface

Given the previous explanation of how a Kinesis data stream is organized, it should now be easier to picture the process of consuming one.

  • The initial step is quite obvious: you first need to collect all the shards in the stream. AWS provides an API called DescribeStream for this. We can specify a parameter called ExclusiveStartShardId, and DescribeStream will return the next N shards after it (the limit N is given in the DescribeStream request) (figure 3).
Figure 3: Describe Stream
  • While iterating over the collected shards, we need to consume the records within each shard. Depending on where consumption should start, different shard iterator types are defined.
Figure 3: Different shard iterators

AT_SEQUENCE_NUMBER: read exactly from the position denoted by a specific sequence number.

AFTER_SEQUENCE_NUMBER: read right after a specific sequence number.

AT_TIMESTAMP: read records from an arbitrary point in time.

TRIM_HORIZON: cause the Shard Iterator to point to the last untrimmed record in the shard in the system (the oldest data record in the shard).

LATEST: always read the most recent data in the shard.

To get an overall idea, I would like to present a code snippet that puts the above into practice. To make it easier to grasp, you can also refer to the workflow diagram that follows the code snippet.
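A minimal sketch of such a consumer could look like the following; it assumes the AWS SDK for Java 1.x and a hypothetical stream name:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.StreamDescription;

public class SimpleKinesisConsumer {

    public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
        String streamName = "orders-stream"; // hypothetical stream name

        // Step 1: collect all shards via DescribeStream, paging with ExclusiveStartShardId.
        List<Shard> shards = new ArrayList<>();
        String exclusiveStartShardId = null;
        boolean hasMoreShards = true;
        while (hasMoreShards) {
            StreamDescription description = kinesis.describeStream(
                    new DescribeStreamRequest()
                            .withStreamName(streamName)
                            .withExclusiveStartShardId(exclusiveStartShardId))
                    .getStreamDescription();
            shards.addAll(description.getShards());
            hasMoreShards = description.getHasMoreShards();
            if (!description.getShards().isEmpty()) {
                exclusiveStartShardId = description.getShards()
                        .get(description.getShards().size() - 1).getShardId();
            }
        }

        // Step 2: for each shard, obtain a shard iterator (TRIM_HORIZON = oldest record)
        // and page through the records with GetRecords.
        for (Shard shard : shards) {
            String shardIterator = kinesis.getShardIterator(
                    new GetShardIteratorRequest()
                            .withStreamName(streamName)
                            .withShardId(shard.getShardId())
                            .withShardIteratorType("TRIM_HORIZON"))
                    .getShardIterator();

            while (shardIterator != null) {
                GetRecordsResult result = kinesis.getRecords(
                        new GetRecordsRequest().withShardIterator(shardIterator).withLimit(100));
                for (Record record : result.getRecords()) {
                    System.out.println(new String(record.getData().array(), StandardCharsets.UTF_8));
                }
                if (result.getRecords().isEmpty()) {
                    break; // stop once the shard is drained; an open shard would otherwise poll forever
                }
                shardIterator = result.getNextShardIterator();
            }
        }
    }
}
```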

Code Snippet 1: Kinesis Consumer
Figure 4: Collect shards from the Kinesis data stream and consumer shard records through Shard Iterator

Consuming Kinesis Streams through Kinesis Client Library

As I mentioned in a couple of places previously, the main purpose of the Kinesis Client Library is to take care of the complex tasks associated with distributed computing. In other words, the scenario becomes really challenging when multiple consumers are intended to consume a Kinesis stream with multiple shards. Some of those challenges are:

  • How shards are shared among the consumers.
  • How to keep track of processed records, so that a consumer failure does not force processing to start from the very beginning.
  • How to adapt to expected changes such as shard count changes and consumer count changes.

Let’s now see how the AWS Kinesis Client Library resolves these problems.

Figure 5: Kinesis Client Library in action

First, an entity called the KCL consumer application should be created. A KCL consumer application is typically distributed, so physically there are multiple Consumer Application Instances. Through these application instances the consumer application gains the ability to balance the data-processing load and to properly coordinate failure situations.

The Worker is the actual implementation used by the consumer application to start processing records. The Worker is entrusted with the following tasks on behalf of the consumer application:

  • Syncing shard and lease information.
  • Tracking shard assignments.
  • Processing data from the shards.
  • Passing the consumer application’s configuration to the Kinesis Client Library, such as the name of the data stream whose records this KCL consumer application is going to process and the AWS credentials needed to access that stream.
  • Starting processing in the consumer application instance and delivering fetched records to the associated record processors.

A Lease is the entity used to bind a Worker to a given shard. It is also the mechanism by which the KCL consumer application partitions data processing among multiple workers. A Lease is identified by the unique attribute LeaseKey. One Worker can hold multiple Leases at the same time, but a single Lease cannot be shared by multiple workers.

To keep track of the shards in a Kinesis data stream that are being leased and processed by the workers of the KCL consumer application, a unique Amazon DynamoDB table called the Lease Table is used. This table has a few columns that are useful to each worker:

  • LeaseKey: A unique identifier for a lease.
  • LeaseOwner: The worker to which a particular lease is assigned.
  • Checkpoint: The most recent checkpoint sequence number for the shard. This value is unique across all shards in the data stream.
  • LeaseCounter: Used for lease versioning so that workers can detect that their lease has been taken by another worker.

Last but not least, there are Record Processors, which are instantiated by a Worker to serve each Lease it owns. The Record Processor is the entity that contains the logic by which the KCL consumer application processes the data it gets from the data stream.

As depicted in figure 5, a KCL consumer application predominantly has to deal with three AWS services:

  • AWS Kinesis Data Streams: the stream which is being consumed.
  • Amazon DynamoDB: to keep Lease Table.
  • AWS CloudWatch: to publish metrics and events about each part of the consumer application’s operation.

The minimal steps required to create a KCL consumer application are as follows.

  • Create a record processor by implementing the IRecordProcessor interface.
Code Snippet 2: Record Processor
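A minimal sketch of such a processor, assuming the v2 processor interfaces shipped with KCL 1.x and placeholder logging as the processing logic:

```java
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;

public class LoggingRecordProcessor implements IRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        System.out.println("Initialized for shard " + initializationInput.getShardId());
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            // Business logic for each record goes here.
            System.out.println(new String(record.getData().array(), StandardCharsets.UTF_8));
        }
        try {
            // Persist progress to the lease table so a restarted worker resumes from here.
            processRecordsInput.getCheckpointer().checkpoint();
        } catch (Exception e) {
            // In real code, handle ShutdownException and ThrottlingException explicitly.
            e.printStackTrace();
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        System.out.println("Shutting down: " + shutdownInput.getShutdownReason());
    }
}
```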
  • Create a factory class that creates instances of the above processor by implementing the IRecordProcessorFactory interface.
Code Snippet 3: Record Processor Factory
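A matching factory, again a sketch assuming the v2 interfaces and the LoggingRecordProcessor from the previous snippet:

```java
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class LoggingRecordProcessorFactory implements IRecordProcessorFactory {

    @Override
    public IRecordProcessor createProcessor() {
        // The Worker calls this once per lease it owns,
        // so every shard gets its own record processor instance.
        return new LoggingRecordProcessor();
    }
}
```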
  • Create a Worker by wiring in the required configuration and the IRecordProcessor factory created above.
Code Snippet 4: Creation of a Worker
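A minimal Worker bootstrap might look like the following sketch; the application name, stream name and initial position are assumptions:

```java
import java.util.UUID;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

public class ConsumerApplication {

    public static void main(String[] args) {
        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                "sample-kcl-application",            // application name, also used for the lease table
                "orders-stream",                     // hypothetical stream name
                new DefaultAWSCredentialsProviderChain(),
                "worker-" + UUID.randomUUID())       // unique worker id
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        Worker worker = new Worker.Builder()
                .recordProcessorFactory(new LoggingRecordProcessorFactory())
                .config(config)
                .build();

        // run() blocks: the worker acquires leases, instantiates record processors
        // and keeps consuming until shutdown is requested.
        worker.run();
    }
}
```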

Enabling Mock AWS Services through a LocalStack Container

A LocalStack container comprises a number of mock services that you can call once they are properly exposed to the host OS. In this context we only need to expose the AWS Kinesis, AWS DynamoDB and AWS CloudWatch services to the outside (figure 6).

Figure 6: Exposing required services from the LocalStack container

Manually Set Up and Run the LocalStack Container through a Docker Compose File

We can get this done through a docker-compose.yml file like the following.

Code Snippet 5: Docker compose file for LocalStack
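One possible docker-compose.yml, assuming a recent LocalStack image that serves all mocked services on the single edge port 4566:

```yaml
version: "3.8"
services:
  localstack:
    image: localstack/localstack
    ports:
      - "4566:4566"                              # single edge port for all mocked AWS APIs
    environment:
      - SERVICES=kinesis,dynamodb,cloudwatch     # only the services the KCL application needs
```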

However, when we write integration tests, it is much cleaner and tidier if this configuration lives within the code itself.

Testcontainers provides a nice LocalStack module to get this done as we expect (I am not going to explain Testcontainers in depth, but I strongly advise you to read up on it). The following configuration can be used.

Code Snippet 6: LocalStack configuration
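A sketch of such a configuration using the Testcontainers LocalStack module; the image tag and the helper class name LocalStackTestSupport are assumptions:

```java
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import org.testcontainers.utility.DockerImageName;

public class LocalStackTestSupport {

    // Spin up a LocalStack container that exposes only the services the KCL worker talks to.
    public static final LocalStackContainer LOCALSTACK =
            new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.14.2"))
                    .withServices(Service.KINESIS, Service.DYNAMODB, Service.CLOUDWATCH);

    static {
        LOCALSTACK.start();
    }

    // Testcontainers maps the container's edge port to a random free host port.
    public static String endpointFor(Service service) {
        return LOCALSTACK.getEndpointOverride(service).toString();
    }

    public static String region() {
        return LOCALSTACK.getRegion();
    }
}
```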

Create the KCL Worker by Adding Configuration to Connect to LocalStack

Now let's complete the Worker creation code snippet so that it connects to the LocalStack mock services for AWS Kinesis, AWS DynamoDB and AWS CloudWatch.

Code Snippet 7: Worker configuration to point LocalStack mock AWS services
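A sketch of the LocalStack-aware Worker configuration, reusing the hypothetical LocalStackTestSupport helper from the previous snippet:

```java
import java.util.UUID;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;

public class LocalStackConsumerApplication {

    public static void main(String[] args) {
        // LocalStack accepts any static credentials.
        AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
                new BasicAWSCredentials("test", "test"));

        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                "sample-kcl-application",
                "orders-stream",                     // hypothetical stream name
                credentials,
                "worker-" + UUID.randomUUID())
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                .withRegionName(LocalStackTestSupport.region())
                // Point the Kinesis and DynamoDB traffic at the LocalStack container.
                .withKinesisEndpoint(LocalStackTestSupport.endpointFor(Service.KINESIS))
                .withDynamoDBEndpoint(LocalStackTestSupport.endpointFor(Service.DYNAMODB));
        // Note: KCL 1.x offers no equivalent configuration parameter for the
        // CloudWatch endpoint, which is the limitation discussed below.

        Worker worker = new Worker.Builder()
                .recordProcessorFactory(new LoggingRecordProcessorFactory())
                .config(config)
                .build();

        worker.run();
    }
}
```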

Wait! A Couple of Things Are Yet to Be Completed

As you can clearly see in code snippet 7, the KinesisClientLibConfiguration class in the Kinesis Client Library provides the ability to set the endpoint URLs of the required AWS services.

However, Kinesis Client Library 1.x does not provide the ability to set the AWS CloudWatch service endpoint URL as a configuration parameter.

In the following issue, I raised the above problem with the Kinesis Client Library project.

In the following PR to Kinesis Client Library 1.x, I made the required changes to resolve the above limitation.
