python操作kafka實踐的示例代碼
1、先看最簡單的場景,生產(chǎn)者生產(chǎn)消息,消費者接收消息,下面是生產(chǎn)者的簡單代碼。
#!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "database": "test_1", "host": "xxxx", "user": "root", "password": "root" }, "table": "msg", "msg": "Hello World" } msg = json.dumps(msg_dict) producer.send('test_rhj', msg, partition=0) producer.close()
下面是消費者的簡單代碼:
from kafka import KafkaConsumer consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x']) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv
下面是結(jié)果:
2、如果想要完成負載均衡,就需要知道kafka的分區(qū)機制,同一個主題,可以為其分區(qū),在生產(chǎn)者不指定分區(qū)的情況,kafka會將多個消息分發(fā)到不同的分區(qū),消費者訂閱時候如果不指定服務(wù)組,會收到所有分區(qū)的消息,如果指定了服務(wù)組,則同一服務(wù)組的消費者會消費不同的分區(qū),如果2個分區(qū)兩個消費者的消費者組消費,則,每個消費者消費一個分區(qū),如果有三個消費者的服務(wù)組,則會出現(xiàn)一個消費者消費不到數(shù)據(jù);如果想要消費同一分區(qū),則需要用不同的服務(wù)組。以此為原理,我們對消費者做如下修改:
from kafka import KafkaConsumer consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x']) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv
然后我們開兩個消費者進行消費,生產(chǎn)者分別往0分區(qū)和1分區(qū)發(fā)消息結(jié)果如下,可以看到,一個消費者只能消費0分區(qū),另一個只能消費1分區(qū):
3、kafka提供了偏移量的概念,允許消費者根據(jù)偏移量消費之前遺漏的內(nèi)容,這基于kafka名義上的全量存儲,可以保留大量的歷史數(shù)據(jù),歷史保存時間是可配置的,一般是7天,如果偏移量定位到了已刪除的位置那也會有問題,但是這種情況可能很?。幻總€保存的數(shù)據(jù)文件都是以偏移量命名的,當前要查的偏移量減去文件名就是數(shù)據(jù)在該文件的相對位置。要指定偏移量消費數(shù)據(jù),需要指定該消費者要消費的分區(qū),否則代碼會找不到分區(qū)而無法消費,代碼如下:
from kafka import KafkaConsumer from kafka.structs import TopicPartition consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531']) consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)]) print consumer.partitions_for_topic("test_rhj") # 獲取test主題的分區(qū)信息 print consumer.assignment() print consumer.beginning_offsets(consumer.assignment()) consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv
因為指定的便宜量為0,所以從一開始插入的數(shù)據(jù)都可以查到,而且因為指定了分區(qū),指定的分區(qū)結(jié)果都可以消費,結(jié)果如下:
4、有時候,我們并不需要實時獲取數(shù)據(jù),因為這樣可能會造成性能瓶頸,我們只需要定時去獲取隊列里的數(shù)據(jù)然后批量處理就可以,這種情況,我們可以選擇主動拉取數(shù)據(jù)
from kafka import KafkaConsumer import time consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531']) consumer.subscribe(topics=('test_rhj',)) index = 0 while True: msg = consumer.poll(timeout_ms=5) # 從kafka獲取消息 print msg time.sleep(2) index += 1 print '--------poll index is %s----------' % index
結(jié)果如下,可以看到,每次拉取到的都是前面生產(chǎn)的數(shù)據(jù),可能是多條的列表,也可能沒有數(shù)據(jù),如果沒有數(shù)據(jù),則拉取到的為空:
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
python四個坐標點對圖片區(qū)域最小外接矩形進行裁剪
在圖像裁剪操作中,opencv和pillow兩個庫都具有相應(yīng)的函數(shù),如果想要對目標的最小外接矩形進行裁剪該如何操作呢?本文就來詳細的介紹一下2021-06-06python使用pika庫調(diào)用rabbitmq參數(shù)使用詳情
這篇文章主要介紹了python使用pika庫調(diào)用rabbitmq參數(shù)使用詳情,文章通過展開文章主題分享了三種方式,具有一定的參考價值,需要的朋友可以參考一下2022-08-08python dataframe向下向上填充,fillna和ffill的方法
今天小編就為大家分享一篇python dataframe向下向上填充,fillna和ffill的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-11-11Python?pandas找出、刪除重復(fù)的數(shù)據(jù)實例
在面試中很可能遇到給定一個含有重復(fù)元素的列表,刪除其中重復(fù)的元素,下面這篇文章主要給大家介紹了關(guān)于Python?pandas找出、刪除重復(fù)數(shù)據(jù)的相關(guān)資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2022-07-07