Kafka 根據指定時間消費資料

Kafka 根據指定時間消費資料

1

背景

在kafka的實際應用過程中,由於資料處理問題,需要對kafka中的資料進行重新消費。重新消費資料一般都是使用一個新的groupId,但預設的配置是earliest(當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 ),latest (

當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料 )。

如果Kafka儲存的資料量較小,透過earliest方式影響也不會太大,但當資料量比較大時,最好是能進行部分資料消費處理,一是提升處理問題的速度,二是減少資源浪費。

本次先透過指定時間的方式來消費訊息。

2

實現

Kafka 整個訊息設計是非常的精妙,本案例是指定今天的凌晨為訊息的開始時間

KafkaConsumer consumer =newKafkaConsumer(props);

Stringtopic =“”;

//獲取topic的partition資訊

List partitionInfos = consumer。partitionsFor(topic);

List topicPartitions =newArrayList();

Map timestampsToSearch =newHashMap();

longfetchDataTime =LocalDate。now()。atStartOfDay(ZoneId。systemDefault())。toInstant()。toEpochMilli();

for(PartitionInfopartitionInfo : partitionInfos) {

topicPartitions。add(newTopicPartition(partitionInfo。topic(), partitionInfo。partition()));

timestampsToSearch。put(newTopicPartition(partitionInfo。topic(), partitionInfo。partition()), fetchDataTime);

}

consumer。assign(topicPartitions);

//獲取每個partition今天凌晨的偏移量

Map map = consumer。offsetsForTimes(timestampsToSearch);

OffsetAndTimestampoffsetTimestamp =null;

System。out。println(“開始設定各分割槽初始偏移量。。。”);

for(Map。Entry entry : map。entrySet()) {

//如果設定的查詢偏移量的時間點大於最大的索引記錄時間,那麼value就為空

offsetTimestamp = entry。getValue();

if(offsetTimestamp !=null) {

intpartition = entry。getKey()。partition();

longtimestamp = offsetTimestamp。timestamp();

longoffset = offsetTimestamp。offset();

System。out。println(“partition = ”+ partition +

“, time = ”+ df。format(newDate(timestamp)) +

“, offset = ”+ offset);

//設定讀取訊息的偏移量

consumer。seek(entry。getKey(), offset);

}

}

while(true) {

ConsumerRecords records = consumer。poll(100);

for(ConsumerRecord record : records) {

Stringvalue = record。value();

JSONObjectjsonObject =JSON。parseObject(value);

}

}

3

程式碼分析

Kafka 的消費位置是透過offset控制,實現方案是先根據時間獲取對應的offset位置,然後進行消費。

如果現在需要檢視某個分割槽,某個位置的資料應該如何實現呢?實際上實現方式與上面類似, 下篇文章在貼程式碼