2020-05-15 10:25發布
遇到這種問題,基本上是心跳或offset更新不及時導致。
在kafka環境中,有以下幾個參數對于數據重復有很好的效果。
auto.commit.interval.ms
consumer向zookeeper提交offset的頻率,單位是秒,默認60*1000
此值太大會導致數據重復消費,將其調小可避免重復數據。建議值100(毫秒)
max.poll.interval.ms
數據處理時間
max.poll.records
一次從kafka中poll出來的數據條數
max.poll.records條數據需要在max.poll.interval.ms這個時間內處理完
session.timeout.ms
zookeeper 會話的超時限制。如果consumer在這段時間內沒有向zookeeper發送心跳信息,則它會被認為掛掉了,并且reblance將會產生。默認6000
request.timeout.ms
broker盡力實現request.required.acks需求時的等待時間,否則會發送錯誤到客戶端,默認10000
request.timeout.ms值應大于session.timeout.ms的值
fetch.min.bytes
每次fetch請求時,server應該返回的最小字節數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。默認1
————————————————
版權聲明:本文為CSDN博主「handu940955668」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/handu940955668/java/article/details/85010773
丟包問題:消息推送服務,每天早上,手機上各終端都會給用戶推送消息,這時候流量劇增,可能會出現kafka發送數據過快,導致服務器網卡爆滿,或者磁盤處于繁忙狀態,可能會出現丟包現象。?解決方案:首先對kafka進行限速, 其次啟用重試機制,重試間隔時間設置長一些,最后Kafka設置acks=all,即需要相應的所有處于ISR的分區都確認收到該消息后,才算發送成功。?檢測方法:使用重放機制,查看問題所在。?kafka配置如下:
? ? ? ? ?props.put("compression.type", "gzip");? ? ? ? ?props.put("linger.ms", "50");? ? ? ? ?props.put("acks", "all");? ? ? ? ?props.put("retries ", 30);? ? ? ? ?props.put("reconnect.backoff.ms ", 20000);? ? ? ? ?props.put("retry.backoff.ms", 20000);
重發問題:當消費者重新分配partition的時候,可能出現從頭開始消費的情況,導致重發問題。當消費者消費的速度很慢的時候,可能在一個session周期內還未完成,導致心跳機制檢測報告出問題。?底層根本原因:已經消費了數據,但是offset沒提交。?配置問題:設置了offset自動提交?解決辦法:至少發一次+去重操作(冪等性)?問題場景:1.設置offset為自動提交,正在消費數據,kill消費者線程;2.設置offset為自動提交,關閉kafka時,如果在close之前,調用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會重復消費;3.消費kafka與業務邏輯在一個線程中處理,可能出現消費程序業務處理邏輯阻塞超時,導致一個周期內,offset還未提交;繼而重復消費,但是業務邏輯可能采用發送kafka或者其他無法回滾的方式;?重復消費最常見的原因:re-balance問題,通常會遇到消費的數據,處理很耗時,導致超過了Kafka的session timeout時間(0.10.x版本默認是30秒),那么就會re-balance重平衡,此時有一定幾率offset沒提交,會導致重平衡后重復消費。?去重問題:消息可以使用唯一id標識?保證不丟失消息:生產者(ack=all 代表至少成功發送一次)?消費者 (offset手動提交,業務邏輯成功處理后,提交offset)?保證不重復消費:落表(主鍵或者唯一索引的方式,避免重復數據)?業務邏輯處理(選擇唯一主鍵存儲到Redis或者mongdb中,先查詢是否存在,若存在則不處理;若不存在,先插入Redis或Mongdb,再進行業務邏輯處理)
問題根源在于Kafka的平衡機制,Kafka什么時候平衡我們無從知曉,而消費又是沒平衡好就開始消費了,所以解決也從這個角度來解決。和網友交流了下,了解到,新版本的API在平衡的時候可以注冊一個對象,在平衡前和后可以調用這個對象的方法,我們在這個方法里面將此topic的stream提交(這可能會造成數據丟失,因為這些數據很可能還沒處理),這個新API測試了下,基本沒什么問題。高級API如何解決?用類分布式鎖最終解決了這個問題,實現思路比較簡單,就是通過ZK來實現,程序啟動前先定義好需要啟動的消費者數量,如果還沒達到這個量,線程都不能啟動,達到這個線程數后,休眠幾秒后啟動,在啟動的時候,消費者線程已經得到了平衡,除非線程死掉否則不會發生平衡了,所以暫時解決了這個問題。
自己維護offset提交,任務正常執行再提交
flume和kafka的側重點不同,flume追求的是數據和數據源、數據流向的多樣性,適合多個生產者的場景;flume有自己內置的多種source和sink組件,具體操作方式是編寫source、channel和sink的.conf配置文件,開啟flume組件的時候用命令關聯讀取配置文件實現kafka追...
探究的是kafka的數據生產出來之后究竟落到了哪一個分區里面去了第一種分區策略:給定了分區號,直接將數據發送到指定的分區里面去第二種分區策略:沒有給定分區號,給定數據的key值,通過key取上hashCode進行分區第三種分區策略:既沒有給定分區號,也沒有給...
讀取數據的過程中,數據是屬于某一個topic的某一個partition對應的某一個segment文件中的某一條記錄。①定位到具體的segment日志文件②計算查找的offset在日志文件的相對偏移量
Spark Streaming的輸出一般是靠foreachRDD()算子來實現,它默認是at least once的。如果輸出過程中途出錯,那么就會重復執行直到寫入成功。為了讓它符合exactly once,可以施加兩種限制之一:冪等性寫入(idempotent write)、事務性寫入(transactional write...
消息丟失解決方案:????????首先對kafka進行限速, 其次啟用重試機制,重試間隔時間設置長一些,最后Kafka設置acks=all,即需要相應的所有處于ISR的分區都確認收到該消息后,才算發送成功。消息重復解決方案:????????消息可以使用唯一id標識?...
kafka數據推送失敗,如果是生產者推送kafka數據失敗,有可能是網絡端口問題,配置網絡ip的域名映射,防火墻問題可以做防火墻策略,還有配置的ip和端口是否正確,還要確定kafka是否啟動服務。kafka推送數據到其他系統,可以使用flume以及消費者來做...
數據一致性可以理解為數據前后是否一樣,比如說數據的丟失或者重復都回造成數據的不一致!kafka的0.11之后引入了冪等性可以解決單次會話、單個partition的數據不重復,生產者可以利用其0.11后引入的事務來保證其數據一致性,消費者的話其實事務是無法保證的,...
需要明白kafka的底層機制及工作原理,這里只簡要說明,詳細的參考kafka官網。kafka是將每一條寫入kafka的數據按分區分布存儲,將每條寫入的數據作一個offset標記,這個標記的順序是按插入數據自增的。當消費程序的時候,會按照分區區分,逐個根據offset順序消費。...
最多設置5個標簽!
遇到這種問題,基本上是心跳或offset更新不及時導致。
在kafka環境中,有以下幾個參數對于數據重復有很好的效果。
auto.commit.interval.ms
consumer向zookeeper提交offset的頻率,單位是秒,默認60*1000
此值太大會導致數據重復消費,將其調小可避免重復數據。建議值100(毫秒)
max.poll.interval.ms
數據處理時間
max.poll.records
一次從kafka中poll出來的數據條數
max.poll.records條數據需要在max.poll.interval.ms這個時間內處理完
session.timeout.ms
zookeeper 會話的超時限制。如果consumer在這段時間內沒有向zookeeper發送心跳信息,則它會被認為掛掉了,并且reblance將會產生。默認6000
request.timeout.ms
broker盡力實現request.required.acks需求時的等待時間,否則會發送錯誤到客戶端,默認10000
request.timeout.ms值應大于session.timeout.ms的值
fetch.min.bytes
每次fetch請求時,server應該返回的最小字節數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。默認1
————————————————
版權聲明:本文為CSDN博主「handu940955668」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/handu940955668/java/article/details/85010773
遇到這種問題,基本上是心跳或offset更新不及時導致。
在kafka環境中,有以下幾個參數對于數據重復有很好的效果。
auto.commit.interval.ms
consumer向zookeeper提交offset的頻率,單位是秒,默認60*1000
此值太大會導致數據重復消費,將其調小可避免重復數據。建議值100(毫秒)
max.poll.interval.ms
丟包問題:消息推送服務,每天早上,手機上各終端都會給用戶推送消息,這時候流量劇增,可能會出現kafka發送數據過快,導致服務器網卡爆滿,或者磁盤處于繁忙狀態,可能會出現丟包現象。?
解決方案:首先對kafka進行限速, 其次啟用重試機制,重試間隔時間設置長一些,最后Kafka設置acks=all,即需要相應的所有處于ISR的分區都確認收到該消息后,才算發送成功。?
檢測方法:使用重放機制,查看問題所在。?
kafka配置如下:
? ? ? ? ?props.put("compression.type", "gzip");
? ? ? ? ?props.put("linger.ms", "50");
? ? ? ? ?props.put("acks", "all");
? ? ? ? ?props.put("retries ", 30);
? ? ? ? ?props.put("reconnect.backoff.ms ", 20000);
? ? ? ? ?props.put("retry.backoff.ms", 20000);
重發問題:當消費者重新分配partition的時候,可能出現從頭開始消費的情況,導致重發問題。當消費者消費的速度很慢的時候,可能在一個session周期內還未完成,導致心跳機制檢測報告出問題。?
底層根本原因:已經消費了數據,但是offset沒提交。?
配置問題:設置了offset自動提交?
解決辦法:至少發一次+去重操作(冪等性)?
問題場景:1.設置offset為自動提交,正在消費數據,kill消費者線程;2.設置offset為自動提交,關閉kafka時,如果在close之前,調用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會重復消費;3.消費kafka與業務邏輯在一個線程中處理,可能出現消費程序業務處理邏輯阻塞超時,導致一個周期內,offset還未提交;繼而重復消費,但是業務邏輯可能采用發送kafka或者其他無法回滾的方式;?
重復消費最常見的原因:re-balance問題,通常會遇到消費的數據,處理很耗時,導致超過了Kafka的session timeout時間(0.10.x版本默認是30秒),那么就會re-balance重平衡,此時有一定幾率offset沒提交,會導致重平衡后重復消費。?
去重問題:消息可以使用唯一id標識?
保證不丟失消息:生產者(ack=all 代表至少成功發送一次)?
消費者 (offset手動提交,業務邏輯成功處理后,提交offset)?
保證不重復消費:落表(主鍵或者唯一索引的方式,避免重復數據)?
業務邏輯處理(選擇唯一主鍵存儲到Redis或者mongdb中,先查詢是否存在,若存在則不處理;若不存在,先插入Redis或Mongdb,再進行業務邏輯處理)
問題根源在于Kafka的平衡機制,Kafka什么時候平衡我們無從知曉,而消費又是沒平衡好就開始消費了,所以解決也從這個角度來解決。
和網友交流了下,了解到,新版本的API在平衡的時候可以注冊一個對象,在平衡前和后可以調用這個對象的方法,我們在這個方法里面將此topic的stream提交(這可能會造成數據丟失,因為這些數據很可能還沒處理),這個新API測試了下,基本沒什么問題。
高級API如何解決?用類分布式鎖最終解決了這個問題,實現思路比較簡單,就是通過ZK來實現,程序啟動前先定義好需要啟動的消費者數量,如果還沒達到這個量,線程都不能啟動,達到這個線程數后,休眠幾秒后啟動,在啟動的時候,消費者線程已經得到了平衡,除非線程死掉否則不會發生平衡了,所以暫時解決了這個問題。
自己維護offset提交,任務正常執行再提交
相關問題推薦
flume和kafka的側重點不同,flume追求的是數據和數據源、數據流向的多樣性,適合多個生產者的場景;flume有自己內置的多種source和sink組件,具體操作方式是編寫source、channel和sink的.conf配置文件,開啟flume組件的時候用命令關聯讀取配置文件實現kafka追...
探究的是kafka的數據生產出來之后究竟落到了哪一個分區里面去了第一種分區策略:給定了分區號,直接將數據發送到指定的分區里面去第二種分區策略:沒有給定分區號,給定數據的key值,通過key取上hashCode進行分區第三種分區策略:既沒有給定分區號,也沒有給...
讀取數據的過程中,數據是屬于某一個topic的某一個partition對應的某一個segment文件中的某一條記錄。①定位到具體的segment日志文件②計算查找的offset在日志文件的相對偏移量
Spark Streaming的輸出一般是靠foreachRDD()算子來實現,它默認是at least once的。如果輸出過程中途出錯,那么就會重復執行直到寫入成功。為了讓它符合exactly once,可以施加兩種限制之一:冪等性寫入(idempotent write)、事務性寫入(transactional write...
消息丟失解決方案:????????首先對kafka進行限速, 其次啟用重試機制,重試間隔時間設置長一些,最后Kafka設置acks=all,即需要相應的所有處于ISR的分區都確認收到該消息后,才算發送成功。消息重復解決方案:????????消息可以使用唯一id標識?...
kafka數據推送失敗,如果是生產者推送kafka數據失敗,有可能是網絡端口問題,配置網絡ip的域名映射,防火墻問題可以做防火墻策略,還有配置的ip和端口是否正確,還要確定kafka是否啟動服務。kafka推送數據到其他系統,可以使用flume以及消費者來做...
數據一致性可以理解為數據前后是否一樣,比如說數據的丟失或者重復都回造成數據的不一致!kafka的0.11之后引入了冪等性可以解決單次會話、單個partition的數據不重復,生產者可以利用其0.11后引入的事務來保證其數據一致性,消費者的話其實事務是無法保證的,...
需要明白kafka的底層機制及工作原理,這里只簡要說明,詳細的參考kafka官網。kafka是將每一條寫入kafka的數據按分區分布存儲,將每條寫入的數據作一個offset標記,這個標記的順序是按插入數據自增的。當消費程序的時候,會按照分區區分,逐個根據offset順序消費。...