We love working in data-intensive environments. There, I said it...

As one may assume, that requires general knowledge about distributed systems and some of the essential technologies like Apache Kafka, Apache Cassandra, Apache Spark, Elasticsearch, etc. All of these technologies are the core components of every system that we have designed so far.

We love all of them, but the one we can't live without is certainly Apache Kafka. In this article, I will share a short story about how we use Kafka, some problems we faced while designing Kafka-based microservices, and how we managed to solve them.

So, let’s get started.

How we use Kafka

We use Kafka for ingestion pipelines and inter-service communication. To make working with Kafka more pleasant and smooth, we developed an in-house library that allows us to define data pipelines and interaction patterns quickly and consistently across all microservices in our system. Another benefit of this library is the easy introduction of newcomers into the project.

The platform we are building is event-driven, so microservices are mainly Kafka consumers and/or producers. However, some of them that communicate directly with client applications use WebSockets and expose a few HTTP endpoints. Moreover, we use Avro as a data format for Kafka messages, and Schema Registry for schema management.

Problem description

Apache Kafka is probably one of the best open-source projects ever, and that is why we've never had any major issues with it. We did face one minor issue while implementing our wrapper library around the default Kafka client: using the Kafka consumer API together with KafkaAvroDeserializer from the Kafka Avro Serializer library.

When the consumer encounters a record that can't be deserialized from Avro, org.apache.kafka.common.errors.SerializationException is thrown. Throwing here is perfectly valid behavior, but the consumer fails and is not able to recover from the error on its own.

The reason is that records are fetched from Kafka in batches. If a record can't be deserialized, it won't be added to the ConsumerRecords collection, so we never get to process it and its offset is never committed. On the next poll the consumer reads the same record again and fails with the same deserialization error. This problem is well known, and you can learn more about it by looking at this issue or by reading this article. In the next few sections, I'll go into more detail about a solution that works well for us.

How do bad records end up in Kafka?

We use Avro exclusively as the format of Kafka records, and with a properly secured Kafka cluster there should be no way to produce garbage (for example, a message that is not in Avro format) into any Kafka topic. In production, we've never had the problem described above, but in the testing and staging environments we did.

A situation that can be problematic is when we produce some random record on purpose without schema to a topic, using tools like kafka-console-producer or Conduktor, to see how our services will handle them.

Another scenario where a consumer can fail during deserialization is when a Kafka topic receives records with different schemas, which is a valid approach. However, if your service works with only a subset of the schemas from that topic, the consumer will fail when it encounters an Avro message for which it has no generated Java class corresponding to the Avro schema.

In the diagram above, we have a Kafka topic that works with circle and triangle schemas and has services A and B. Since service A works with both schemas from our topic, it can't fail. Service B only needs triangle records from the Kafka topic, and it will fail when encountering a circle record if it doesn't have that schema.
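
To make the first scenario more concrete: the Confluent Avro serializers prefix every record with a small header, one magic byte (0) followed by a 4-byte schema ID, and only then the Avro payload. A record produced without a schema has no such header, which is why deserialization fails. Here is a minimal sketch of that header check; the class and method names are my own and are not part of any library.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only; WireFormatCheck and its methods are hypothetical names.
class WireFormatCheck {

    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    // Returns true when the payload starts with the header written by the Confluent serializers.
    static boolean hasSchemaRegistryHeader(final byte[] payload) {
        return payload != null
            && payload.length >= HEADER_SIZE
            && payload[0] == MAGIC_BYTE;
    }

    // Reads the schema ID from bytes 1-4 (big-endian), or returns -1 when the header is missing.
    static int schemaId(final byte[] payload) {
        if (!hasSchemaRegistryHeader(payload)) {
            return -1;
        }
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }
}
```

A plain text message sent with kafka-console-producer fails this check on its very first byte.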

Adapting KafkaAvroDeserializer

Fortunately, KafkaAvroDeserializer can be extended, and some of the methods can be overridden. We are interested in this method:

public class AvroDeserializer extends KafkaAvroDeserializer {

    @Override
    public Object deserialize(final String s, final byte[] bytes) {
        return super.deserialize(s, bytes);
    }
}

This method can fail with the exception described in the previous sections. The simplest solution is to try to deserialize the message payload, which is represented as a byte array. If deserialization succeeds, the result is converted automatically into the Java class that corresponds to the Avro schema. If a SerializationException is thrown, we catch it and return null.

public class AvroDeserializer extends KafkaAvroDeserializer {

    @Override
    public Object deserialize(final String s, final byte[] bytes) {
        try {
            return super.deserialize(s, bytes);
        } catch (final SerializationException exception) {
            return null;
        }
    }
}

How to use the custom deserializer?

Replacing KafkaAvroDeserializer with our custom AvroDeserializer is as easy as you probably expect. Here is an example of how to create an Avro consumer that uses it:

public class AvroKafkaConsumer<K, V> {

    protected KafkaConsumer<K, V> consumer;

    public AvroKafkaConsumer(final Properties props) {
        consumer = makeConsumer(props);
    }

    private KafkaConsumer<K, V> makeConsumer(final Properties props) {
        props.setProperty(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            AvroDeserializer.class.getName()
        );
        return new KafkaConsumer<>(props);
    }
}
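
For completeness, here is a rough sketch of the Properties such a wrapper could receive. The configuration keys are the standard Kafka consumer and Schema Registry ones, but the host names, the group ID and the helper class itself are placeholders I made up for illustration. Note that value.deserializer is left out on purpose, because makeConsumer sets it.

```java
import java.util.Properties;

// Hypothetical helper; every value below is a placeholder for a local setup.
class ConsumerProps {

    static Properties avroConsumerProps() {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-service");
        props.setProperty(
            "key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer"
        );
        // Where the Avro deserializer looks up writer schemas
        props.setProperty("schema.registry.url", "http://localhost:8081");
        // Deserialize into generated SpecificRecord classes instead of GenericRecord
        props.setProperty("specific.avro.reader", "true");
        return props;
    }
}
```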

We just made a simple wrapper around KafkaConsumer that requires configuration to be created. Configuration of producers and consumers should be externalized, but that's not relevant for this example. Now, we should deal with the records that are not deserialized properly.

public ConsumerRecords<K, V> poll() {
    return filterCorruptedRecords(consumer.poll(Duration.ofMillis(10)));
}

private ConsumerRecords<K, V> filterCorruptedRecords(
    final ConsumerRecords<K, V> records
) {
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> filteredRecords = new HashMap<>();
    records.forEach(record -> {
        // A null value means deserialization failed (tombstone records are null as well)
        if (record.value() != null) {
            filteredRecords
                .computeIfAbsent(
                    new TopicPartition(record.topic(), record.partition()),
                    key -> new ArrayList<>()
                )
                .add(record);
        }
    });
    return new ConsumerRecords<>(filteredRecords);
}

A simple method is implemented in order to filter out corrupted records. If you wish, you can log some metadata on corrupted records since you are able to extract information on topic, partition, offset and timestamp from ConsumerRecord, but the value will be null.

Can we do better?

The previous solution feels like a hack, but there is no fundamentally different approach to solve this particular problem. What we can do is make errors more detailed, improve logging, and introduce a dead letter topic, which will be a special Kafka topic, for messages that can't be deserialized. Let's introduce our custom error type:

public class AvroSerdeError {

    public enum ErrorType {
        UNKNOWN_MESSAGE,
        AVRO_DESERIALIZATION_ERROR
    }

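    private final ErrorType type;
    private final String message;
    private final String schema; // null when the schema is unknown
    private final byte[] payload;

    public AvroSerdeError(
        final ErrorType type,
        final String message,
        final String schema,
        final byte[] payload
    ) {
        this.type = type;
        this.message = message;
        this.schema = schema;
        this.payload = payload;
    }

    // Accessors used by the consumer when logging and publishing errors
    public ErrorType type() { return type; }
    public String message() { return message; }
    public String schema() { return schema; }
    public byte[] payload() { return payload; }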
}

Now it's time to refactor our AvroDeserializer class.

public class AvroDeserializer extends KafkaAvroDeserializer {

    private static final String MAGIC_BYTE = "Unknown magic byte!";

    @Override
    public Object deserialize(final String s, final byte[] bytes) {
        try {
            return super.deserialize(s, bytes);
        } catch (final SerializationException exception) {
            return toError(exception, bytes);
        }
    }
}

We added the toError method, which is responsible for creating an AvroSerdeError based on the cause of the SerializationException.

private AvroSerdeError toError(final SerializationException exception, final byte[] payload) {
    // The cause may be absent, so fall back to the exception itself to avoid a NullPointerException
    final Throwable cause = exception.getCause() != null ? exception.getCause() : exception;
    final String message = String.valueOf(cause.getMessage());

    if (message.contains(MAGIC_BYTE)) {
        // No schema is known for a message that is not Avro at all
        return new AvroSerdeError(UNKNOWN_MESSAGE, message, null, payload);
    }

    final String schema = extractSchema(message);

    if (schema == null) {
        return new AvroSerdeError(
            AVRO_DESERIALIZATION_ERROR,
            "Can't find Avro Schema!",
            null,
            payload
        );
    }

    return new AvroSerdeError(AVRO_DESERIALIZATION_ERROR, message, schema, payload);
}

The first case is when the message is not in Avro format at all. This is an edge case that should be practically impossible if your Kafka cluster is secured properly. Of course, anyone who has access can still produce a raw message.

For the second one, I'm not sure if it can really happen, but the extractSchema method can return null, so I was forced to handle this case.

private String extractSchema(final String message) {
    // The class name is the fifth space-separated token of the message
    final String[] parts = message.split(" ");
    return parts.length > 4 ? parts[4] : null;
}

When a class is missing, the message has this format:

Could not find class {full.namespace.className} specified in writer's schema whilst finding reader's schema for a SpecificRecord.

This means that your application doesn't have a generated Java class that corresponds to the Avro schema. The full schema name can be extracted easily from the exception message. This will be the most common case that you encounter while working with Kafka and Avro.
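
If splitting on spaces feels too fragile, a regular expression anchored on the wording of the message is one possible alternative. The sketch below is only that, an alternative under the assumption that the message keeps the format shown above; SchemaNameExtractor and the com.example.Circle class name in the usage line are hypothetical.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative alternative to splitting on spaces; assumes the message format shown above.
class SchemaNameExtractor {

    private static final Pattern MISSING_CLASS =
        Pattern.compile("Could not find class (\\S+) specified in writer's schema");

    // Returns the fully qualified class name, or empty when the message has another shape.
    static Optional<String> extractSchema(final String message) {
        if (message == null) {
            return Optional.empty();
        }
        final Matcher matcher = MISSING_CLASS.matcher(message);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }
}
```

For a message that names com.example.Circle as the missing class, extractSchema yields that name, and for anything else (including a null message) it yields an empty Optional instead of null.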

Let's upgrade our AvroKafkaConsumer with a dead letter producer:

public class AvroKafkaConsumer<K, V> {

    private final KafkaProducer<byte[], byte[]> deadLetterProducer;
    protected KafkaConsumer<K, V> consumer;

    // initialize consumer and producer

    private ConsumerRecords<K, V> filterOutAndReportCorruptedRecords(
        final ConsumerRecords<K, V> records
    ) {
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> filteredBatch = new HashMap<>();
        records.forEach(record -> {
            final V value = record.value();
            // instanceof is null-safe, so tombstone records (null values) don't cause an NPE
            if (value instanceof AvroSerdeError) {
                final AvroSerdeError error = (AvroSerdeError) value;
                logAndPublishToDeadLetter(
                    error,
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.timestamp()
                );
            } else {
                filteredBatch
                    .computeIfAbsent(
                        new TopicPartition(record.topic(), record.partition()),
                        key -> new ArrayList<>()
                    )
                    .add(record);
            }
        });

        return new ConsumerRecords<>(filteredBatch);
    }
}

This implementation gives us more information about the error, so you can log an error message or publish a bad record to the dead letter topic for further analysis. A default implementation of logAndPublishToDeadLetter is provided, but it can be overridden for specific needs.

protected void logAndPublishToDeadLetter(
    final AvroSerdeError error,
    final String topic,
    final int partition,
    final long offset,
    final long timestamp
) {
    final ErrorType errorType = error.type();
    
    // Message is not Avro and can't be deserialized
    if (errorType == UNKNOWN_MESSAGE) {
        deadLetterProducer.send(
            new ProducerRecord<>(deadLetterTopic, error.payload()),
            (metadata, exception) -> {
                logger().error(
                    "[{}, topic: {}, partition: {}, offset: {}, timestamp: {}] - {}",
                    errorType, topic, partition, offset, timestamp, error.message()
                );
                // A non-null exception means the record never reached the dead letter topic
                if (exception != null) {
                    logger().error("Publishing to the dead letter topic failed", exception);
                }
            }
        );
    }

    // Message is Avro, but can't be deserialized
    if (errorType == AVRO_DESERIALIZATION_ERROR) {
        // decide what you want to do
    }
}

Finally, we have a consumer that is resilient to deserialization exceptions and has decent logging and error reporting.

Want more control over errors?

The previous solution works pretty well and I don't see a case where it could lead to any major problems, but I do want to mention an alternative approach that I personally like even more. To understand this section, some basic knowledge of the Either, Try or Option data types will be useful. If you have no idea what I'm talking about, check out this blog post, which explains them in detail. We will use the Either data type from the Vavr library, which helps us incorporate some basic concepts of functional programming into Java.

Either represents a value of one of two possible types. An Either is an instance of either Left or Right, and both Left and Right can wrap any type. By convention Right is right, so it contains the successful result, and the Left side contains the error. This is not mandatory, but you should stick to the convention.
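
If you have never seen this type before, a stripped-down version may help. The sketch below is a hand-rolled stand-in written only to show the idea; it is not Vavr's implementation, and the name SimpleEither is made up. Vavr's Either offers a similar fold operation along with a much richer API.

```java
import java.util.function.Function;

// Minimal stand-in for illustration only; not Vavr's Either.
abstract class SimpleEither<L, R> {

    // Wraps an error on the left side.
    public static <L, R> SimpleEither<L, R> left(final L error) {
        return new SimpleEither<L, R>() {
            @Override
            public <T> T fold(final Function<L, T> onLeft, final Function<R, T> onRight) {
                return onLeft.apply(error);
            }
        };
    }

    // Wraps a successful result on the right side.
    public static <L, R> SimpleEither<L, R> right(final R value) {
        return new SimpleEither<L, R>() {
            @Override
            public <T> T fold(final Function<L, T> onLeft, final Function<R, T> onRight) {
                return onRight.apply(value);
            }
        };
    }

    // Forces the caller to handle both the error and the success case.
    public abstract <T> T fold(Function<L, T> onLeft, Function<R, T> onRight);

    public boolean isLeft() {
        return fold(error -> true, value -> false);
    }
}
```

The point of the design is that a caller cannot reach the value without also saying what happens to the error.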

Our consumer methods should look like this now:

public List<Either<AvroSerdeError, V>> poll() {
    return processBatch(consumer.poll(Duration.ofMillis(10)));
}

private List<Either<AvroSerdeError, V>> processBatch(final ConsumerRecords<K, V> records) {
    final List<Either<AvroSerdeError, V>> result = new ArrayList<>();
    
    records.forEach(record -> {
        final V value = record.value();
        // instanceof is null-safe, unlike value.getClass(), which throws on tombstone records
        if (value instanceof AvroSerdeError) {
            final AvroSerdeError error = (AvroSerdeError) value;
            result.add(Either.left(error));
        } else {
            result.add(Either.right(value));
        }
    });
    
    return result;
}

Now the end user of our custom consumer gets a List of Eithers when polling for records, and is forced to decide both how to deal with errors and how to process correct messages. An error can be ignored, logged, or published to a dead letter topic, as described earlier.

Here is a simple example of how to use a custom consumer.

void loop() {
    poll().forEach(record -> {
        if (record.isLeft()) {
            processError(record.getLeft());
        } else {
            processValue(record.get());
        }
    });
}

void processValue(final V value) {
    // decide how to process successful record
}

void processError(final AvroSerdeError error) {
    // decide how to process error
}

Summary

We learned how to deal with bad records in Kafka when using the default API with the Avro data format. If you don't want to deal with these types of errors yourself, Spring Kafka has an out-of-the-box solution for you. Check this blog for a more detailed explanation. Another solution could be to use consumers that consume bytes directly from Kafka and manually convert each raw record to Avro or any other data format.

Every solution has some advantages and some drawbacks. We found a perfect match with our custom library that wraps the default Kafka client and gives us the ability to quickly integrate Kafka consumers/producers into any Java or Spring Boot based application.

Another Kafka client worth mentioning is the Akka-based Alpakka Kafka, which provides a nice Java and Scala DSL for working with Kafka in a reactive manner. Besides this one, there is the SmallRye Reactive Messaging Kafka module, which also exposes the Kafka API in a reactive manner. This one is used as the default Kafka client for Quarkus.