1
背景
在kafka的實際應用過程中,由於資料處理問題,需要對kafka中的資料進行重新消費。重新消費資料一般都是使用一個新的groupId,但預設的配置是earliest(當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 ),latest (
當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料 )。
如果Kafka儲存的資料量較小,透過earliest方式影響也不會太大,但當資料量比較大時,最好是能進行部分資料消費處理,一是提升處理問題的速度,二是減少資源浪費。
本次先透過指定時間的方式來消費訊息。
2
實現
Kafka 整個訊息設計是非常的精妙,本案例是指定今天的凌晨為訊息的開始時間
KafkaConsumer consumer =newKafkaConsumer(props);
Stringtopic =“”;
//獲取topic的partition資訊
List partitionInfos = consumer。partitionsFor(topic);
List topicPartitions =newArrayList();
Map timestampsToSearch =newHashMap();
longfetchDataTime =LocalDate。now()。atStartOfDay(ZoneId。systemDefault())。toInstant()。toEpochMilli();
for(PartitionInfopartitionInfo : partitionInfos) {
topicPartitions。add(newTopicPartition(partitionInfo。topic(), partitionInfo。partition()));
timestampsToSearch。put(newTopicPartition(partitionInfo。topic(), partitionInfo。partition()), fetchDataTime);
}
consumer。assign(topicPartitions);
//獲取每個partition今天凌晨的偏移量
Map map = consumer。offsetsForTimes(timestampsToSearch);
OffsetAndTimestampoffsetTimestamp =null;
System。out。println(“開始設定各分割槽初始偏移量。。。”);
for(Map。Entry entry : map。entrySet()) {
//如果設定的查詢偏移量的時間點大於最大的索引記錄時間,那麼value就為空
offsetTimestamp = entry。getValue();
if(offsetTimestamp !=null) {
intpartition = entry。getKey()。partition();
longtimestamp = offsetTimestamp。timestamp();
longoffset = offsetTimestamp。offset();
System。out。println(“partition = ”+ partition +
“, time = ”+ df。format(newDate(timestamp)) +
“, offset = ”+ offset);
//設定讀取訊息的偏移量
consumer。seek(entry。getKey(), offset);
}
}
while(true) {
ConsumerRecords records = consumer。poll(100);
for(ConsumerRecord record : records) {
Stringvalue = record。value();
JSONObjectjsonObject =JSON。parseObject(value);
}
}
3
程式碼分析
Kafka 的消費位置是透過offset控制,實現方案是先根據時間獲取對應的offset位置,然後進行消費。
如果現在需要檢視某個分割槽,某個位置的資料應該如何實現呢?實際上實現方式與上面類似, 下篇文章在貼程式碼