女人自慰AV免费观看内涵网,日韩国产剧情在线观看网址,神马电影网特片网,最新一级电影欧美,在线观看亚洲欧美日韩,黄色视频在线播放免费观看,ABO涨奶期羡澄,第一导航fulione,美女主播操b

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

超詳細(xì)“零”基礎(chǔ)kafka入門篇

馬哥Linux運(yùn)維 ? 來(lái)源:馬哥Linux運(yùn)維 ? 2024-12-18 09:50 ? 次閱讀

1、認(rèn)識(shí)kafka

1.1 kafka簡(jiǎn)介

Kafka是一個(gè)分布式流媒體平臺(tái)

kafka官網(wǎng):http://kafka.apache.org/

(1)流媒體平臺(tái)有三個(gè)關(guān)鍵功能:

發(fā)布和訂閱記錄流,類似于消息隊(duì)列或企業(yè)消息傳遞系統(tǒng)。

容錯(cuò)的持久方式存儲(chǔ)記錄流

記錄發(fā)生時(shí)處理流。

(2)Kafka通常用于兩大類應(yīng)用:

構(gòu)建可在系統(tǒng)或應(yīng)用程序之間可靠獲取數(shù)據(jù)的實(shí)時(shí)流數(shù)據(jù)管道

構(gòu)建轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流的實(shí)時(shí)流應(yīng)用程序

要了解Kafka如何做這些事情,讓我們深入探討Kafka的能力。

(3)首先是幾個(gè)概念:

Kafka作為一個(gè)集群運(yùn)行在一個(gè)或多個(gè)可跨多個(gè)數(shù)據(jù)中心的服務(wù)器上。

Kafka集群以稱為topics主題的類別存儲(chǔ)記錄流。

每條記錄都包含一個(gè)鍵,一個(gè)值和一個(gè)時(shí)間戳

(4)Kafka有四個(gè)核心API:

Producer API(生產(chǎn)者API)允許應(yīng)用程序發(fā)布記錄流至一個(gè)或多個(gè)kafka的topics(主題)

Consumer API(消費(fèi)者API)允許應(yīng)用程序訂閱一個(gè)或多個(gè)topics(主題),并處理所產(chǎn)生的對(duì)他們記錄的數(shù)據(jù)流。

Streams API(流API)允許應(yīng)用程序充當(dāng)處理器,從一個(gè)或多個(gè)topics(主題)消耗的輸入流,并產(chǎn)生一個(gè)輸出流至一個(gè)或多個(gè)輸出的topics(主題),有效地變換所述輸入流,以輸出流

Connector API(連接器API)允許構(gòu)建和運(yùn)行kafkatopics(主題)連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)中重用生產(chǎn)者或消費(fèi)者。例如,關(guān)系數(shù)據(jù)庫(kù)的連接器可能捕獲對(duì)表的每個(gè)更改。

7a42e820-b9e4-11ef-8732-92fbcf53809c.png

在Kafka中,客戶端和服務(wù)器之間的通信是通過(guò)簡(jiǎn)單,高性能,語(yǔ)言無(wú)關(guān)的TCP協(xié)議完成的。此協(xié)議已版本化并保持與舊版本的向后兼容性。Kafka提供Java客戶端,但客戶端有多種語(yǔ)言版本。

1.2 Topics主題 和partitions分區(qū)

我們首先深入了解Kafka為記錄流提供的核心抽象-主題topics

一個(gè)Topic可以認(rèn)為是一類消息,每個(gè)topic將被分成多個(gè)partition(區(qū)),每個(gè)partition在存儲(chǔ)層面是append log文件

主題是發(fā)布記錄的類別或訂閱源名稱。Kafka的主題總是多用戶;也就是說(shuō),一個(gè)主題可以有零個(gè),一個(gè)或多個(gè)消費(fèi)者訂閱寫入它的數(shù)據(jù)。

對(duì)于每個(gè)主題,Kafka群集都維護(hù)一個(gè)如下所示的分區(qū)日志:

7a5888b0-b9e4-11ef-8732-92fbcf53809c.png

每個(gè)分區(qū)都是一個(gè)有序的,不可變的記錄序列,不斷附加到結(jié)構(gòu)化的提交日志中。分區(qū)中的記錄每個(gè)都分配了一個(gè)稱為偏移的順序ID號(hào),它唯一地標(biāo)識(shí)分區(qū)中的每個(gè)記錄。

Kafka集群持久保存所有已發(fā)布的記錄-無(wú)論是否已使用-使用可配置的保留期。例如,如果保留策略設(shè)置為兩天,則在發(fā)布記錄后的兩天內(nèi),它可供使用,之后將被丟棄以釋放空間。Kafka的性能在數(shù)據(jù)大小方面實(shí)際上是恒定的,因此長(zhǎng)時(shí)間存儲(chǔ)數(shù)據(jù)不是問(wèn)題。

7a742566-b9e4-11ef-8732-92fbcf53809c.png

實(shí)際上,基于每個(gè)消費(fèi)者保留的唯一元數(shù)據(jù)是該消費(fèi)者在日志中的偏移或位置。這種偏移由消費(fèi)者控制:通常消費(fèi)者在讀取記錄時(shí)會(huì)線性地提高其偏移量,但事實(shí)上,由于該位置由消費(fèi)者控制,因此它可以按照自己喜歡的任何順序消費(fèi)記錄。例如,消費(fèi)者可以重置為較舊的偏移量來(lái)重新處理過(guò)去的數(shù)據(jù),或者跳到最近的記錄并從“現(xiàn)在”開(kāi)始消費(fèi)。

這些功能組合意味著Kafka消費(fèi)者consumers非常cheap -他們可以來(lái)來(lái)往往對(duì)集群或其他消費(fèi)者沒(méi)有太大影響。例如,您可以使用我們的命令行工具“tail”任何主題的內(nèi)容,而無(wú)需更改任何現(xiàn)有使用者所消耗的內(nèi)容。

日志中的分區(qū)有多種用途。首先,它們?cè)试S日志擴(kuò)展到超出適合單個(gè)服務(wù)器的大小。每個(gè)單獨(dú)的分區(qū)必須適合托管它的服務(wù)器,但主題可能有許多分區(qū),因此它可以處理任意數(shù)量的數(shù)據(jù)。其次,它們充當(dāng)了并行性的單位-更多的是它。

1.3 Distribution分配

一個(gè)Topic的多個(gè)partitions,被分布在kafka集群中的多個(gè)server上;每個(gè)server(kafka實(shí)例)負(fù)責(zé)partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個(gè)數(shù)(replicas),每個(gè)partition將會(huì)被備份到多臺(tái)機(jī)器上,以提高可用性.

基于replicated方案,那么就意味著需要對(duì)多個(gè)備份進(jìn)行調(diào)度;每個(gè)partition都有一個(gè)server為"leader";leader負(fù)責(zé)所有的讀寫操作,如果leader失效,那么將會(huì)有其他follower來(lái)接管(成為新的leader);follower只是單調(diào)的和leader跟進(jìn),同步消息即可..由此可見(jiàn)作為leader的server承載了全部的請(qǐng)求壓力,因此從集群的整體考慮,有多少個(gè)partitions就意味著有多少個(gè)"leader",kafka會(huì)將"leader"均衡的分散在每個(gè)實(shí)例上,來(lái)確保整體的性能穩(wěn)定。

1.4 Producers生產(chǎn)者 和Consumers消費(fèi)者

1.4.1 Producers生產(chǎn)者

Producers將數(shù)據(jù)發(fā)布到指定的topics主題。同時(shí)Producer也能決定將此消息歸屬于哪個(gè)partition;比如基于"round-robin"方式或者通過(guò)其他的一些算法等。

1.4.2Consumers

本質(zhì)上kafka只支持Topic.每個(gè)consumer屬于一個(gè)consumer group;反過(guò)來(lái)說(shuō),每個(gè)group中可以有多個(gè)consumer.發(fā)送到Topic的消息,只會(huì)被訂閱此Topic的每個(gè)group中的一個(gè)consumer消費(fèi)

如果所有使用者實(shí)例具有相同的使用者組,則記錄將有效地在使用者實(shí)例上進(jìn)行負(fù)載平衡

如果所有消費(fèi)者實(shí)例具有不同的消費(fèi)者組,則每個(gè)記錄將廣播到所有消費(fèi)者進(jìn)程

7a86093e-b9e4-11ef-8732-92fbcf53809c.png

分析:兩個(gè)服務(wù)器Kafka群集,托管四個(gè)分區(qū)(P0-P3),包含兩個(gè)使用者組。消費(fèi)者組A有兩個(gè)消費(fèi)者實(shí)例,B組有四個(gè)消費(fèi)者實(shí)例。

在Kafka中實(shí)現(xiàn)消費(fèi)consumption的方式是通過(guò)在消費(fèi)者實(shí)例上劃分日志中的分區(qū),以便每個(gè)實(shí)例在任何時(shí)間點(diǎn)都是分配的“公平份額”的獨(dú)占消費(fèi)者。維護(hù)組中成員資格的過(guò)程由Kafka協(xié)議動(dòng)態(tài)處理。如果新實(shí)例加入該組,他們將從該組的其他成員接管一些分區(qū);如果實(shí)例死亡,其分區(qū)將分發(fā)給其余實(shí)例。

Kafka僅提供分區(qū)內(nèi)記錄的總訂單,而不是主題中不同分區(qū)之間的記錄。對(duì)于大多數(shù)應(yīng)用程序而言,按分區(qū)排序與按鍵分區(qū)數(shù)據(jù)的能力相結(jié)合就足夠了。但是,如果您需要對(duì)記錄進(jìn)行總訂單,則可以使用僅包含一個(gè)分區(qū)的主題來(lái)實(shí)現(xiàn),但這將意味著每個(gè)使用者組只有一個(gè)使用者進(jìn)程。

1.5 Consumers kafka確保

發(fā)送到partitions中的消息將會(huì)按照它接收的順序追加到日志中。也就是說(shuō),如果記錄M1由與記錄M2相同的生成者發(fā)送,并且首先發(fā)送M1,則M1將具有比M2更低的偏移并且在日志中更早出現(xiàn)。

消費(fèi)者實(shí)例按照它們存儲(chǔ)在日志中的順序查看記錄。對(duì)于消費(fèi)者而言,它們消費(fèi)消息的順序和日志中消息順序一致

如果Topic的"replicationfactor"為N,那么允許N-1個(gè)kafka實(shí)例失效,我們將容忍最多N-1個(gè)服務(wù)器故障,而不會(huì)丟失任何提交到日志的記錄。

1.6 kafka作為消息系統(tǒng)

Kafka的流概念與傳統(tǒng)的企業(yè)郵件系統(tǒng)相比如何?

(1)傳統(tǒng)消息系統(tǒng)

消息傳統(tǒng)上有兩種模型:queuing排隊(duì)and publish-subscribe發(fā)布-訂閱。在隊(duì)列中,消費(fèi)者池可以從服務(wù)器讀取并且每個(gè)記錄轉(zhuǎn)到其中一個(gè);在發(fā)布-訂閱中,記錄被廣播給所有消費(fèi)者。這兩種模型中的每一種都有優(yōu)點(diǎn)和缺點(diǎn)。排隊(duì)的優(yōu)勢(shì)在于它允許您在多個(gè)消費(fèi)者實(shí)例上劃分?jǐn)?shù)據(jù)處理,從而可以擴(kuò)展您的處理。不幸的是,一旦一個(gè)進(jìn)程讀取它已經(jīng)消失的數(shù)據(jù),隊(duì)列就不是多用戶。發(fā)布-訂閱允許您將數(shù)據(jù)廣播到多個(gè)進(jìn)程,但由于每條消息都發(fā)送給每個(gè)訂閱者,因此無(wú)法進(jìn)行擴(kuò)展處理。

卡夫卡的消費(fèi)者群體概念概括了這兩個(gè)概念。與隊(duì)列一樣,使用者組允許您將處理劃分為一組進(jìn)程(使用者組的成員)。與發(fā)布-訂閱一樣,Kafka允許您向多個(gè)消費(fèi)者組廣播消息。

(2)kafka的優(yōu)勢(shì)

Kafka模型的優(yōu)勢(shì)在于每個(gè)主題都具有這些屬性-它可以擴(kuò)展處理并且也是多用戶-不需要選擇其中一個(gè)。

與傳統(tǒng)的消息系統(tǒng)相比,Kafka具有更強(qiáng)的訂購(gòu)保證

傳統(tǒng)隊(duì)列在服務(wù)器上按順序保留記錄,如果多個(gè)消費(fèi)者從隊(duì)列中消耗,則服務(wù)器按照存儲(chǔ)順序分發(fā)記錄。但是,雖然服務(wù)器按順序分發(fā)記錄,但是記錄是異步傳遞給消費(fèi)者的,因此它們可能會(huì)在不同的消費(fèi)者處出現(xiàn)故障。這實(shí)際上意味著在存在并行消耗的情況下丟失記錄的順序。消息傳遞系統(tǒng)通常通過(guò)具有“獨(dú)占消費(fèi)者”概念來(lái)解決這個(gè)問(wèn)題,該概念只允許一個(gè)進(jìn)程從隊(duì)列中消耗,但當(dāng)然這意味著處理中沒(méi)有并行性。

kafka做得更好。通過(guò)在主題中具有并行性概念-分區(qū)-,Kafka能夠在消費(fèi)者流程池中提供訂購(gòu)保證和負(fù)載平衡。這是通過(guò)將主題中的分區(qū)分配給使用者組中的使用者來(lái)實(shí)現(xiàn)的,以便每個(gè)分區(qū)僅由該組中的一個(gè)使用者使用。通過(guò)這樣做,我們確保使用者是該分區(qū)的唯一讀者并按順序使用數(shù)據(jù)。由于有許多分區(qū),這仍然可以平衡許多消費(fèi)者實(shí)例的負(fù)載。但請(qǐng)注意,消費(fèi)者組中的消費(fèi)者實(shí)例不能超過(guò)分區(qū)。

1.7 kafka作為存儲(chǔ)系統(tǒng)

任何允許發(fā)布與消費(fèi)消息分離的消息的消息隊(duì)列實(shí)際上充當(dāng)了正在進(jìn)行的消息的存儲(chǔ)系統(tǒng)。Kafka的不同之處在于它是一個(gè)非常好的存儲(chǔ)系統(tǒng)

寫入Kafka的數(shù)據(jù)將寫入磁盤并進(jìn)行復(fù)制以實(shí)現(xiàn)容錯(cuò)。Kafka允許生產(chǎn)者等待確認(rèn),以便在完全復(fù)制之前寫入不被認(rèn)為是完整的,并且即使寫入的服務(wù)器失敗也保證寫入仍然存在。

磁盤結(jié)構(gòu)Kafka很好地使用了規(guī)模-無(wú)論服務(wù)器上有50 KB還是50 TB的持久數(shù)據(jù),Kafka都會(huì)執(zhí)行相同的操作。

由于認(rèn)真對(duì)待存儲(chǔ)并允許客戶端控制其讀取位置,您可以將Kafka視為一種專用于高性能,低延遲提交日志存儲(chǔ),復(fù)制和傳播的專用分布式文件系統(tǒng)

有關(guān)Kafka的提交日志存儲(chǔ)和復(fù)制設(shè)計(jì)的詳細(xì)信息,請(qǐng)閱讀此頁(yè)面。

1.8 kafka用于流處理

僅僅讀取,寫入和存儲(chǔ)數(shù)據(jù)流是不夠的,目的是實(shí)現(xiàn)流的實(shí)時(shí)處理

在Kafka中,流處理器是指從輸入主題獲取連續(xù)數(shù)據(jù)流,對(duì)此輸入執(zhí)行某些處理以及生成連續(xù)數(shù)據(jù)流以輸出主題的任何內(nèi)容

例如,零售應(yīng)用程序可能會(huì)接收銷售和發(fā)貨的輸入流,并輸出重新排序流和根據(jù)此數(shù)據(jù)計(jì)算的價(jià)格調(diào)整。

可以使用生產(chǎn)者和消費(fèi)者API直接進(jìn)行簡(jiǎn)單處理。但是,對(duì)于更復(fù)雜的轉(zhuǎn)換,Kafka提供了完全集成的Streams API。這允許構(gòu)建執(zhí)行非平凡處理的應(yīng)用程序,這些應(yīng)用程序可以計(jì)算流的聚合或?qū)⒘鬟B接在一起。

此工具有助于解決此類應(yīng)用程序面臨的難題:處理無(wú)序數(shù)據(jù),在代碼更改時(shí)重新處理輸入,執(zhí)行有狀態(tài)計(jì)算等。

流API構(gòu)建在Kafka提供的核心原語(yǔ)上:它使用生產(chǎn)者和消費(fèi)者API進(jìn)行輸入,使用Kafka進(jìn)行有狀態(tài)存儲(chǔ),并在流處理器實(shí)例之間使用相同的組機(jī)制來(lái)實(shí)現(xiàn)容錯(cuò)。

2、kafka使用場(chǎng)景

2.1消息Messaging

Kafka可以替代更傳統(tǒng)的消息代理。消息代理的使用有多種原因(將處理與數(shù)據(jù)生成器分離,緩沖未處理的消息等)。與大多數(shù)消息傳遞系統(tǒng)相比,Kafka具有更好的吞吐量,內(nèi)置分區(qū),復(fù)制和容錯(cuò)功能,這使其成為大規(guī)模消息處理應(yīng)用程序的理想解決方案。

根據(jù)經(jīng)驗(yàn),消息傳遞的使用通常相對(duì)較低,但可能需要較低的端到端延遲,并且通常取決于Kafka提供的強(qiáng)大的耐用性保證。

在這個(gè)領(lǐng)域,Kafka可與傳統(tǒng)的消息傳遞系統(tǒng)(如ActiveMQ或RabbitMQ)相媲美。

2.2網(wǎng)站活動(dòng)跟蹤

Kafka的原始用例是能夠?qū)⒂脩艋顒?dòng)跟蹤管道重建為一組實(shí)時(shí)發(fā)布-訂閱源。這意味著站點(diǎn)活動(dòng)(頁(yè)面查看,搜索或用戶可能采取的其他操作)將發(fā)布到中心主題,每個(gè)活動(dòng)類型包含一個(gè)主題。這些源可用于訂購(gòu)一系列用例,包括實(shí)時(shí)處理,實(shí)時(shí)監(jiān)控以及加載到Hadoop或離線數(shù)據(jù)倉(cāng)庫(kù)系統(tǒng)以進(jìn)行脫機(jī)處理和報(bào)告

活動(dòng)跟蹤通常非常高,因?yàn)闉槊總€(gè)用戶頁(yè)面視圖生成了許多活動(dòng)消息。

2.3度量Metrics

Kafka通常用于運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)。這涉及從分布式應(yīng)用程序聚合統(tǒng)計(jì)信息以生成操作數(shù)據(jù)的集中式提要。

2.4日志聚合

許多人使用Kafka作為日志聚合解決方案的替代品。日志聚合通常從服務(wù)器收集物理日志文件,并將它們放在中央位置(可能是文件服務(wù)器或HDFS)進(jìn)行處理。Kafka抽象出文件的細(xì)節(jié),并將日志或事件數(shù)據(jù)作為消息流更清晰地抽象出來(lái)。這允許更低延遲的處理并更容易支持多個(gè)數(shù)據(jù)源和分布式數(shù)據(jù)消耗。與Scribe或Flume等以日志為中心的系統(tǒng)相比,Kafka提供了同樣出色的性能,由于復(fù)制而具有更強(qiáng)的耐用性保證,以及更低的端到端延遲。

2.5流處理

許多Kafka用戶在處理由多個(gè)階段組成的管道時(shí)處理數(shù)據(jù),其中原始輸入數(shù)據(jù)從Kafka主題中消費(fèi),然后聚合,豐富或以其他方式轉(zhuǎn)換為新主題以供進(jìn)一步消費(fèi)或后續(xù)處理

例如,用于推薦新聞文章的處理管道可以從RSS訂閱源抓取文章內(nèi)容并將其發(fā)布到“文章”主題;進(jìn)一步處理可能會(huì)對(duì)此內(nèi)容進(jìn)行規(guī)范化或重復(fù)數(shù)據(jù)刪除,并將已清理的文章內(nèi)容發(fā)布到新主題;最終處理階段可能會(huì)嘗試向用戶推薦此內(nèi)容。此類處理管道基于各個(gè)主題創(chuàng)建實(shí)時(shí)數(shù)據(jù)流的圖形。從0.10.0.0開(kāi)始,這是一個(gè)輕量級(jí)但功能強(qiáng)大的流處理庫(kù),名為Kafka Streams在Apache Kafka中可用于執(zhí)行如上所述的此類數(shù)據(jù)處理。除了Kafka Streams之外,其他開(kāi)源流處理工具包括Apache Storm和Apache Samza。

2.6 Event Sourcing

Event Sourcing是一種應(yīng)用程序設(shè)計(jì)風(fēng)格,其中狀態(tài)更改記錄為按時(shí)間排序的記錄序列。Kafka對(duì)非常大的存儲(chǔ)日志數(shù)據(jù)的支持使其成為以這種風(fēng)格構(gòu)建的應(yīng)用程序的出色后端。

2.7提交日志

Kafka可以作為分布式系統(tǒng)的一種外部提交日志。該日志有助于在節(jié)點(diǎn)之間復(fù)制數(shù)據(jù),并充當(dāng)故障節(jié)點(diǎn)恢復(fù)其數(shù)據(jù)的重新同步機(jī)制。Kafka中的日志壓縮功能有助于支持此用法。在這種用法中,Kafka類似于Apache BookKeeper項(xiàng)目。

3、kafka安裝

3.1下載安裝

到官網(wǎng)http://kafka.apache.org/downloads.html下載想要的版本;我這里下載的最新穩(wěn)定版2.1.0

注:由于Kafka控制臺(tái)腳本對(duì)于基于Unix和Windows的平臺(tái)是不同的,因此在Windows平臺(tái)上使用binwindows而不是bin/將腳本擴(kuò)展名更改為.bat。

[root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
[root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz
[root@along ~]# cd /data/kafka_2.11-2.1.0/

3.2配置啟動(dòng)zookeeper

kafka正常運(yùn)行,必須配置zookeeper,否則無(wú)論是kafka集群還是客戶端的生存者和消費(fèi)者都無(wú)法正常的工作的;所以需要配置啟動(dòng)zookeeper服務(wù)。

(1)zookeeper需要java環(huán)境

1 [root@along ~]# yum -y install java-1.8.0

(2)這里kafka下載包已經(jīng)包括zookeeper服務(wù),所以只需修改配置文件,啟動(dòng)即可。

如果需要下載指定zookeeper版本;可以單獨(dú)去zookeeper官網(wǎng)http://mirrors.shu.edu.cn/apache/zookeeper/下載指定版本。

[root@along ~]# cd /data/kafka_2.11-2.1.0/
[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/zookeeper.properties
dataDir=/tmp/zookeeper   #數(shù)據(jù)存儲(chǔ)目錄
clientPort=2181   #zookeeper端口
maxClientCnxns=0

注:可自行添加修改zookeeper配置

3.3配置kafka

(1)修改配置文件

[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

注:可根據(jù)自己需求修改配置文件

broker.id:唯一標(biāo)識(shí)ID

listeners=PLAINTEXT://localhost:9092:kafka服務(wù)監(jiān)聽(tīng)地址和端口

log.dirs:日志存儲(chǔ)目錄

zookeeper.connect:指定zookeeper服務(wù)

(2)配置環(huán)境變量

[root@along ~]# vim /etc/profile.d/kafka.sh
export KAFKA_HOME="/data/kafka_2.11-2.1.0"
export PATH="${KAFKA_HOME}/bin:$PATH"
[root@along ~]# source /etc/profile.d/kafka.sh

(3)配置服務(wù)啟動(dòng)腳本

[root@along ~]# vim /etc/init.d/kafka
#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#
 
source /etc/rc.d/init.d/functions
 
KAFKA_HOME=/data/kafka_2.11-2.1.0
KAFKA_USER=root
export LOG_DIR=/tmp/kafka-logs
 
[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka
 
# See how we were called.
case "$1" in
 
  start)
    echo -n "Starting Kafka:"
    /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"
    echo " done."
    exit 0
    ;;
 
  stop)
    echo -n "Stopping Kafka: "
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}' | xargs kill -9"
    echo " done."
    exit 0
    ;;
  hardstop)
    echo -n "Stopping (hard) Kafka: "
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}' | xargs kill -9"
    echo " done."
    exit 0
    ;;
 
  status)
    c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
    if [ "$c_pid" = "" ] ; then
      echo "Stopped"
      exit 3
    else
      echo "Running $c_pid"
      exit 0
    fi
    ;;
 
  restart)
    stop
    start
    ;;
 
  *)
    echo "Usage: kafka {start|stop|hardstop|status|restart}"
    exit 1
    ;;
 
esac

3.4啟動(dòng)kafka服務(wù)

(1)后臺(tái)啟動(dòng)zookeeper服務(wù)

[root@along~]#nohupzookeeper-server-start.sh/data/kafka_2.11-2.1.0/config/zookeeper.properties&

(2)啟動(dòng)kafka服務(wù)

[root@along ~]# service kafka start
Starting kafka (via systemctl):                            [  OK  ]
[root@along ~]# service kafka status
Running 86018
[root@along ~]# ss -nutl
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                              
tcp   LISTEN     0      50                    :::9092                              :::*                 
tcpLISTEN050:::2181:::*

4、kafka使用簡(jiǎn)單入門

4.1創(chuàng)建主題topics

創(chuàng)建一個(gè)名為“along”的主題,它只包含一個(gè)分區(qū),只有一個(gè)副本:

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along
Created topic "along".

如果我們運(yùn)行l(wèi)ist topic命令,我們現(xiàn)在可以看到該主題:

[root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181
along

4.2發(fā)送一些消息

Kafka附帶一個(gè)命令行客戶端,它將從文件或標(biāo)準(zhǔn)輸入中獲取輸入,并將其作為消息發(fā)送到Kafka集群。默認(rèn)情況下,每行將作為單獨(dú)的消息發(fā)送。

運(yùn)行生產(chǎn)者,然后在控制臺(tái)中鍵入一些消息以發(fā)送到服務(wù)器。

1
2
3
[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along
>This is a message
>This is another message

4.3啟動(dòng)消費(fèi)者

Kafka還有一個(gè)命令行使用者,它會(huì)將消息轉(zhuǎn)儲(chǔ)到標(biāo)準(zhǔn)輸出。

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along
>This is a message
>This is another message

所有命令行工具都有其他選項(xiàng);運(yùn)行不帶參數(shù)的命令將顯示更詳細(xì)地記錄它們的使用信息。

5、設(shè)置多代理kafka群集

到目前為止,我們一直在與一個(gè)broker運(yùn)行,但這并不好玩。對(duì)于Kafka,單個(gè)代理只是一個(gè)大小為1的集群,因此除了啟動(dòng)一些代理實(shí)例之外沒(méi)有太多變化。但是為了感受它,讓我們將我們的集群擴(kuò)展到三個(gè)節(jié)點(diǎn)(仍然在我們的本地機(jī)器上)。

5.1準(zhǔn)備配置文件

[root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties
[root@along kafka_2.11-2.1.0]# vim config/server-1.properties
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
[root@along kafka_2.11-2.1.0]# vim config/server-2.properties
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

注:該broker.id屬性是群集中每個(gè)節(jié)點(diǎn)的唯一且永久的名稱。我們必須覆蓋端口和日志目錄,因?yàn)槲覀冊(cè)谕慌_(tái)機(jī)器上運(yùn)行這些,并且我們希望讓所有代理嘗試在同一端口上注冊(cè)或覆蓋彼此的數(shù)據(jù)。

5.2開(kāi)啟集群另2個(gè)kafka服務(wù)

[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &
[root@along ~]# ss -nutl
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                          
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*                 
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*                                
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9094                              :::*

5.3在集群中進(jìn)行操作

(1)現(xiàn)在創(chuàng)建一個(gè)復(fù)制因子為3的新主題my-replicated-topic

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

(2)在一個(gè)集群中,運(yùn)行“describe topics”命令查看哪個(gè)broker正在做什么

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1

注釋:第一行給出了所有分區(qū)的摘要,每個(gè)附加行提供有關(guān)一個(gè)分區(qū)的信息。由于我們只有一個(gè)分區(qū)用于此主題,因此只有一行。

“l(fā)eader”是負(fù)責(zé)給定分區(qū)的所有讀取和寫入的節(jié)點(diǎn)。每個(gè)節(jié)點(diǎn)將成為隨機(jī)選擇的分區(qū)部分的領(lǐng)導(dǎo)者。

“replicas”是復(fù)制此分區(qū)日志的節(jié)點(diǎn)列表,無(wú)論它們是否為領(lǐng)導(dǎo)者,或者即使它們當(dāng)前處于活動(dòng)狀態(tài)。

“isr”是“同步”復(fù)制品的集合。這是副本列表的子集,該列表當(dāng)前處于活躍狀態(tài)并且已經(jīng)被領(lǐng)導(dǎo)者捕獲。

請(qǐng)注意,Leader: 2,在我的示例中,節(jié)點(diǎn)2是該主題的唯一分區(qū)的Leader。

(3)可以在我們創(chuàng)建的原始主題上運(yùn)行相同的命令,以查看它的位置

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along
Topic:along PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: along    Partition: 0    Leader: 0   Replicas: 0 Isr: 0

(4)向我們的新主題發(fā)布一些消息:

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2
>^C

(5)現(xiàn)在讓我們使用這些消息:

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

5.4測(cè)試集群的容錯(cuò)性

(1)現(xiàn)在讓我們測(cè)試一下容錯(cuò)性。Broker 2充當(dāng)leader所以讓我們殺了它:

[root@along ~]# ps aux | grep server-2.properties |awk '{print $2}'
106737
[root@along ~]# kill -9 106737
[root@along ~]# ss -nutl
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*                       
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*

(2)leader已切換到其中一個(gè)從屬節(jié)點(diǎn),節(jié)點(diǎn)2不再位于同步副本集中:

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 0   Replicas: 2,0,1 Isr: 0,1

(3)即使最初接受寫入的leader已經(jīng)失敗,這些消息仍可供消費(fèi):

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

6、使用Kafka Connect導(dǎo)入/導(dǎo)出數(shù)據(jù)

從控制臺(tái)寫入數(shù)據(jù)并將其寫回控制臺(tái)是一個(gè)方便的起點(diǎn),但有時(shí)候可能希望使用其他來(lái)源的數(shù)據(jù)或?qū)?shù)據(jù)從Kafka導(dǎo)出到其他系統(tǒng)。對(duì)于許多系統(tǒng),您可以使用Kafka Connect導(dǎo)入或?qū)С鰯?shù)據(jù),而不是編寫自定義集成代碼。

Kafka Connect是Kafka附帶的工具,用于向Kafka導(dǎo)入和導(dǎo)出數(shù)據(jù)。它是一個(gè)可擴(kuò)展的工具,運(yùn)行連接器,實(shí)現(xiàn)與外部系統(tǒng)交互的自定義邏輯。在本快速入門中,我們將了解如何使用簡(jiǎn)單的連接器運(yùn)行Kafka Connect,這些連接器將數(shù)據(jù)從文件導(dǎo)入Kafka主題并將數(shù)據(jù)從Kafka主題導(dǎo)出到文件。

(1)首先創(chuàng)建一些種子數(shù)據(jù)進(jìn)行測(cè)試:

1 [root@along ~]# echo -e "foo bar" > test.txt

或者在Windows上:

> echo foo> test.txt
> echo bar>> test.txt

(2)接下來(lái),啟動(dòng)兩個(gè)以獨(dú)立模式運(yùn)行的連接器,這意味著它們?cè)趩蝹€(gè)本地專用進(jìn)程中運(yùn)行。提供三個(gè)配置文件作為參數(shù)。

第一個(gè)始終是Kafka Connect流程的配置,包含常見(jiàn)配置,例如要連接的Kafka代理和數(shù)據(jù)的序列化格式。

其余配置文件均指定要?jiǎng)?chuàng)建的連接器。這些文件包括唯一的連接器名稱,要實(shí)例化的連接器類以及連接器所需的任何其他配置。

[root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2023-01-161631,884]INFOKafkaConnectstandaloneworkerinitializing...(org.apache.kafka.connect.cli.ConnectStandalone:67)
[2023-01-161631,903]INFOWorkerInfovalues:
... ...

注:Kafka附帶的這些示例配置文件使用您之前啟動(dòng)的默認(rèn)本地群集配置并創(chuàng)建兩個(gè)連接器:第一個(gè)是源連接器,它從輸入文件讀取行并生成每個(gè)Kafka主題,第二個(gè)是宿連接器從Kafka主題讀取消息并將每個(gè)消息生成為輸出文件中的一行。

(3)驗(yàn)證是否導(dǎo)入成功(另起終端)

在啟動(dòng)過(guò)程中,您將看到許多日志消息,包括一些指示正在實(shí)例化連接器的日志消息。

①一旦Kafka Connect進(jìn)程啟動(dòng),源連接器應(yīng)該開(kāi)始從test.txt主題讀取行并將其生成到主題connect-test,并且接收器連接器應(yīng)該開(kāi)始從主題讀取消息connect-test并將它們寫入文件test.sink.txt。我們可以通過(guò)檢查輸出文件的內(nèi)容來(lái)驗(yàn)證數(shù)據(jù)是否已通過(guò)整個(gè)管道傳遞:

[root@along ~]# cat test.sink.txt


foo


bar

②請(qǐng)注意,數(shù)據(jù)存儲(chǔ)在Kafka主題中connect-test,因此我們還可以運(yùn)行控制臺(tái)使用者來(lái)查看主題中的數(shù)據(jù)(或使用自定義使用者代碼來(lái)處理它):

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

(4)繼續(xù)追加數(shù)據(jù),驗(yàn)證

[root@along ~]# echo Another line>> test.txt     
[root@along ~]# cat test.sink.txt
foo
bar
Another line
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}


{"schema":{"type":"string","optional":false},"payload":"Another line"}

鏈接:https://www.cnblogs.com/along21/p/10278100.html

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 流媒體
    +關(guān)注

    關(guān)注

    1

    文章

    198

    瀏覽量

    16880
  • kafka
    +關(guān)注

    關(guān)注

    0

    文章

    53

    瀏覽量

    5356

原文標(biāo)題:超詳細(xì)“零”基礎(chǔ)kafka入門篇

文章出處:【微信號(hào):magedu-Linux,微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦
    熱點(diǎn)推薦

    死角玩轉(zhuǎn)STM32——系統(tǒng)

    本文共4冊(cè),由于資料內(nèi)存過(guò)大,分開(kāi)上傳,有需要的朋友可以去主頁(yè)搜索下載哦~ 《死角玩轉(zhuǎn) STM32》系列教程由初級(jí)、中級(jí)、高級(jí)、系統(tǒng)
    發(fā)表于 05-21 14:08

    死角玩轉(zhuǎn)STM32——高級(jí)

    本文共4冊(cè),由于資料內(nèi)存過(guò)大,分開(kāi)上傳,有需要的朋友可以去主頁(yè)搜索下載哦~ 《死角玩轉(zhuǎn) STM32》系列教程由初級(jí)、中級(jí)、高級(jí)、系統(tǒng)
    發(fā)表于 05-21 14:02

    死角玩轉(zhuǎn)STM32——中級(jí)

    本文共4冊(cè),由于資料內(nèi)存過(guò)大,分開(kāi)上傳,有需要的朋友可以去主頁(yè)搜索下載哦~ 《死角玩轉(zhuǎn) STM32》系列教程由初級(jí)、中級(jí)、高級(jí)、系統(tǒng)
    發(fā)表于 05-21 13:56

    死角玩轉(zhuǎn)STM32——初級(jí)

    本文共4冊(cè),由于資料內(nèi)存過(guò)大,分開(kāi)上傳,有需要的朋友可以去主頁(yè)搜索下載哦~ 《死角玩轉(zhuǎn) STM32》系列教程由初級(jí)、中級(jí)、高級(jí)、系統(tǒng)
    發(fā)表于 05-21 13:48

    電子工程師自學(xué)速成 —— 提高

    本文共3冊(cè),由于資料內(nèi)存過(guò)大,分開(kāi)上傳,有需要的朋友可以去主頁(yè)搜索下載哦~ 電子工程師自學(xué)速成分為:入門篇、提高和設(shè)計(jì),本文為提高;內(nèi)容包括模擬電路和數(shù)字電路兩大部分,其中模擬
    發(fā)表于 05-15 15:56

    電子工程師自學(xué)速成——入門篇

    本文共3冊(cè),由于資料內(nèi)存過(guò)大,分開(kāi)上傳,有需要的朋友可以去主頁(yè)搜索下載哦~ 電子工程師自學(xué)速成分為:入門篇、提高和設(shè)計(jì),本文為入門篇,內(nèi)容包括電子技術(shù)
    發(fā)表于 05-15 15:50

    【「基礎(chǔ)開(kāi)發(fā)AI Agent」閱讀體驗(yàn)】+ 入門篇學(xué)習(xí)

    的是基礎(chǔ),主要從為什么要學(xué)習(xí)AI Agent和開(kāi)發(fā)AI Agent的知識(shí)儲(chǔ)備入手進(jìn)行介紹。作為入門AI Agent的小白還是很有必要學(xué)習(xí)的。這里將一些重要觀點(diǎn)作個(gè)歸納 1.AI Agent=大模型+記憶
    發(fā)表于 05-02 09:26

    【「基礎(chǔ)開(kāi)發(fā)AI Agent」閱讀體驗(yàn)】+初品Agent

    期待中的《基礎(chǔ)開(kāi)發(fā)AI Agent——手把手教你用扣子做智能體》終于寄到了,該書由葉濤、 管鍇、張心雨完成,并由電子工業(yè)出版社出版發(fā)行。 全書分為三個(gè)部分,即入門篇、工具及實(shí)踐
    發(fā)表于 04-22 11:51

    【「基礎(chǔ)開(kāi)發(fā)AI Agent」閱讀體驗(yàn)】總體預(yù)覽及入門篇

    基礎(chǔ)知識(shí)有所補(bǔ)充,另外書本后面的案例也會(huì)對(duì)Ai的應(yīng)用產(chǎn)生一些啟發(fā). 首先老規(guī)矩,先看一下目錄結(jié)構(gòu) 包含3大主題: 入門篇:介紹了Agent的概念、發(fā)展、與Prompt和Copilot的區(qū)別
    發(fā)表于 04-20 21:53

    新概念51單片機(jī)C語(yǔ)言教程入門、提高、開(kāi)發(fā)、拓展全攻略

    資料介紹 從實(shí)際應(yīng)用入手,以實(shí)驗(yàn)過(guò)程和實(shí)驗(yàn)現(xiàn)象為主導(dǎo),循序漸進(jìn)地講述51單片機(jī)C語(yǔ)言編程方法以及51單片機(jī)的硬件結(jié)構(gòu)和功能應(yīng)用。全書共分5,分別為入門篇、內(nèi)外部資源操作、提高、實(shí)
    發(fā)表于 04-15 13:57

    電路識(shí)圖從入門到精通高清電子資料

    由淺入深地介紹了電路圖的基礎(chǔ)知識(shí)、典型單元電路的識(shí)圖方法,通過(guò)“入門篇”和“精通篇”循序漸進(jìn)、由淺入深地介紹了電路圖的基礎(chǔ)知識(shí)、典型單元電路的識(shí)圖方法,以及典型小家電、電動(dòng)車、洗衣機(jī)、電冰箱、空調(diào)器
    發(fā)表于 04-10 16:22

    請(qǐng)求贈(zèng)閱《基礎(chǔ)開(kāi)發(fā)AI Agent——手把手教你用扣子做智能體》

    ! 我請(qǐng)求閱讀這本書的三大理由是:其一是,我只學(xué)過(guò)點(diǎn)匯編語(yǔ)言,不懂C語(yǔ)言,不會(huì)編程。而本書的入門篇介紹了Agent的概念、發(fā)展、與Prompt和Copilot的區(qū)別,Agent對(duì)個(gè)人和企業(yè)的價(jià)值,以及
    發(fā)表于 04-10 12:16

    名單公布!【書籍評(píng)測(cè)活動(dòng)NO.59】基礎(chǔ)開(kāi)發(fā)AI Agent——手把手教你用扣子做智能體

    ,是AI技術(shù)的下一個(gè)風(fēng)口。為了讓更多非技術(shù)出身的人能夠通俗地理解Agent,并門檻利用Agent開(kāi)發(fā)平臺(tái)設(shè)計(jì)自己的Agent,我們撰寫了本書。 本書分為入門篇、工具、實(shí)戰(zhàn)
    發(fā)表于 03-10 16:29

    基礎(chǔ)開(kāi)發(fā)小安派-Eyes-S1【入門篇】——工程文件架構(gòu)

    -Eyes-S1【入門篇】——初識(shí)小安派-Eyes-S12、基礎(chǔ)開(kāi)發(fā)小安派-Eyes-S1【入門篇】——安裝VMware與Ubuntu3、入門篇
    的頭像 發(fā)表于 11-06 16:10 ?476次閱讀
    <b class='flag-5'>零</b>基礎(chǔ)開(kāi)發(fā)小安派-Eyes-S1【<b class='flag-5'>入門篇</b>】——工程文件架構(gòu)

    Kafka高性能背后的技術(shù)原理

    Kafka 是一款性能非常優(yōu)秀的消息隊(duì)列,每秒處理的消息體量可以達(dá)到千萬(wàn)級(jí)別。
    的頭像 發(fā)表于 10-23 09:37 ?694次閱讀
    <b class='flag-5'>Kafka</b>高性能背后的技術(shù)原理