Dotnet developers’ guide to Kafka
This article demonstrates how to create a Kafka producer and consumer in C# using the Confluent Kafka library.
It assumes the reader knows basic Kafka terminology. If not, don’t worry, I’ve got you covered.
The first step in implementing Kafka in a project, and the one most often skipped, is asking: “Is Kafka the best fit for this project?”
Use cases Kafka is best suited for:
- Multiple clients need a stream of the same data object, or of a subset of it. Instead of maintaining a separate queue per client, each holding the same data, Kafka lets all of them read from one topic. This removes the overhead of maintaining all the different queues.
- A “source of truth” is required between two applications where one streams data to the other. Kafka’s core data structure is a commit log, which is an append-only, persistent sequence of records. If a conflict arises after data is published and received, the log can serve as the source of truth for working out what went wrong and where.
- Higher throughput for streaming data is required, and an immutable data store isn’t a limitation. Kafka works well here because it appends to its log sequentially and relies heavily on the operating system’s page cache, so reads and writes are often served at close to memory speed. The commit log itself is immutable.
It is advisable to research Kafka and its use cases thoroughly before implementing it.
Confluent Kafka is the most popular library for working with Kafka in C#. It is distributed as the Confluent.Kafka NuGet package on nuget.org.
Dotnet Kafka client best practices:
- Keep the producer or consumer, and with it its broker connections, alive for the application’s entire lifecycle. Kafka clients communicate over TCP. Setting up a connection (the TCP handshake plus fetching cluster metadata) is costly, so it is advisable to create one long-lived client instance and reuse it instead of creating one per message.
- Explore the application’s configuration needs and set the timeouts and retries accordingly. Confluent Kafka gives developers fine-grained control over these settings.
- Use acknowledgments appropriately, based on how important the data is.
→ Use the Status property of the DeliveryResult to check whether a message was stored. It is a PersistenceStatus value: Persisted, PossiblyPersisted or NotPersisted.
→ Use “Acks.None” when the broker should send no acknowledgment to the producer.
→ Use “Acks.Leader” when the data only needs to be persisted by the partition leader, without waiting for acknowledgments from the followers.
→ Use “Acks.All” when the acknowledgment should be sent only after the message has been committed by all the in-sync replicas (ISR).
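Here is a minimal sketch of the acknowledgment practices above, assuming the Confluent.Kafka 1.4.x package. The AckExample class, its CreateConfig helper and the critical flag are hypothetical names used for illustration. The sketch picks Acks.All for data that must not be lost and Acks.Leader otherwise. It reads the PersistenceStatus on success and also from a ProduceException on failure.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Sketch: choose an Acks level by how important the data is, then inspect
// the persistence status of the delivery. Names and values are illustrative.
public static class AckExample
{
    // Hypothetical helper: critical data waits for all in-sync replicas,
    // everything else only for the partition leader.
    public static ProducerConfig CreateConfig(string bootstrapServers, bool critical) =>
        new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            Acks = critical ? Acks.All : Acks.Leader
        };

    public static async Task ProduceAndCheckAsync(IProducer<Null, string> producer, string topic, string value)
    {
        try
        {
            DeliveryResult<Null, string> result =
                await producer.ProduceAsync(topic, new Message<Null, string> { Value = value });
            // Report where the message landed and its PersistenceStatus.
            Console.WriteLine($"{result.Status} at {result.TopicPartitionOffset}");
        }
        catch (ProduceException<Null, string> ex)
        {
            // NotPersisted: the message was not stored, so a retry is safe.
            // PossiblyPersisted (e.g. after a timeout): the broker may have stored it,
            // so a retry could create a duplicate.
            Console.WriteLine($"Delivery failed ({ex.Error.Reason}), status: {ex.DeliveryResult.Status}");
        }
    }
}
```

Only the Acks choice differs between the two configurations. Everything else, including a single long-lived producer, stays the same.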
Tech stack used in the sample project:
- Runtime: .NET Core 3.1
- C# 8.0
- NuGet package: Confluent.Kafka 1.4.2
- Kafka management UI: Lenses.io Kafka Docker. Follow its setup steps to run the Docker container, then use it as a local Kafka server and as a UI to monitor the local producer and consumer.
The producer and consumer snippets below follow the best practices above.
Producer:
There is no need to write reconnection logic, because the client reconnects automatically if the connection to the broker is lost.
Use Startup.cs for web APIs and Program.cs for console apps to initialize the producer/consumer once. Assign values to the required configuration fields; the sample code uses the recommended values.
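For a console app, Program.cs might look roughly like the sketch below. It assumes the Confluent.Kafka package, a local broker at localhost:9092 (for example the Lenses container) and the topic test_topic. CreateConfig is a hypothetical helper, and the configuration is kept deliberately minimal. The producer is built once, reused for every message, and flushed before the app exits.

```csharp
using System;
using Confluent.Kafka;

// Sketch of a console app's Program.cs: one producer is built at startup and
// reused for every message. Broker address and topic are placeholders.
public static class Program
{
    // Hypothetical helper holding the (minimal) producer configuration.
    public static ProducerConfig CreateConfig() =>
        new ProducerConfig { BootstrapServers = "localhost:9092" };

    public static void Main()
    {
        // Built once; its broker connections live as long as the producer does.
        using var producer = new ProducerBuilder<Null, string>(CreateConfig()).Build();

        for (var i = 0; i < 10; i++)
        {
            // Produce does not block; the handler runs when the delivery report arrives.
            producer.Produce("test_topic", new Message<Null, string> { Value = $"message {i}" },
                report => Console.WriteLine($"{report.Message.Value}: {report.Status}"));
        }

        // Wait up to 10 seconds for outstanding messages before the producer is disposed.
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
```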
- RetryBackoffMs is the time the producer waits before retrying the message delivery after a failure response.
- MessageSendMaxRetries is the maximum number of times the producer retries a message whose delivery failed.
- MessageTimeoutMs is the maximum time a produced message may take to be delivered successfully, including all retry attempts.
For example, take RetryBackoffMs = 200, MessageSendMaxRetries = 5 and MessageTimeoutMs = 500. A failed message would be retried up to 5 times with a 200 ms wait before each retry. In the worst case the backoff alone adds up to 5 × 200 ms = 1000 ms, not counting the time each attempt takes. Because MessageTimeoutMs is 500 ms, the producer will not wait until all the retry attempts are done. The message fails with a timeout error after 500 ms (with ProduceAsync, this is a ProduceException whose error code is Local_MsgTimedOut).
- EnableIdempotence should be used when the order of the messages sent to Kafka matters. Producing is asynchronous and failed sends are retried, so without idempotence retries could create duplicates or reorder messages. Setting this to true guarantees that each message is written only once, in its original order, within a partition.
- EnableDeliveryReports is set to true by default. If you want the producer to work in a fire-and-forget manner, set it to false. This improves performance because the client no longer hands a delivery report back to the application for every message. The broker still acknowledges writes according to the Acks setting.
- DeliveryReportFields should list only the fields you need, because by default the delivery report contains all of them (key, value, timestamp, headers and status). If delivery reports are required, use this setting. It too improves performance, since fewer fields have to be copied from the native librdkafka layer into .NET objects for each report.
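The producer settings above can be collected into one ProducerConfig. This is a sketch with illustrative values, not the article's exact recommended values, and it assumes Confluent.Kafka 1.4.x. ProducerSettings, WorstCaseBackoffMs and TimeoutCutsRetriesShort are hypothetical helpers that reproduce the retry arithmetic from the example above. They assume a fixed backoff between retries and ignore how long each attempt takes.

```csharp
using Confluent.Kafka;

// Sketch: the producer settings discussed above with illustrative values,
// plus a check of whether the timeout leaves room for every retry.
public static class ProducerSettings
{
    public static ProducerConfig Create() => new ProducerConfig
    {
        BootstrapServers = "localhost:9092", // placeholder broker address
        RetryBackoffMs = 200,                // wait 200 ms before each retry
        MessageSendMaxRetries = 5,           // retry a failed message at most 5 times
        MessageTimeoutMs = 5000,             // overall delivery deadline, retries included
        EnableIdempotence = true,            // no duplicates, original order per partition
        EnableDeliveryReports = true,        // set to false for fire-and-forget
        DeliveryReportFields = "status"      // only copy the persistence status into reports
    };

    // Total backoff if every retry is used (fixed backoff, attempt time ignored).
    public static int WorstCaseBackoffMs(int retryBackoffMs, int maxRetries) =>
        retryBackoffMs * maxRetries;

    // True if MessageTimeoutMs expires before all retries could be attempted.
    public static bool TimeoutCutsRetriesShort(ProducerConfig config) =>
        WorstCaseBackoffMs(config.RetryBackoffMs ?? 0, config.MessageSendMaxRetries ?? 0)
            >= (config.MessageTimeoutMs ?? int.MaxValue);
}
```

With the example's values (200 ms, 5 retries, 500 ms timeout), TimeoutCutsRetriesShort returns true. With the 5000 ms timeout used in Create, it returns false.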
Confluent Kafka does not throw an exception or return a specific error when the producer fails to connect to the broker. If a connection is never established, produced messages eventually fail with a message timeout error. Callbacks can, however, be used to make sure such errors get logged.
- By default, librdkafka (the native library underneath Confluent Kafka) logs an error message to the console if it cannot connect to the broker.
- If the application uses a different logging mechanism, a callback can be registered with SetLogHandler, a builder method provided by Confluent Kafka, as shown in the code. This lets you run your own handling logic when a connection failure occurs.
- Callbacks are invoked asynchronously on a background thread, so they cannot be relied on to do something like throwing an exception as soon as the connection fails. They are, however, extremely helpful for logging errors.
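Here is a sketch that wires up both callbacks, assuming Confluent.Kafka 1.4.x. Console.WriteLine stands in for whatever logger the application uses, and LoggingProducerFactory is a hypothetical name. SetErrorHandler is not mentioned in the text above. It is a sibling of SetLogHandler on the same builder and receives Error objects instead of log lines.

```csharp
using System;
using Confluent.Kafka;

// Sketch: route librdkafka log lines and errors into the application's own
// logging. Console.WriteLine is a stand-in for a real logger.
public static class LoggingProducerFactory
{
    public static IProducer<Null, string> Build(ProducerConfig config) =>
        new ProducerBuilder<Null, string>(config)
            // Called on a background thread for each librdkafka log message.
            .SetLogHandler((producer, log) =>
                Console.WriteLine($"[{log.Level}] {log.Facility}: {log.Message}"))
            // Called for client-level errors, e.g. ErrorCode.Local_AllBrokersDown
            // when no broker can be reached.
            .SetErrorHandler((producer, error) =>
                Console.WriteLine($"Kafka error {error.Code}: {error.Reason} (fatal: {error.IsFatal})"))
            .Build();
}
```

These handlers only observe failures. They cannot make a Produce call fail fast, which is consistent with the point above about callbacks being asynchronous.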
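If the keys aren’t required, use Null as the key type when initializing the producer. Ignore is the consumer-side equivalent: it has a deserializer but no serializer, so building a producer with it fails. Example code:
new ProducerBuilder<Null, string>(config).Build()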
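The IProducer instance is injected from Startup.cs. Register it as a service in Startup.ConfigureServices, and ASP.NET Core injects it into the constructor of the dependent class (KafkaController). Controllers themselves are registered by AddControllers(), so KafkaController does not need a separate registration.
The await can be skipped if messages are to be produced in a fire-and-forget manner, although any exception from the unawaited task then goes unobserved. The non-async Produce method, which accepts an optional delivery handler callback, is an alternative for fire-and-forget producing.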
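For a web API, the registration and the controller could look like the sketch below, assuming ASP.NET Core 3.1 and Confluent.Kafka 1.4.x. Only ConfigureServices is shown; Configure is omitted. The routes, topic name and broker address are placeholders. The first action awaits ProduceAsync, and the second uses Produce with a delivery handler for fire-and-forget.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;

// Sketch: one producer registered as a singleton and injected into a controller.
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();

        // One producer for the whole application lifetime; the container disposes it on shutdown.
        services.AddSingleton<IProducer<Null, string>>(_ =>
            new ProducerBuilder<Null, string>(
                new ProducerConfig { BootstrapServers = "localhost:9092" }).Build());
    }
}

[ApiController]
[Route("api/kafka")]
public class KafkaController : ControllerBase
{
    private readonly IProducer<Null, string> _producer;

    // The singleton producer is injected by the framework.
    public KafkaController(IProducer<Null, string> producer) => _producer = producer;

    // Awaited: the response is sent after the delivery report arrives.
    [HttpPost]
    public async Task<IActionResult> Post([FromBody] string value)
    {
        var result = await _producer.ProduceAsync("test_topic", new Message<Null, string> { Value = value });
        return Ok(result.TopicPartitionOffset.ToString());
    }

    // Fire and forget: Produce returns immediately; failures surface only in the handler.
    [HttpPost("fire-and-forget")]
    public IActionResult PostFireAndForget([FromBody] string value)
    {
        _producer.Produce("test_topic", new Message<Null, string> { Value = value }, report =>
        {
            if (report.Error.IsError) Console.WriteLine($"Delivery failed: {report.Error.Reason}");
        });
        return Accepted();
    }
}
```

Registering the producer as a singleton is what keeps a single long-lived connection, as recommended in the best practices.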
Consumer:
- This sample consumer application is a worker service.
- GroupId is used to group consumers together. Use it carefully: committed offsets are stored against this id, so a changed id starts without offsets and its starting point is decided by AutoOffsetReset. Kafka assigns each partition to only one consumer in the group, so each message is read by a single consumer within the group.
- EnableAutoCommit, if set to true (the default), automatically commits the offsets of consumed messages, i.e. marks them as consumed. It does this in the background at a regular interval (AutoCommitIntervalMs, 5 seconds by default). If the offset should be committed only after some action has been performed successfully, set this to false and call “consumer.Commit()” once the action succeeds.
- AutoOffsetReset decides where to start reading when the broker has no committed offset for the consumer group. The options are ‘earliest’, ‘latest’ or ‘error’, set through the AutoOffsetReset enum.
- FetchWaitMaxMs is the maximum time the broker waits for at least FetchMinBytes of data to accumulate before responding to the consumer’s fetch request.
- EnablePartitionEof makes the consumer report when it has reached the end of a partition, i.e. when no data is currently left to read. Check “consumedData.IsPartitionEOF” on each consume result; such a result carries no message.
- Use SetLogHandler exactly as with the producer.
- Use the ‘Subscribe()’ method to subscribe to the topic from which messages are to be consumed.
- Access message fields such as ‘key’, ‘timestamp’ and ‘value’ through ‘consumed.Message’ (for example ‘consumed.Message.Key’), since shortcut properties such as ‘consumed.Key’ have been marked obsolete.
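Putting the consumer settings together, a worker service could look roughly like this. It is a sketch that assumes Confluent.Kafka 1.4.x and the Microsoft.Extensions.Hosting package. The group id, topic and FetchWaitMaxMs value are illustrative, and printing the message stands in for real processing. Auto-commit is disabled so that each offset is committed only after its message has been handled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Sketch of a worker-service consumer. Broker, group id and topic are placeholders.
public class KafkaConsumerWorker : BackgroundService
{
    public static ConsumerConfig CreateConfig() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "test-consumer-group",            // committed offsets are stored against this id
        EnableAutoCommit = false,                   // commit manually after processing
        AutoOffsetReset = AutoOffsetReset.Earliest, // start from the beginning if no offset exists
        FetchWaitMaxMs = 500,                       // illustrative value
        EnablePartitionEof = true                   // report when a partition is drained
    };

    // Consume blocks, so run the loop on its own thread instead of blocking host startup.
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);

    private void ConsumeLoop(CancellationToken stoppingToken)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(CreateConfig())
            .SetLogHandler((_, log) => Console.WriteLine($"[{log.Level}] {log.Message}"))
            .Build();

        consumer.Subscribe("test_topic");
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                ConsumeResult<Ignore, string> consumed = consumer.Consume(stoppingToken);

                if (consumed.IsPartitionEOF)
                {
                    // No message here: the partition has simply been read to the end for now.
                    Console.WriteLine($"Reached end of {consumed.TopicPartition}");
                    continue;
                }

                Console.WriteLine($"{consumed.Message.Timestamp.UtcDateTime}: {consumed.Message.Value}");

                // Commit only after the message was handled successfully.
                consumer.Commit(consumed);
            }
        }
        catch (OperationCanceledException)
        {
            // The host is shutting down.
        }
        finally
        {
            // Leave the group cleanly so its partitions are reassigned promptly.
            consumer.Close();
        }
    }
}
```

The worker would be registered in the host with services.AddHostedService<KafkaConsumerWorker>().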
Sample data with Lenses:
Once Docker is running, Lenses takes about 45 seconds to start, so don’t panic if you get a bad request error in the meantime.
Here a new topic, ‘test_topic’, has been created by the producer (this relies on the broker having topic auto-creation enabled). It appears once you click Kafka Topics in the navigation bar on the left.
Once the consumer is up and running, and is successfully connected to the Kafka broker that is running on the local machine, it will appear here.