Reactive Kafka + Slow Consumers = Diagnosis Nightmare

Recently I’ve been working with the combination of Reactive Streams (in the form of Akka Streams) and Kafka, as it’s a good fit for some of the systems we’re building at work.

I hope it’ll be beneficial to others to share a particular nuance I discovered whilst working with this combination: a problem with slow downstream consumers.

To give a brief overview, we were taking messages from a Kafka topic and sending them as the body of HTTP POST requests. This worked fine the majority of the time, as we normally only receive a message every couple of seconds.
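To give a feel for the shape of the stream, here’s a minimal sketch of that kind of pipeline. It uses the Subscriptions-style API of later akka-stream-kafka releases (0.11-M3’s signatures differed slightly), and the endpoint URI, object name and addresses are placeholders rather than our actual setup:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpEntity, HttpMethods, HttpRequest}
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaToHttp extends App {
  implicit val system = ActorSystem("PM")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("192.168.99.102:9092")
    .withGroupId("test-consumer")

  Consumer.plainSource(consumerSettings, Subscriptions.topics("test_topic"))
    // One in-flight POST at a time: while we wait on a response, backpressure
    // propagates all the way back to the Kafka source
    .mapAsync(1) { record =>
      Http().singleRequest(HttpRequest(
        method = HttpMethods.POST,
        uri    = "http://downstream-service/messages", // placeholder endpoint
        entity = HttpEntity(record.value)))
    }
    .runWith(Sink.ignore)
}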

However, issues arose when we had to deal with an influx of messages from an upstream source that every now and then batches up around 300 messages and pushes them onto Kafka. What made it even more difficult is that we didn’t know this was a contributing factor at the time…

So what happened? We saw a lot of rebalancing exceptions in the consumer, and because we were using non-committing Kafka consumers, all the messages from offset 0 were constantly being re-read every second or so as a result of the continual rebalancing. On top of that, when you tried to use the kafka-consumer-groups script that ships with Kafka, you didn’t get a list of partitions and consumers, but a notification that the consumer group either doesn’t exist or is rebalancing.

It turned out that Kafka was constantly redistributing the partitions across the two nodes in the affected consumer group. I can’t recall exactly how I figured this out, but the root cause was combining a Kafka source in a reactive stream with a slow downstream consumer (HTTP).

At the time of writing we’re using akka-stream-kafka 0.11-M3, and it has an “interesting” behaviour when working with slow downstream consumers: it stops its scheduled polling when there is no downstream demand, which in turn stops its heartbeating back to Kafka. Because of this, whenever the stream was applying backpressure (because we were waiting on HTTP responses), the backpressure propagated all the way back to the Kafka consumer, which in turn stopped heartbeating.

To replicate this, I created the following Kafka topic:
./kafka-topics.sh --create --zookeeper 192.168.99.102:2181 --topic test_topic --replication-factor 3 --partitions 6

Then I used the publisher code (linked at the end of this post) to push messages onto Kafka, and ran two instances of the consumer in parallel within the same Kafka consumer group.
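The consumer in the linked repo does the real work; conceptually it boils down to something like the fragment below, where an artificial delay stands in for the slow HTTP call (the two-second delay is arbitrary, and the settings and implicits are the ones from the earlier sketch):

import akka.pattern.after
import scala.concurrent.Future
import scala.concurrent.duration._

  Consumer.plainSource(consumerSettings, Subscriptions.topics("test_topic"))
    // Simulate a slow downstream stage: one element every ~2 seconds, so
    // demand dries up and the Kafka source stops polling (and heartbeating)
    .mapAsync(1) { record =>
      after(2.seconds, system.scheduler)(Future.successful(record))
    }
    .runWith(Sink.foreach(record => println(s"Processed ${record.value}")))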

What this causes the Kafka broker to do (at least with its default configuration) is consider that node slow or unavailable, which triggers a rebalance of its partitions onto other nodes it deems able to pick up the slack. That’s why, when I kept reviewing the state of kafka-consumer-groups, I’d eventually see all partitions being consumed by one node, then the other, then get the rebalancing message. And because both of our nodes were using non-committing consumers, they both kept receiving the full backlog of messages, meaning they both became overwhelmed and applied backpressure, which meant Kafka kept reassigning partitions… it was a vicious cycle!
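For context, the broker-side patience here is governed by the consumer’s session.timeout.ms: if the group coordinator doesn’t hear a heartbeat within that window, it drops the consumer from the group and rebalances. You can raise it through the consumer settings, as in the hedged snippet below, but that only buys time rather than restoring polling under backpressure:

import org.apache.kafka.clients.consumer.ConsumerConfig

  val patientConsumerSettings = consumerSettings
    // Give the consumer longer before the coordinator evicts it and rebalances.
    // This delays the problem; it doesn't fix the lack of polling while backpressured.
    .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")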

Using the kafka-consumer-groups script you can see this happening:

benfoster$ ./kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.99.102:9000 --describe --group test-consumer
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
test-consumer, test_topic, 3, unknown, 3, unknown, consumer-1_/192.168.99.1
test-consumer, test_topic, 4, unknown, 2, unknown, consumer-1_/192.168.99.1
test-consumer, test_topic, 5, unknown, 3, unknown, consumer-1_/192.168.99.1
test-consumer, test_topic, 0, unknown, 3, unknown, consumer-1_/192.168.99.1
test-consumer, test_topic, 1, unknown, 2, unknown, consumer-1_/192.168.99.1
test-consumer, test_topic, 2, unknown, 3, unknown, consumer-1_/192.168.99.1
benfoster$ ./kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.99.102:9000 --describe --group test-consumer
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
test-consumer, test_topic, 3, unknown, 3, unknown, consumer-1_/192.168.99.1
test-consumer, test_topic, 4, unknown, 2, unknown, consumer-1_/192.168.99.1
test-consumer, test_topic, 5, unknown, 3, unknown, consumer-1_/192.168.99.1
test-consumer, test_topic, 0, unknown, 3, unknown, consumer-1_/192.168.99.1
test-consumer, test_topic, 1, unknown, 2, unknown, consumer-1_/192.168.99.1
test-consumer, test_topic, 2, unknown, 3, unknown, consumer-1_/192.168.99.1
benfoster$ ./kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.99.102:9000 --describe --group test-consumer
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
test-consumer, test_topic, 0, unknown, 75, unknown, consumer2_/192.168.99.1
test-consumer, test_topic, 1, unknown, 74, unknown, consumer2_/192.168.99.1
test-consumer, test_topic, 2, unknown, 75, unknown, consumer2_/192.168.99.1
test-consumer, test_topic, 3, unknown, 75, unknown, consumer2_/192.168.99.1
test-consumer, test_topic, 4, unknown, 75, unknown, consumer2_/192.168.99.1
test-consumer, test_topic, 5, unknown, 75, unknown, consumer2_/192.168.99.1
benfoster$ ./kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.99.102:9000 --describe --group test-consumer
Consumer group `test-consumer` does not exist or is rebalancing.
benfoster$ ./kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.99.102:9000 --describe --group test-consumer
Consumer group `test-consumer` does not exist or is rebalancing.

And within my consumer’s app logs, you can see it constantly rereading the same messages:

2016-07-01 09:37:37,171 [PM-akka.actor.default-dispatcher-22] DEBUG a.kafka.internal.PlainConsumerStage PlainConsumerStage(akka://PM) - Push element ConsumerRecord(topic = test_topic, partition = 0, offset = 0, key = null, value = test2)
2016-07-01 09:42:07,344 [PM-akka.actor.default-dispatcher-14] DEBUG a.kafka.internal.PlainConsumerStage PlainConsumerStage(akka://PM) - Push element ConsumerRecord(topic = test_topic, partition = 0, offset = 0, key = null, value = test2)
2016-07-01 09:38:57,217 [PM-akka.actor.default-dispatcher-22] DEBUG a.kafka.internal.PlainConsumerStage PlainConsumerStage(akka://PM) - Push element ConsumerRecord(topic = test_topic, partition = 1, offset = 3, key = null, value = test24)
2016-07-01 09:43:37,390 [PM-akka.actor.default-dispatcher-20] DEBUG a.kafka.internal.PlainConsumerStage PlainConsumerStage(akka://PM) - Push element ConsumerRecord(topic = test_topic, partition = 1, offset = 3, key = null, value = test24)
etc...

So how did we fix this? Thankfully we knew that the number of elements ever to appear in a batch would be small (a few hundred), so we added an in-memory buffer to the stream. That meant the whole batch could be held for the HTTP endpoint to work through at its own pace, while the Kafka consumer carried on polling (and heartbeating) unaffected. This was a quick fix and got us what we needed.
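In stream terms, the change amounts to a buffer stage between the Kafka source and the HTTP call, sized comfortably above the largest batch we expect; the size and the postToEndpoint name below are illustrative:

import akka.stream.OverflowStrategy

  Consumer.plainSource(consumerSettings, Subscriptions.topics("test_topic"))
    // Absorb bursts of a few hundred messages so the Kafka source keeps seeing
    // demand (and keeps polling/heartbeating) while the HTTP endpoint catches up
    .buffer(500, OverflowStrategy.backpressure)
    .mapAsync(1)(record => postToEndpoint(record)) // the HTTP POST from the first sketch
    .runWith(Sink.ignore)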

As soon as you add a buffer, the two consumers behave, and you get this:

benfoster$ ./kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.99.102:9000 --describe --group test-consumer
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
test-consumer, test_topic, 3, unknown, 87, unknown, consumer2_/192.168.99.1
test-consumer, test_topic, 4, unknown, 86, unknown, consumer2_/192.168.99.1
test-consumer, test_topic, 5, unknown, 86, unknown, consumer2_/192.168.99.1
test-consumer, test_topic, 0, unknown, 87, unknown, consumer1_/192.168.99.1
test-consumer, test_topic, 1, unknown, 86, unknown, consumer1_/192.168.99.1
test-consumer, test_topic, 2, unknown, 86, unknown, consumer1_/192.168.99.1

Is it the right fix? Probably not: if we were dealing with greater volume or velocity we’d have to treat this app as a genuinely slow consumer, and possibly ditch the Reactive Streams abstraction in favour of the lower-level Kafka API, to ensure we had full control over our partition and heartbeat management. But that constitutes a dedicated article (or several) in its own right.

I hope someone else finds this useful. It’s one of the mishaps you can run into when you abstract so far away from the underlying machinery that you don’t realise what can go wrong beneath it.

I’ve uploaded the source code for the reactive producer (shamelessly ripped from an Activator template, and the ticker publisher code from my friend Neil Dunlop) and the consumer I used, if you’d like to replicate the scenario. You’ll need a Kafka broker running:
https://github.com/foyst/reactive-kafka-publisher
https://github.com/foyst/reactive-kafka-slow-consumer