1. Introduction to AWS MSK and data pipelines
Data is the new oil, and just like oil, it needs to be refined and processed to be useful. In today’s data-driven world, businesses need to process vast amounts of data to make informed decisions. Data pipelines are the solution to this problem. A data pipeline is a series of interconnected steps that process and transform data from its raw form into a format that can be analyzed and utilized.
This blog aims to help anyone who is planning or already in the process of building data pipelines with Kafka and Kafka Connect on AWS using the MSK and MSK Connect managed services. It offers some insight through configuration examples and the errors I encountered during my proof of concept.
2. What is a data pipeline?
A data pipeline is a set of procedures that take raw data and convert it into an analysis-ready format. Data ingestion, cleaning, transformation, and storage are some of the steps that can be taken. Depending on how much data is involved and how much processing is necessary, a data pipeline might be simple or complex. A data pipeline’s primary objective is to prepare the data for analysis.
2.1 Components of the Data Pipeline
Numerous technologies, such as cloud-based platforms, database systems, and programming languages, are used to create data pipelines. A data pipeline’s fundamental components are:
- Data Source: Finding the data source is the initial step. This might be an object storage system, database, file, or API.
- Data Ingestion: The data is then fed into the pipeline. At this phase, the data is extracted from its source and loaded into the pipeline.
- Data Cleaning: Once the data has been ingested, it needs to be cleaned. Cleaning involves removing any duplicates, errors, or inconsistencies.
- Data Transformation: After cleaning, the data is converted into an analysis-ready format. In this step the data is structured, new variables are created, or the data is aggregated.
- Data Storage: As a final step before analysis, the transformed data must be saved in a database or file.
2.2 Data Pipeline Types
Depending on the type of data, the processing requirements, and the sort of analysis required, there are various types of data pipelines, each of which is created to fulfill a particular set of needs. Businesses may select the best data pipeline for their needs in data processing by being aware of the various types available. I’ll mention two of the more popular ones.
- Batch processing: This type of data pipeline processes data in predetermined intervals or batches, typically once per day or once per week. For processing massive amounts of data that don’t require real-time processing, batch pipelines are effective. Applications like business intelligence, reporting, and data warehousing frequently make use of batch pipelines.
- Streaming Pipeline: Real-time or nearly real-time data processing is done through stream data pipelines. Stream pipelines are effective for applications like fraud detection, real-time analytics, and real-time monitoring that need to analyze data in real time. Technologies like Apache Kafka, Apache Flink, and Apache Spark Streaming are frequently used in stream pipelines.
Both types may perform either ETL or ELT.
- Extract, Transform, Load (ETL) Pipeline: ETL pipelines are used to extract data from various sources, transform it into a format that is suitable for analysis, and then load it into a target database or data warehouse. ETL pipelines are commonly used for data integration, data migration, and data warehousing.
- Extract, Load, Transform (ELT) Pipeline: ELT pipelines are similar to ETL pipelines, but the transformation of the data occurs after it is loaded into the target database or data warehouse. ELT pipelines are commonly used for big data processing, where the transformation of the data is done in the target database or data warehouse using distributed processing technologies such as Apache Hadoop, Apache Spark, and Apache Hive.
In this blog, we will concentrate on the Streaming Pipeline type. We will use the managed services AWS MSK and AWS MSK Connect.
3. AWS MSK and AWS MSK Connect
3.1 AWS MSK
Apache Kafka is a popular open-source streaming framework for building real-time data pipelines and streaming applications. With its high-throughput, low-latency data processing capabilities, Kafka is an excellent choice for scenarios such as real-time analytics, data integration, and event-driven applications.
AWS MSK (Amazon Managed Streaming for Apache Kafka) is a fully managed service that makes it simple to build and run Kafka-based applications in the cloud. With Amazon MSK you can quickly manage scalability, reliability, and performance while also configuring security settings for your Kafka clusters.
How does AWS MSK Work?
AWS MSK integrates with other AWS services, such as Amazon S3, Amazon Kinesis, and Amazon EC2, to provide a complete streaming data platform.
Amazon Managed Streaming for Apache Kafka supports the latest version of Kafka and provides automatic upgrades, monitoring, and management of Kafka clusters. AWS MSK also integrates with AWS security services, such as AWS Identity and Access Management (IAM), AWS Key Management Service (KMS), and Amazon VPC, to ensure secure and compliant data processing.
3.2 AWS MSK Connect: Simplifying Kafka Data Pipelines
AWS MSK Connect is a new feature of AWS MSK that simplifies the process of building and managing data pipelines using Kafka. AWS MSK Connect allows you to easily move data between Kafka and other data storage systems, such as Amazon S3, Amazon Redshift, Amazon Elasticsearch, and Amazon Relational Database Service (AWS RDS).
AWS MSK Connect provides a simple and scalable way to build data pipelines by allowing you to define connectors that move data between Kafka and other data systems. AWS MSK Connect supports a wide range of connectors, including JDBC, S3, Elasticsearch, and HTTP, among others.
3.3 Benefits and downsides of using AWS MSK and AWS MSK Connect
AWS MSK and AWS MSK Connect are powerful tools that can streamline your data processing workflows and enable you to build and manage Kafka-based data pipelines in the cloud. However, like any technology, they come with both benefits and downsides.
Benefits of using AWS MSK and AWS MSK Connect:
- Scalability: AWS MSK is a fully managed service that provides automatic scaling of Kafka clusters based on workload, enabling you to handle large volumes of data and increase throughput as needed.
- High Availability: AWS MSK provides automatic failover and replication of data, ensuring that your data is always available and durable.
- Simplified Monitoring: Monitoring can be done with Amazon CloudWatch, or you can scrape Prometheus metrics by turning on “Enable open monitoring with Prometheus”.
- Integration with Other AWS Services: AWS MSK integrates with other AWS services, such as Amazon S3, Amazon RDS, and others, enabling you to build end-to-end streaming data pipelines that process, store, and analyze data in real time.
Downsides of using AWS MSK and AWS MSK Connect:
- Cost: When processing huge amounts of data or needing high throughput, AWS MSK and AWS MSK Connect might be pricey.
- Learning curve: Setting up and using AWS MSK and AWS MSK Connect can be challenging and call for a deep understanding of Kafka and other data processing technologies.
- Vendor lock-in: Depending on your current setup, transferring to a different provider may prove to be quite difficult because AWS MSK and AWS MSK Connect are proprietary services provided by Amazon Web Services (AWS).
- Limited Customization: Because AWS MSK and AWS MSK Connect are completely managed services, you have little power to alter how Kafka clusters and connections are set up and customized.
4. The Setup
Data Pipelines can have many different kinds of setups using various technologies, tools, and programming languages, depending on the use case. However, for the sake of simplicity, I will give you a hypothetical layout.
Infrastructure components:
- AWS VPC
- 3x Availability Zones (us-east-1a, us-east-1b, us-east-1c)
- 3x private subnets
- 3x NAT Gateways
- 2x RDS (Aurora or single RDS instance)
- 1x MSK Cluster – 3x Brokers and 3x Zookeepers
- 2x MSK Connectors
- 2x MSK Connect Custom Plugins (one for each type of connector plugin)
- 1x S3 Bucket for the zipped plugins – Debezium and JDBC Sink Connector for Confluent
As shown in the diagram above, we have a VPC with 3 private subnets, each in a different availability zone and with its own NAT Gateway (this is needed by the MSK Connectors for outbound internet connectivity). The MSK Connect custom plugins will be used by our connectors to import the zipped Kafka Connect plugins – Debezium and the JDBC Sink Connector for Confluent.
The source connector will stream any row changes from the source database and publish them to a Kafka topic, after which the sink connector will consume them and write them to the target database.
Below, I will list each component, details about it, and issues or errors that I have encountered, alongside their solutions.
4.1 PostgreSQL Source Database
This will be the database of our microservice, called APP1, from which we will stream any row changes as messages to a Kafka topic. This method is also known as Change Data Capture (CDC). Those messages will then be written to the aggregated database by the Kafka sink connector.
In order for this to happen, we will need to turn on logical replication on the RDS instance.
This can be done by creating a “Parameter group” if you are using a single RDS instance, or a “DB Cluster Parameter Group” if you are using an Aurora cluster. In the parameter group, set “rds.logical_replication=1” and apply it. After that, reboot your database.
Access your database and execute “show wal_level;”.
Expected output:
app1_db=> show wal_level;
 wal_level
-----------
 logical
(1 row)
If it shows “replica” then the parameter group was not applied successfully.
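If you prefer to script this instead of clicking through the console, the same parameter-group change can be applied with the AWS SDK. Below is a minimal Python (boto3) sketch, assuming a single RDS instance; the parameter group name, instance identifier, and the “postgres15” family are hypothetical and should be adjusted to your setup.

import boto3

rds = boto3.client("rds", region_name="us-east-1")

# Hypothetical names - replace with your own parameter group and instance identifiers.
PARAM_GROUP = "app1-logical-replication"
DB_INSTANCE = "app1-db"

# Create a parameter group (skip this if you already have one attached).
rds.create_db_parameter_group(
    DBParameterGroupName=PARAM_GROUP,
    DBParameterGroupFamily="postgres15",  # assumption: adjust to your engine version
    Description="Enable logical replication for Debezium CDC",
)

# rds.logical_replication is a static parameter, so it can only be applied on reboot.
rds.modify_db_parameter_group(
    DBParameterGroupName=PARAM_GROUP,
    Parameters=[{
        "ParameterName": "rds.logical_replication",
        "ParameterValue": "1",
        "ApplyMethod": "pending-reboot",
    }],
)

# Attach the parameter group to the instance and reboot so wal_level becomes "logical".
rds.modify_db_instance(
    DBInstanceIdentifier=DB_INSTANCE,
    DBParameterGroupName=PARAM_GROUP,
    ApplyImmediately=True,
)
rds.reboot_db_instance(DBInstanceIdentifier=DB_INSTANCE)

For an Aurora cluster, the equivalent boto3 calls are create_db_cluster_parameter_group, modify_db_cluster_parameter_group, and modify_db_cluster.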
4.2 PostgreSQL Target Database
No additional configurations have to be done on your target database. In our case, this will be the “Aggregated Database”.
5. Schema Registry
From the official confluent documentation – “Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Avro schemas. It stores a versioned history of all schemas, provides multiple compatibility settings, and allows the evolution of schemas according to the configured compatibility setting. It provides serializers that plug into Kafka clients that handle schema storage and retrieval for Kafka messages that are sent in the Avro format.”
- Find the Source HERE.
- For our schema registry, we will use Confluent’s Schema Registry.
- Check Official documentation HERE.
There are a few options to choose from for data serialization, but we will focus only on one – Avro.
Avro is a data serialization format developed by the Apache Software Foundation. It is a compact and efficient binary format designed to provide fast serialization and deserialization of data. Avro provides features such as schema evolution, dynamic typing, and support for complex data structures, making it a popular choice for data pipelines and distributed systems.
One of the key benefits of Avro is its support for schema evolution. As data structures change over time, Avro allows for the evolution of data schemas without requiring changes to the code or data files. This makes it easier to manage data pipelines and distributed systems that may be running different versions of the same data schema.
Avro is also highly interoperable, with support for a wide range of programming languages and platforms. This makes it easy to integrate Avro into existing systems and to share data between different parts of a distributed system.
In addition to the above, Avro also reduces the size of the messages in a Kafka topic, which can be a game changer when we talk about millions and millions of messages in terms of latency and storage efficiency.
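To give a feel for how the registry is used in practice, here is a small Python sketch using the confluent-kafka client to look up the latest Avro schema registered for one of our topics. The subject name “app1_table_A-value” follows the topic naming used later in this post, and the URL and credentials are placeholders.

from confluent_kafka.schema_registry import SchemaRegistryClient

# Placeholder URL and credentials - use your own Schema Registry endpoint and API key.
sr = SchemaRegistryClient({
    "url": "https://abcd-12ef3.us-east-2.aws.confluent.cloud",
    "basic.auth.user.info": "<API_KEY>:<API_SECRET>",
})

# Kafka Connect registers value schemas under "<topic>-value" by default.
latest = sr.get_latest_version("app1_table_A-value")
print(f"Subject: {latest.subject}, version: {latest.version}")
print(latest.schema.schema_str)  # the Avro schema as a JSON string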
6. MSK Cluster Configuration
AWS MSK comes with default configurations, but you can also create custom configurations for both the Kafka brokers and the Kafka zookeepers. The custom configurations have versioning and can be applied on multiple MSK Clusters. Kafka configurations can be very complex and vary based on different factors such as average message size, number of messages per second, retention policies, replication, lag, and many more.
- Check out this link to official docs for the default configurations.
- Also, review this link to official docs for the custom configurations.
This is an example custom configuration, containing basic settings:
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
Notable details:
auto.create.topics.enable=true
– This property highly depends on the workflow and the way you manage your infrastructure. If you haven’t enabled it, the source connector will not be able to auto-create the topic, and you will have to create it beforehand.
default.replication.factor=3
– As a rule of thumb, the replication factor should be equal to the number of brokers you have, in order to achieve high availability. In this case, we have 3 brokers in 3 different availability zones, so the replication factor is set to 3.
min.insync.replicas=2
– Should be less than or equal to default.replication.factor.
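If you manage the cluster with the AWS SDK rather than the console or Terraform, a custom configuration like the one above can be created with a call similar to the following Python (boto3) sketch. The configuration name and Kafka version are assumptions; the server properties are a trimmed subset of the example above.

import boto3

kafka = boto3.client("kafka", region_name="us-east-1")

# Trimmed subset of the server properties shown above.
server_properties = """
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.partitions=1
zookeeper.session.timeout.ms=18000
"""

# Creates a new, versioned MSK configuration that can be attached to one or more clusters.
response = kafka.create_configuration(
    Name="datapipeline-prod-config",            # hypothetical name
    Description="Basic settings for the data pipeline cluster",
    KafkaVersions=["2.8.1"],                    # assumption: match your cluster's Kafka version
    ServerProperties=server_properties.encode("utf-8"),
)
print(response["Arn"], response["LatestRevision"]["Revision"])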
7. Source Connector
We will use the Debezium connector for PostgreSQL. It will monitor 2 database tables from the source database and capture all row-level changes. In short, it will create a snapshot of all the schemas and their tables and continuously capture row-level changes that insert, update, and delete database content. This will generate an event record and stream it as a message to the Kafka topic.
7.1 Source Connector Custom Plugin
This resource is required, as it is used by MSK Connect to import and run the Kafka Connect plugin, in this case Debezium. It uses S3 as “backend” storage from which to pull the zipped Kafka Connect plugins.
Download Debezium from HERE. After that, upload it to your S3 bucket.
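As a rough illustration of how this looks when scripted, here is a Python (boto3) sketch that uploads the zipped Debezium plugin to S3 and registers it as an MSK Connect custom plugin. The bucket name, object key, and plugin name are hypothetical.

import boto3

s3 = boto3.client("s3", region_name="us-east-1")
mskc = boto3.client("kafkaconnect", region_name="us-east-1")

BUCKET = "my-msk-connect-plugins"          # hypothetical bucket
KEY = "debezium-connector-postgres.zip"    # hypothetical object key

# Upload the zipped Kafka Connect plugin downloaded from the Debezium site.
s3.upload_file("debezium-connector-postgres.zip", BUCKET, KEY)

# Register it as an MSK Connect custom plugin so connectors can reference it.
plugin = mskc.create_custom_plugin(
    name="debezium-postgres",
    description="Debezium source connector for PostgreSQL",
    contentType="ZIP",
    location={"s3Location": {"bucketArn": f"arn:aws:s3:::{BUCKET}", "fileKey": KEY}},
)
print(plugin["customPluginArn"], plugin["customPluginState"])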
7.2 Source Connector Worker Configuration
Worker configurations are a separate AWS resource in the MSK stack of managed services. A worker is a JVM process that runs the Kafka connector logic.
Each worker can have a set of tasks (tasks.max) running in parallel threads. The tasks are responsible for data copying.
Unfortunately, worker configurations are not very exhaustive, which limits the amount of customization we can do on an MSK Connector. One such limitation affects the configuration of our connectors in this example: the internal Kafka topics that the Kafka Connect framework creates – status, config, and offset.
Only the offset storage topic can be configured; the other two are auto-generated with unique IDs, and new ones are created every time an MSK Connector is recreated. This becomes an issue when many recreations occur, as it gradually eats into the number of partitions available per broker, which depends on the instance type of the MSK cluster. No need to panic if partitions get maxed out – the MSK cluster will continue to operate, you just can’t make configuration changes.
Auto-generated status, config, and offset topic names look similar to these:
Offset - __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
Status - __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
Config - __amazon_msk_connect_config_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
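If you want to check how many of these orphaned internal topics have piled up on your cluster, a quick Python sketch with confluent-kafka’s AdminClient can list them; the broker addresses are placeholders.

from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "<msk-bootstrap-brokers>"})

# list_topics returns cluster metadata, including every topic and its partitions.
metadata = admin.list_topics(timeout=10)
internal = [t for t in metadata.topics if t.startswith("__amazon_msk_connect_")]
for topic in sorted(internal):
    print(topic, len(metadata.topics[topic].partitions), "partition(s)")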
Example worker configuration for our source connector:
1 key.converter=org.apache.kafka.connect.storage.StringConverter
2 value.converter=org.apache.kafka.connect.storage.StringConverter
3 offset.storage.topic=source-datapipeline-app1-prod-offset-storage
4 offset.storage.partitions=1
5 offset.storage.replication.factor=3
Line 3: Setting a name for the offset storage allows you to keep the last committed offset and continue reading if a recreation of the MSK Connector occurs.
Line 4: The default number is 25, which is a lot. Debezium can run only with “tasks.max=1”, essentially using only 1 partition. If you want to distribute the workload, you need to create additional source connectors streaming changes from different tables.
Line 5: Replicate the offset storage topic on all broker nodes, in this case, we have only 3.
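Worker configurations can also be created programmatically; the MSK Connect API expects the properties file content base64-encoded. Below is a minimal Python (boto3) sketch using the properties above; the configuration name is hypothetical.

import base64
import boto3

mskc = boto3.client("kafkaconnect", region_name="us-east-1")

worker_properties = """
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.topic=source-datapipeline-app1-prod-offset-storage
offset.storage.partitions=1
offset.storage.replication.factor=3
"""

response = mskc.create_worker_configuration(
    name="source-datapipeline-app1-prod-worker-config",  # hypothetical name
    description="Worker configuration for the Debezium source connector",
    propertiesFileContent=base64.b64encode(worker_properties.encode("utf-8")).decode("utf-8"),
)
print(response["workerConfigurationArn"])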
7.3 Source Connector configuration
Kafka Connect configurations can drastically vary depending on the use case and the function they serve.
This is an example configuration, where I will try to describe the idea behind some of the properties:
1 "connector.class= "io.debezium.connector.postgresql.PostgresConnector"
2 "tasks.max"= "1"
3 "table.include.list"= "public.table_A,public.table_B"
4 "database.hostname"= "app_1_db.abcd77efg.us-east-1.rds.amazonaws.com"
5 "database.port"= "5432"
6 "database.user"= "<db-user"
7 "database.password"= "<db-password>"
8 "database.dbname"= "app1_db"
9 "database.server.name"= "app1"
10 "plugin.name"= "pgoutput"
11 "time.precision.mode"= "connect"
12 "slot.name"= "source_datapipeline_app_1"
13 "publication.name"= "source_datapipeline_app_1"
14 "publication.autocreate.mode"= "filtered"
15 "transforms"= "Reroute,unwrap"
16 "transforms.Reroute.type"= "io.debezium.transforms.ByLogicalTableRouter"
17 "transforms.Reroute.topic.regex"= "(.*).public.(.*)"
18 "transforms.Reroute.topic.replacement"= "$1_$2"
19 "transforms.unwrap.type"= "io.debezium.transforms.ExtractNewRecordState"
20 "transforms.unwrap.drop.tombstones"= "false"
21 "value.converter"= "io.confluent.connect.avro.AvroConverter"
22 "value.converter.basic.auth.credentials.source"= "USER_INFO"
23 "value.converter.schema.registry.url"= "https://abcd-12ef3.us-east-2.aws.confluent.cloud"
24 "value.converter.basic.auth.user.info"= "<API_KEY>:<API_SECRET>"
25 "key.converter"= "io.confluent.connect.avro.AvroConverter"
26 "key.converter.basic.auth.credentials.source"= "USER_INFO"
27 "key.converter.schema.registry.url"= "https://abcd-12ef3.us-east-2.aws.confluent.cloud"
28 "key.converter.basic.auth.user.info"= "<API_KEY>:<API_SECRET>"
Line 1: Defines the type of Kafka Connect plugin we will use; in our case, it’s Debezium.
Line 2: As mentioned earlier, Debezium can only work with a single task, since it reads the changes from a single replication slot.
Line 3: Lists the database tables the connector will read from. The fully qualified table name (FQTN) is required here – in this case, the tables “table_A” and “table_B” from the “public” database schema.
Line 4: Endpoint of the RDS
Line 5: Database Port
Line 6: Database user. Unfortunately, I couldn’t find a way to create the replication and publication slots with a user other than the initial master user. One way around this is to create them beforehand, or to use the AWS Secrets Manager configuration provider for AWS MSK and store the credentials there. More on this here – https://aws.amazon.com/about-aws/whats-new/2022/03/amazon-msk-external-secrets-configurartion-providers/
Line 7: Database User’s password
Line 8: Actual name of the PostgreSQL database you are trying to connect to
Line 9: A logical name that provides a namespace for the particular PostgreSQL database instance; it is used to form the Kafka topic names if “auto.create.topics.enable” is true.
Line 10: The logical decoding plugin. Logical decoding is a mechanism that extracts the changes committed to the transaction log and processes them in a user-friendly manner with the help of an output plug-in, which enables clients to consume the changes.
In our case, we are using pgoutput. This is the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community and used by PostgreSQL itself for logical replication. This plug-in is always present, so no additional libraries need to be installed. The Debezium connector interprets the raw replication event stream directly into change events. The other options are decoderbufs and wal2json, but we won’t discuss them here.
Line 11: This is needed if you have any “TIMESTAMP” type columns with timezone information and want to preserve the time format in the Kafka message as it is. More on this here – https://debezium.io/documentation/reference/1.2/connectors/sqlserver.html#sqlserver-temporal-values
Line 12: Name of the replication slot that will be created, if it’s not already present. How to list:
select * from pg_replication_slots;
Line 13: Name of the publication slot that will be created, if it’s not already present. How to list:
select * from pg_publication;
select * from pg_publication_tables;
Line 14: Recommended by Debezium. The default value is “all_tables”, which will create a publication for all tables, whereas “filtered” will create one only for the tables listed in “table.include.list”.
Lines 15-20: Single Message Transformations (SMTs), which we mainly use to rename the auto-created Kafka topics using a regex. We are essentially changing the topic name from “app1.public.table_A” to “app1_table_A”. This is needed by the sink connector because the names of the database tables it will create and write to are based on the Kafka topic names. If this is not changed, the sink connector will show the following error:
org.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: “app1.public.table_A”
Lines 21-28: Define details about our Schema Registry, such as the URL, credentials, and type of authentication. We will be using basic auth, so we set the credentials source to “USER_INFO”.
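Putting it together, the connector itself can also be created through the MSK Connect API. The following Python (boto3) sketch is only an outline: the ARNs, subnets, security groups, bootstrap brokers, and IAM role are placeholders, and connectorConfiguration is the Debezium configuration shown above (abbreviated here).

import boto3

mskc = boto3.client("kafkaconnect", region_name="us-east-1")

debezium_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "table.include.list": "public.table_A,public.table_B",
    "database.hostname": "app_1_db.abcd77efg.us-east-1.rds.amazonaws.com",
    "database.dbname": "app1_db",
    "database.server.name": "app1",
    "plugin.name": "pgoutput",
    # ...remaining properties from the configuration above...
}

mskc.create_connector(
    connectorName="source-datapipeline-app1-prod",
    connectorConfiguration=debezium_config,
    kafkaConnectVersion="2.7.1",
    capacity={"provisionedCapacity": {"mcuCount": 1, "workerCount": 1}},
    plugins=[{"customPlugin": {"customPluginArn": "<custom-plugin-arn>", "revision": 1}}],
    workerConfiguration={"arn": "<worker-configuration-arn>", "revision": 1},
    serviceExecutionRoleArn="<iam-role-arn>",
    kafkaCluster={"apacheKafkaCluster": {
        "bootstrapServers": "<msk-bootstrap-brokers>",
        "vpc": {"subnets": ["<private-subnet-ids>"], "securityGroups": ["<sg-id>"]},
    }},
    kafkaClusterClientAuthentication={"authenticationType": "NONE"},
    kafkaClusterEncryptionInTransit={"encryptionType": "PLAINTEXT"},
)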
Encountered errors and their solutions:
“ERROR Postgres roles LOGIN and REPLICATION are not assigned to the user”:
Solution: This type of error can be encountered when the source connector tries to create new replication and publication slots. One way to tackle the issue is to create the slots beforehand, manually or with some kind of automation (Ansible/Puppet/Chef). The other way is to leave the creation of the publication and replication slots to the connector and use either a dedicated user with “REPLICATION” permissions (follow this guide https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-permissions) or the initial master user of the RDS.
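For reference, a minimal sketch of the “create them beforehand” option, using Python with psycopg2 and the slot/publication names from the connector configuration. Connection details are placeholders; run it as the master user (or a user granted the rds_replication role) that owns the tables.

import psycopg2

# Placeholder connection details - use the source database endpoint and master user.
conn = psycopg2.connect(
    host="app_1_db.abcd77efg.us-east-1.rds.amazonaws.com",
    dbname="app1_db",
    user="<db-user>",
    password="<db-password>",
)
conn.autocommit = True

with conn.cursor() as cur:
    # Publication limited to the tables in table.include.list (the "filtered" equivalent).
    cur.execute(
        "CREATE PUBLICATION source_datapipeline_app_1 FOR TABLE public.table_A, public.table_B;"
    )
    # Logical replication slot using the pgoutput decoding plugin.
    cur.execute(
        "SELECT pg_create_logical_replication_slot('source_datapipeline_app_1', 'pgoutput');"
    )
conn.close()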
ERROR Postgres server wal_level property must be "logical" but is: replica (io.debezium.connector.postgresql.PostgresConnector:101)
Solution: Logical replication needs to be enabled. This can be done only by applying a parameter group (single RDS instance) or a DB cluster parameter group (Aurora cluster) and setting “rds.logical_replication=1”. After that, the RDS instance needs to be rebooted for the change to take effect.
7.4 Sink Connector Worker Configuration
The worker configuration for the sink connector won’t be any different from the one for the source connector, except for the “offset.storage.partitions” property.
Example:
1 key.converter=org.apache.kafka.connect.storage.StringConverter
2 value.converter=org.apache.kafka.connect.storage.StringConverter
3 offset.storage.topic=sink-datapipeline-app1-prod-offset-storage
4 offset.storage.partitions=1
5 offset.storage.replication.factor=3
Line 3: Setting a name for the offset storage allows you to keep the last committed offset and continue reading if a recreation of the MSK Connector occurs.
Line 4: Unlike Debezium, the JDBC sink connector can run with more than “tasks.max=1”, because all tasks in the connector belong to a single consumer group, which lets you achieve higher throughput with a single sink connector instead of creating multiple connectors for the different Kafka topics. The consumer group’s ID is formed from the name of the MSK connector prefixed with “connect-”; in our case, it will be “connect-sink-datapipeline-app1-prod”. If you plan to use more than one task, you need to increase the number of partitions of the offset storage topic, so that “tasks.max” <= “offset.storage.partitions”.
Line 5: Replicate the offset storage topic on all broker nodes, in this case, we have only 3.
7.5 Sink Connector configuration
For the sink connector, which will be used to consume the messages from the Kafka topic and write them in the target PostgreSQL RDS, we will use Confluent’s JDBC Sink Connector plugin. Just like the source connector configuration, I will try to give a more detailed view of each Kafka connect property and later list some of the encountered issues and their solutions.
Download the Kafka Connect plugin HERE. After that, you need to upload the zip file to your S3 bucket and create an AWS Custom Plugin resource, which will be used by the sink connector.
Sink Connector configuration:
1 "connector.class"= "io.confluent.connect.jdbc.JdbcSinkConnector"
2 "tasks.max"= "1"
3 "topics"= "app1_table_A,app1_table_B"
4 "table.name.format"= "${topic}"
5 "connection.url"= "jdbc:postgresql://target-db.abc1defghijk.us-east-1.rds.amazonaws.com:5432/target_db"
6 "connection.user"= "<db-user>"
7 "connection.password"= "<db-user-password>"
8 "delete.enabled"= "true"
9 "auto.create"= "true"
10 "auto.evolve"= "true"
11 "insert.mode"= "upsert"
12 "pk.mode"= "record_key"
13 "pk.fields"= "uuid"
14 "value.converter.basic.auth.credentials.source"= "USER_INFO"
15 "value.converter.schema.registry.url"= "https://abcd-12ef3.us-east-2.aws.confluent.cloud"
16 "value.converter.basic.auth.user.info"= "<API_KEY>:<API_SECRET>"
17 "key.converter"= "io.confluent.connect.avro.AvroConverter"
18 "key.converter.basic.auth.credentials.source"= "USER_INFO"
19 "key.converter.schema.registry.url"= "https://abcd-12ef3.us-east-2.aws.confluent.cloud"
20 "key.converter.basic.auth.user.info"= "<API_KEY>:<API_SECRET>"
Line 1: Defines the connector class; in this case, the JDBC Sink Connector.
Line 2: As mentioned earlier, sink connectors can work with more than one task, but the number of tasks needs to be less than or equal to the “offset.storage.partitions” setting in the worker configuration. If it is higher, the extra tasks will be idle, as there are no available partitions to connect to.
Line 3: Lists the Kafka topics the connector will consume from.
Line 4: The default value is “${topic}”. This is a format string for the destination table name. It works alongside the “topics” property and will create a separate database table for each topic listed there. If you are using Terraform to deploy the connector, you need to escape the value like this – “$${topic}”.
Line 5: In short, this is the endpoint of the database, and “target_db” is the target database that we want to write to.
Line 6: Database user
Line 7: Database user’s password
Line 8: (Depends on use-case) enable deletion of records.
Line 9: enables the auto-creation of the destination table if it’s missing. If you disable this property you need to have the destination tables created beforehand, either manually or by some automation such as Ansible, Chef, Puppet, Saltstack, and so on.
Line 10: from the official documentation – “the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and the removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. The addition of primary key constraints is also not attempted. In contrast, if “auto.evolve” is disabled no evolution is performed and the connector task fails with an error stating the missing columns.”
Line 11: from the official documentation – “Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, which provides idempotence.”
Lines 12 and 13: Please, refer to official documentation, as it is best described there – https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html#delete-mode
Lines 14 to 20: Define details about our Schema Registry, such as the URL, credentials, and type of authentication. We will be using basic auth, so we set the credentials source to “USER_INFO”.
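When debugging the sink side, it can help to peek at what is actually sitting in the topic. Below is a small Python sketch using confluent-kafka to consume a single message from app1_table_A and decode its Avro value via the Schema Registry; the broker addresses, credentials, and consumer group ID are placeholders.

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

# Placeholder Schema Registry and broker details.
sr = SchemaRegistryClient({
    "url": "https://abcd-12ef3.us-east-2.aws.confluent.cloud",
    "basic.auth.user.info": "<API_KEY>:<API_SECRET>",
})
avro_deserializer = AvroDeserializer(sr)

consumer = Consumer({
    "bootstrap.servers": "<msk-bootstrap-brokers>",
    "group.id": "debug-inspector",       # throwaway consumer group, not the connector's
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["app1_table_A"])

msg = consumer.poll(timeout=30.0)
if msg is not None and msg.error() is None:
    value = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
    print(value)  # decoded record as a Python dict
consumer.close()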
Encountered errors and their solutions:
1. Caused by: java.lang.ClassNotFoundException: com.google.common.base.Ticker
Solution: make sure you have a “guava” jar in your sink connector zip bundle.
2. Failed to send HTTP request to endpoint
Solution: This error can occur on both the sink and the source connector, and it is most likely due to the connector being deployed in a public subnet with an internet gateway. You need to redeploy it in a private subnet behind a NAT Gateway; otherwise, it won’t have an internet connection.
8. Monitoring
Even though AWS MSK can be monitored with tools such as Amazon CloudWatch and Prometheus, I want to give you a viable alternative – AKHQ.
Check the Official Website here, and Github.
AKHQ is open-source and provides a simple and intuitive interface for managing Kafka clusters and simplifies many of the tasks that are typically performed through the Kafka command-line interface (CLI). It allows you to monitor topics, view consumer groups, inspect messages, and perform various administrative tasks, such as creating new topics or deleting existing ones.
Some of the key benefits of using AKHQ for managing Kafka clusters include:
- User-friendly interface: AKHQ provides a user-friendly interface that simplifies the management of Kafka clusters, making it easier for developers, administrators, and other users to manage and monitor Kafka topics and consumer groups.
- Centralized management: AKHQ allows you to manage multiple Kafka clusters from a single web-based interface, making it easier to manage and monitor large-scale deployments.
- Real-time monitoring: AKHQ provides real-time monitoring of Kafka topics and consumer groups, allowing you to quickly identify and troubleshoot issues as they arise.
- Extensibility: AKHQ is highly extensible and can be customized to meet the specific needs of your organization.
Helm values example:
image:
  repository: tchiotludo/akhq
configuration:
  micronaut:
    security:
      enabled: true
      token:
        jwt:
          signatures:
            secret:
              generator:
                secret: "<secret>" #Changeme
  akhq:
    security:
      enabled: true
      default-group: no-roles
      basic-auth:
        - username: admin
          password: <sha256sum> #Changeme
          groups:
            - admin
        - username: readonly
          password: <sha256sum> #Changeme
          groups:
            - reader
    pagination:
      page-size: 10
      threads: 16
    clients-defaults:
      consumer:
        properties:
          default.api.timeout.ms: 60000
          max.poll.records: 25
    server:
      access-log:
        enabled: true
        name: org.akhq.log.access
existingSecrets: ""
secrets:
  akhq:
    connections:
      kafka-cluster:
        properties:
          bootstrap.servers: "b-1.kafkacluster.abcdef.a11.kafka.us-east-1.amazonaws.com:9092,b-2.kafkacluster.abcdef.a11.kafka.us-east-1.amazonaws.com:9092,b-3.kafkacluster.abcdef.a11.kafka.us-east-1.amazonaws.com:9092"
          request.timeout.ms: 60000
        schema-registry:
          url: "https://abcd-12ef3.us-east-2.aws.confluent.cloud"
          type: "confluent"
          basic-auth-username: <API_KEY>
          basic-auth-password: <API_SECRET>
service:
  enabled: true
  type: ClusterIP
  port: 80
  labels: {}
  annotations:
ingress:
  enabled: true
  ingressClassName: ""
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
    kubernetes.io/ingress.class: nginx
    kubernetes.io/tls-acme: "true"
  paths:
    - /
  hosts:
    - my.domain.com #Changeme
  tls:
    - secretName: my-domain-com-tls #Changeme
      hosts:
        - my.domain.com #Changeme
readinessProbe:
  enabled: true
  path: /health
  port: management
  initialDelaySeconds: 5
  periodSeconds: 10
  timeoutSeconds: 5
  successThreshold: 1
  failureThreshold: 3
  httpGetExtra: {}
livenessProbe:
  enabled: true
  path: /health
  port: management
  initialDelaySeconds: 5
  periodSeconds: 10
  timeoutSeconds: 5
  successThreshold: 1
  failureThreshold: 3
  httpGetExtra: {}
resources:
  limits:
    cpu: 1000m
    memory: 1024Mi
  requests:
    cpu: 500m
    memory: 512Mi
networkPolicy:
  enabled: true
You can discover more configuration options in the chart’s GitHub repository here.
Conclusion
You may create and manage Kafka-based data pipelines in the cloud with the aid of AWS MSK and AWS MSK Connect, which are both powerful solutions. Scalability, high availability, streamlined management, connectivity with other Amazon services, and user-friendly connectors are just a few perks they offer. The downsides involve costs, a learning curve, vendor lock-in, and a lack of customization. When selecting whether Amazon MSK and AWS MSK Connect are the best options for your organization’s data processing needs, it’s critical to carefully evaluate the benefits and drawbacks of integrating them.
If you need assistance or consulting on how to implement this specific solution, get in touch with our team of seasoned DevOps experts.
I highly recommend checking out Robin Moffatt‘s blogs as they have provided me with invaluable insights and knowledge on Kafka and Kafka Connect.