教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

Kafka中的HW、LEO等分別代表什么?

更新時間:2023年10月19日11時37分 來源:傳智教育 瀏覽次數(shù):

好口碑IT培訓(xùn)

  在Apache Kafka中,HW(High Watermark)和 LEO(Log End Offset)是與分區(qū)的復(fù)制和消息傳遞相關(guān)的兩個關(guān)鍵概念。

  1.HW(High Watermark):

  High Watermark是一個分區(qū)的消息復(fù)制進度的指示器。它表示了已經(jīng)成功復(fù)制到所有副本的消息的位置。HW之前的所有消息都被認為是已提交的消息,這意味著消費者可以安全地消費這些消息。HW通常是消費者組維護的偏移量的參考點。

  2.LEO(Log End Offset):

  Log End Offset表示一個分區(qū)中消息日志的最后一個位置,即下一條消息要寫入的位置。LEO是動態(tài)變化的,因為消息不斷被追加到分區(qū)。它表示了分區(qū)中的最新消息位置。

  接下來筆者用一段具體的示例代碼,來演示下如何使用Java和Kafka Consumer API來獲取分區(qū)的HW和LEO:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaHWLEOExample {
    public static void main(String[] args) {
        // 設(shè)置Kafka消費者的配置
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 創(chuàng)建Kafka消費者
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 指定要訂閱的主題
        String topic = "my-topic";
        consumer.subscribe(Collections.singletonList(topic));

        // 獲取分區(qū)信息
        PartitionInfo partitionInfo = consumer.partitionsFor(topic).get(0);
        int partition = partitionInfo.partition();

        // 在消費者循環(huán)中獲取HW和LEO
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (TopicPartition topicPartition : records.partitions()) {
                long hw = consumer.position(topicPartition); // 獲取HW
                long leo = consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition); // 獲取LEO
                System.out.println("Partition " + topicPartition.partition() + ": HW = " + hw + ", LEO = " + leo);
            }
        }
    }
}

  上面的代碼創(chuàng)建了一個Kafka消費者,并訂閱了一個主題。在消費者循環(huán)中,我們使用position()方法來獲取分區(qū)的HW,并使用endOffsets()方法來獲取分區(qū)的LEO。這可以幫助我們監(jiān)視分區(qū)的消息復(fù)制進度和消息日志的結(jié)束位置。

0 分享到:
和我們在線交談!