How To Fix Kafka Consumers Due To Corrupted Log Files or Missing Log Files

This article explores recovering Kafka Brokers from corrupted data storage and recovering Kafka consumers due to missing log files.

By Ramu Kakarla · May. 02, 24 · Tutorial

Red Hat AMQ Streams is a massively scalable, distributed, and high-performance data streaming platform based on the Apache Kafka project. It offers a distributed backbone that allows microservices and other applications to share data with high throughput and low latency.

Red Hat AMQ Streams is deployed on OpenShift with persistent storage, and Spring Kafka clients are deployed as producers and consumers.

A Kafka consumer that is part of a consumer group suddenly stopped consuming messages.
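Before digging into the broker logs, one can confirm from the Kafka side that the group is stuck. A minimal check, assuming the cluster is reachable on localhost:9092 and the affected group is named group-xxx (placeholders; adjust for your environment):

Shell
 
# Describe the consumer group: growing lag with no progressing committed offsets indicates a stuck group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group-xxx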

Kafka Broker Logs 

Plain Text
 
Kafka broker-0
ERROR [ReplicaManager broker=0] Error processing append operation on partition __consumer_offsets-13 (kafka.server.ReplicaManager) [data-plane-kafka-request-handler-2]
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition __consumer_offsets-13
INFO [GroupCoordinator 0]: Preparing to rebalance group group-id in state PreparingRebalance with old generation 118964 (__consumer_offsets-13) (reason: Error COORDINATOR_NOT_AVAILABLE when storing group assignment during SyncGroup (member: consumer-group-id-x-aaa-bbb-ccc)) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-2]
Kafka broker-2
ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition __consumer_offsets-13 at offset xxxxx (kafka.server.ReplicaFetcherThread) [ReplicaFetcherThread-0-0]
org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition __consumer_offsets-13 at offset xxxxx (kafka.server.ReplicaFetcherThread) [ReplicaFetcherThread-0-0]
org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.


This happens when some partitions of __consumer_offsets have fewer than the minimum of two in-sync replicas; in this case, consumers cannot commit offsets. This is what we observed in the broker-0 logs above. One can describe the topic to check the in-sync replicas. For example:

Shell
 
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic __consumer_offsets
 Topic: __consumer_offsets	Partition: 13	Leader: 0	Replicas: 0,1,2	Isr: 0 <--------- problematic one
 Topic: __consumer_offsets	Partition: 14	Leader: 2	Replicas: 2,0,1	Isr: 2,1,0
 Topic: __consumer_offsets	Partition: 15	Leader: 1	Replicas: 1,0,2	Isr: 1,2,0
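The broker-0 error above refers to a min.isr requirement of 2. Any topic-level override of min.insync.replicas can be read back with kafka-configs.sh; a quick check, assuming the broker is reachable on localhost:9092 (if nothing is listed, the broker-level default applies):

Shell
 
# Show explicitly set overrides on the offsets topic, including min.insync.replicas if set at the topic level
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name __consumer_offsets --describe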

  • The logs from broker-2 indicate why broker-2 is not able to get in sync for that partition and cannot fetch offset xxxxx of __consumer_offsets-13 from broker-0.
  • One should inspect what happened to that record using the kafka-dump-log.sh tool to print it. For example:
Shell
 
bin/kafka-dump-log.sh --files data/kafka/__consumer_offsets-13/000000000000000xxx.log --value-decoder-class "kafka.serializer.StringDecoder" --offsets-decoder
----------------------------------------------------------------------------
---------------------------------------------------------------------------
-------------------------------------------------------------------------------
baseOffset: xxxxx lastOffset: xxxxx count: 0 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 24 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: yyyyyyy CreateTime: 111111111111 size: 137 magic: 2 compresscodec: none crc: 2xxxxxxx isvalid: false
  
org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /home/kkakarla/Downloads/__consumer_offsets-13/0000000000000xxx.log.  


  • Looking at the above, it is evident that the record is corrupted. Upon checking the entire __consumer_offsets-13 directory from broker-0, we found some .log files missing.
  • One can copy __consumer_offsets-13 from the Kafka broker-0 pod to the local machine using the command below.
Shell
 
oc rsync my-cluster-kafka-0:/var/lib/kafka/data-0/kafka-log0/__consumer_offsets-13 tmp/log/__consumer_offsets-13
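Once copied, listing the segment files makes missing or zero-length segments easy to spot, since each .log segment normally has matching .index and .timeindex files:

Shell
 
# Recursively list the copied partition directory and check for gaps in the segment files
ls -lR tmp/log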


  • From the logs, we observed the error messages below due to missing .log files from __consumer_offsets-13:
org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk.
ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition SQL_xxx-10 at offset 12345 (kafka.server.ReplicaFetcherThread) [ReplicaFetcherThread-0-0]
org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk.


  • We observed the read-only error below on the Kafka storage mount volumes.
Plain Text
 
message: 'relabel failed /var/lib/kubelet/pods/xxxxxyyyyyzzzz/volumes/kubernetes.io~csi/pvc-abcdef/mount:
          lsetxattr /var/lib/kubelet/pods/xxxxxyyyyyzzzz/volumes/kubernetes.io~csi/pvc-abcdef/mount/kafka-log0:
          read-only file system'


  • It is possible that the read-only filesystem prevented files that should have been created from being created. As a result, those files may have been lost or corrupted. The check below can confirm whether the volume is still writable.
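A small sketch, assuming the same cluster and data path used earlier (my-cluster-kafka-0 and /var/lib/kafka/data-0/kafka-log0); a "Read-only file system" error confirms the storage problem:

Shell
 
# Try to create (and then remove) a test file on the Kafka data volume
oc exec -it my-cluster-kafka-0 -c kafka -- touch /var/lib/kafka/data-0/kafka-log0/rw-test
oc exec -it my-cluster-kafka-0 -c kafka -- rm -f /var/lib/kafka/data-0/kafka-log0/rw-test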

Now the question is how one can fix the corrupted __consumer_offsets-13 on Kafka broker-0.

If the other Kafka brokers, kafka-broker-1 and kafka-broker-2, have log files in good condition, there is a way to restore __consumer_offsets-13 using the logs of kafka-broker-1 or kafka-broker-2:

  • Stop all the Spring consumer applications.
  • Configure unclean.leader.election.enable=true in the Kafka CR YAML (oc edit kafka -o yaml).
  • unclean.leader.election.enable=true is the configuration that allows __consumer_offsets-13 on kafka-0 to be rebuilt from the kafka-1 and kafka-2 log files.
  • unclean.leader.election.enable (default: false) indicates whether replicas not in the ISR set can be elected as leader as a last resort, even though doing so may result in data loss.
YAML
 
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
...
spec:
  kafka:
    ...
    config:
      unclean.leader.election.enable: true


  • Wait for about three minutes, then check whether unclean.leader.election.enable is set to true on each of the Kafka brokers using the following command.
  • Adjust ${BROKER_ID} for each broker ID (0, 1, 2) and execute the command repeatedly.
Shell
 
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name ${BROKER_ID} --describe --all | grep unclean.leader.election.enable


  • It is possible that the Strimzi Cluster Operator does not set unclean.leader.election.enable=true due to a lack of ISRs.
  • If unclean.leader.election.enable=true could not be set by the Strimzi Cluster Operator, set it manually and check again after executing the following command. Adjust ${BROKER_ID} for each broker ID (0, 1, 2) and execute the command repeatedly.
Shell
 
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name ${BROKER_ID} --alter --add-config unclean.leader.election.enable=true


  • Just to be sure, check that the kafka-0 pod can be restarted using "oc delete pod <kafka-0 pod name>".
  • If the broker does not start at this point for a reason unrelated to __consumer_offsets-13, there may be other problems.
  • Log in to the kafka-0 pod using "oc debug <kafka-0 pod name>".
  • After logging in to the kafka-0 pod, delete the __consumer_offsets-13 directory (/var/lib/kafka/data-<JBOD_ID>/kafka-log<BROKER_ID>/__consumer_offsets-13) in the kafka-0 pod.
  • Log off from the kafka-0 pod.
  • Restart the kafka-0 pod using "oc delete pod <kafka-0 pod name>".
  • Wait for about three minutes.
  • If everything goes smoothly, __consumer_offsets-13 on kafka-0 should recover from the kafka-1 and kafka-2 logs automatically at this point; the check below verifies it.
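To verify the recovery, describe the offsets topic again and confirm that partition 13 now reports a full ISR (for example, Isr: 0,1,2). A minimal check, assuming the broker is reachable on localhost:9092:

Shell
 
# Partition 13 should list all three replicas in the ISR again after recovery
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic __consumer_offsets | grep "Partition: 13"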

If the above solution fails for some reason, then one can try the below workaround:

  • The safe workaround is to avoid the use of __consumer_offsets-13 by changing the consumer group name (group.xxx) of the client Spring application.
  • The partition number of __consumer_offsets is determined from the hash value of the consumer group name (group.xxx) and the number of __consumer_offsets partitions.
  • For your information, the following small Java program identifies the partition number of __consumer_offsets; if you use group-xxx, it will be 13.
  • The partition number of your new consumer group name should not be 13. For example, if you use group-zzz as the new consumer group name, the partition number should be some other integer.
  • One can extract the list of offsets of the consumer group (group.id=group-xxx) from the log files of kafka-broker-1 and kafka-broker-2.

This is the Java program to find the partition number of __consumer_offsets:

Java
 
public class ConsumerOffsetsPartition {

    public static void main(String[] args) {

        int maxConsumerOffsetsTopicPartition = 50; // __consumer_offsets topic partition count: default 50

        String groupId = "group-xxx"; // corrupted consumer group

        // Kafka maps a group to an offsets partition from the hash of its group ID
        int consumerOffsetsPartitionNum = Math.abs(groupId.hashCode()) % maxConsumerOffsetsTopicPartition;

        System.out.println(consumerOffsetsPartitionNum);
    }
}
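With the default 50 partitions, any new group name whose computed partition number is not 13 avoids the corrupted partition; running the program with the candidate name before changing the client configuration confirms this.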


If one would like to reset the offsets for the new consumer group, use the following commands.

Shell
 
# reset to earliest

oc exec -it ${KAFKA_CLUSTER}-kafka-0 -c kafka -- env - bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-earliest --group ${CONSUMER_GROUP_NAME} --topic ${TOPIC_NAME} --execute



# reset to latest

oc exec -it ${KAFKA_CLUSTER}-kafka-0 -c kafka -- env - bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-latest --group ${CONSUMER_GROUP_NAME} --topic ${TOPIC_NAME} --execute



# reset to specific offset

oc exec -it ${KAFKA_CLUSTER}-kafka-0 -c kafka -- env - bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-offset <OFFSET_VALUE> --group ${CONSUMER_GROUP_NAME} --topic ${TOPIC_NAME}:<PARTITION_NUMBERS> --execute
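After the reset, it is worth confirming the group's committed offsets before restarting the consumers. A quick check, using the same placeholders as above:

Shell
 
# Show the current committed offsets and lag for the new consumer group
oc exec -it ${KAFKA_CLUSTER}-kafka-0 -c kafka -- env - bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group ${CONSUMER_GROUP_NAME}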


After changing the consumer group name in the above workaround, one should delete the __consumer_offsets-13 log files manually for cleanup.

Follow the steps described above for fixing the corrupted __consumer_offsets-13 on Kafka broker-0.
