[727] Working with Kafka in Python
Table of Contents
- kafka
- github:KafkaProducer
- github:Consumer
- my code
- Consuming data from Kafka and writing it back to Kafka in Python
- pykafka
- Consuming a batch of topic data within a specified time range
kafka
pypi:https://pypi.org/project/kafka-python/
kafka-python:https://github.com/dpkp/kafka-python
pip install kafka
pip install kafka-python
To achieve load balancing you need to understand Kafka's partitioning mechanism. A topic can be split into multiple partitions; when the producer does not specify a partition, Kafka distributes messages across the partitions. A consumer that subscribes without a consumer group receives messages from every partition. Within a consumer group, the group's consumers are assigned different partitions: with 2 partitions and a group of 2 consumers, each consumer reads one partition; with a group of 3 consumers, one consumer receives no data. If several consumers need to read the same partition, put them in different consumer groups.
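A minimal sketch of the consumer-group behaviour just described (the broker address, topic name and group id below are placeholders): start this script twice with the same group_id and Kafka splits the topic's partitions between the two processes; start a copy with a different group_id and that group independently receives every partition.

from kafka import KafkaConsumer

# consumers sharing a group_id split the topic's partitions between them;
# consumers in different groups each receive all partitions independently
consumer = KafkaConsumer(
    'demo_topic',                           # hypothetical topic
    group_id='group_a',                     # same group => partitions are shared out
    bootstrap_servers=['127.0.0.1:9092'],   # hypothetical broker
)
for msg in consumer:
    print(msg.topic, msg.partition, msg.offset, msg.value)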
Kafka also exposes offsets, which let a consumer go back and re-read messages it previously missed. This relies on Kafka nominally keeping the full log, so a large amount of history can be retained; the retention period is configurable and is typically 7 days. Seeking to an offset that has already been deleted is a problem, but that case is rare. Each log segment file is named after its base offset, so the offset you want minus the file name gives the record's relative position inside that segment. To consume from a specific offset you must explicitly assign the partition the consumer will read; otherwise the client has no partition to seek on and cannot consume.
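A hedged sketch of offset-based replay (broker address, topic and the offset value are placeholders): note that seek() only works on an explicitly assigned partition, as mentioned above.

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'], enable_auto_commit=False)
tp = TopicPartition('demo_topic', 0)
consumer.assign([tp])                    # seek() requires an assigned partition; subscribe() alone is not enough
print(consumer.beginning_offsets([tp]))  # earliest offset still retained (older segments have been deleted)
print(consumer.end_offsets([tp]))        # offset the next produced record will get
consumer.seek(tp, 100)                   # replay from offset 100, assuming it is still within retention
for msg in consumer:
    print(msg.offset, msg.value)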
github:KafkaProducer
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
...     producer.send('foobar', b'some_message_bytes')

>>> # Block until a single message is sent (or timeout)
>>> future = producer.send('foobar', b'another_message')
>>> result = future.get(timeout=60)

>>> # Block until all pending messages are at least put on the network
>>> # NOTE: This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()

>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')

>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})

>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')

>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
...     producer.send('foobar', b'msg %d' % i)

>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

>>> # Get producer performance metrics
>>> metrics = producer.metrics()
Supplement
from kafka import KafkaProducer
from kafka.errors import KafkaError
import logging

log = logging.getLogger(__name__)  # the upstream example assumes a `log` object already exists

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

# configure multiple retries
producer = KafkaProducer(retries=5)
github:Consumer
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
...     print (msg)

>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
>>> for msg in consumer:
...     print (msg)

>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
...     print (msg.headers)

>>> # Get consumer metrics
>>> metrics = consumer.metrics()
Supplement
from kafka import KafkaConsumer

# arguments: the topic to consume and the Kafka broker address
consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])

# This loop blocks indefinitely. Produced messages are kept in the log rather than
# deleted on consumption, so every message has an offset within its partition.
for message in consumer:
    # consumer acts like a message stream: iteration yields records as they arrive
    # and blocks while there is nothing new to read
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

# with a group_id, only one consumer instance in the same group reads a given partition
consumer = KafkaConsumer('test', group_id='my-group', bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

# start from the earliest message still available
consumer = KafkaConsumer('test', auto_offset_reset='earliest', bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)

# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')
auto_offset_reset: where to start when there is no committed offset. 'earliest' moves to the oldest available message, 'latest' to the newest; the default is 'latest'.
Mapping in the source: {'smallest': 'earliest', 'largest': 'latest'}
- Consumer (manually setting the offset)
# ========== Read messages from a specified position ==========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])
print(consumer.partitions_for_topic("test"))  # partition info for the 'test' topic
print(consumer.topics())        # list of topics
print(consumer.subscription())  # topics this consumer is subscribed to
print(consumer.assignment())    # topic/partitions currently assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest offsets this consumer can read
consumer.seek(TopicPartition(topic='test', partition=0), 5)  # reset the offset: consume from offset 5
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
- Consumer (subscribing to multiple topics)
# ======= Subscribe to multiple topics ==========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test', 'test0'))  # subscribe to the topics to consume
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0)))  # current position (next offset to fetch) for this partition
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
- Consumer (manually polling for messages)
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test', 'test0'))
while True:
    msg = consumer.poll(timeout_ms=5)  # fetch messages from Kafka, returning after at most 5 ms
    print(msg)
    time.sleep(2)
- Consumer (pausing and resuming consumption)
# ============== Pause and resume consumption ===========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test',))
consumer.topics()
consumer.pause(TopicPartition(topic='test', partition=0))  # after pause(), nothing is fetched from this partition until resume() is called
num = 0
while True:
    print(num)
    print(consumer.paused())  # partitions currently paused
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume......")
After pause() is executed the consumer stops reading from that partition until resume() is called.
my code
# -*- coding:utf-8 -*-
import sys, time, json
from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
from kafka.errors import KafkaError
# from kafka.structs import TopicPartition
'''
pip install kafka==1.3.5
pip install kafka-python==1.3.5
'''

kafka_host = "47.14.12.26"
kafka_port = 9092
kafka_topic = "test"


class Kafka():
    def __init__(self, key='key', group_id='group_id'):
        self.key = key
        bootstrap_servers = ['{kafka_host}:{kafka_port}'.format(kafka_host=kafka_host, kafka_port=kafka_port), ]
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        '''
        fetch_min_bytes (int) - minimum amount of data the server should return for a fetch request; otherwise wait.
        fetch_max_wait_ms (int) - maximum time in milliseconds the server will block before answering a fetch request
            when there is not enough data to satisfy fetch_min_bytes.
        fetch_max_bytes (int) - maximum amount of data the server should return for a fetch request. Not an absolute
            maximum: if the first message in the first non-empty partition is larger than this value, it is still
            returned so the consumer can make progress. Note: the consumer fetches from multiple brokers in parallel,
            so memory use depends on how many brokers host partitions of the topic. Requires Kafka >= 0.10.1.0.
            Default: 52428800 (50 MB).
        enable_auto_commit (bool) - if True, the consumer's offsets are committed periodically in the background. Default: True.
        max_poll_records (int) - maximum number of records returned by a single call to poll(). Default: 500.
        max_poll_interval_ms (int) - maximum delay between calls to poll() when using consumer group management.
            This sets an upper bound on how long the consumer can be idle before fetching more records. If poll()
            is not called within this timeout the consumer is considered failed and the group rebalances,
            reassigning its partitions to another member. Default: 300000.
        '''
        self.consumer = KafkaConsumer(
            # kafka_topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # auto_offset_reset='earliest',
            enable_auto_commit=False)
        self.topic_partition = TopicPartition(topic=kafka_topic, partition=0)
        self.topic_partition2 = TopicPartition(topic=kafka_topic, partition=1)
        # assign the TopicPartitions this consumer reads; assign() cannot be combined with
        # passing kafka_topic (a subscription) to KafkaConsumer() at the same time
        self.consumer.assign([
            self.topic_partition,
            # self.topic_partition2
        ])
        # partition info for the topic
        print(self.consumer.partitions_for_topic(kafka_topic))
        print(self.consumer.assignment())
        print(self.consumer.beginning_offsets(self.consumer.assignment()))
        committed_offset = self.consumer.committed(self.topic_partition)
        if committed_offset == None:
            # reset this consumer's starting position
            self.consumer.seek(partition=self.topic_partition, offset=0)
        end_offset = self.consumer.end_offsets([self.topic_partition])[self.topic_partition]
        print('committed offset:', committed_offset, 'latest offset:', end_offset)

    # producer
    def producer_data(self):
        try:
            for _id in range(600, 610):
                params = {"msg": str(_id)}
                parmas_message = json.dumps(params, ensure_ascii=False)
                v = parmas_message.encode('utf-8')
                k = self.key.encode('utf-8')
                print("send msg:(k,v)", k, v)
                self.producer.send(kafka_topic, key=k, value=v, partition=0)
                self.producer.flush()
                # time.sleep(0.5)
            self.producer.close()
        except KafkaError as e:
            print(e)

    # consumer
    def consumer_data(self):
        try:
            print('consumer_data start')
            for msg in self.consumer:
                print(msg)
                print('msg----->k,v,offset:', msg.key, msg.value, msg.offset)
                # commit the offset manually; offsets format: {TopicPartition: OffsetAndMetadata(offset_num, None)}
                self.consumer.commit(offsets={self.topic_partition: (OffsetAndMetadata(msg.offset + 1, None))})
                committed_offset = self.consumer.committed(self.topic_partition)
                print('committed offset:', committed_offset)
                time.sleep(5)
        except KeyboardInterrupt as e:
            print(e)


if __name__ == '__main__':
    try:
        kafka = Kafka()
        kafka.producer_data()
        kafka.consumer_data()
    except Exception as e:
        import traceback
        ex_msg = '{exception}'.format(exception=traceback.format_exc())
        print(ex_msg)
Consuming data from Kafka and writing it back to Kafka in Python
# -*- coding:utf-8 -*-
import os, sys, time
import json, requests
import logging, logging.handlers
from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
from kafka.errors import KafkaError
# from kafka.structs import TopicPartition
'''
pip3 install kafka==1.3.5
pip3 install kafka-python==1.3.5
'''

def get_logger(log_name='', date_str=None):
    local_path = "./logs/"
    if not os.path.exists(local_path):
        # os.mkdir(local_path)   # can only create a single directory level
        os.makedirs(local_path)  # can create nested directories
    if date_str == None:
        date_str = time.strftime('%Y-%m-%d', time.localtime(time.time()))
    logfile = local_path + date_str + log_name + ".log"
    hander = logging.handlers.RotatingFileHandler(logfile, encoding='utf-8')
    formatter = logging.Formatter('%(asctime)s [%(filename)s:%(lineno)s][%(levelname)s] %(message)s')
    hander.setFormatter(formatter)
    logger = logging.getLogger(logfile)
    # clear existing handlers on every call, otherwise log lines get written multiple times
    logger.handlers.clear()
    logger.addHandler(hander)
    logger.setLevel(logging.INFO)
    return logger

# DingTalk alert
def send_ding_msg(content=None, ding_token=None, atMobiles=None, isAtAll=False):
    # @ users by phone number; isAtAll controls whether to @ everyone
    atMobiles = ['1953', ] if atMobiles == None else atMobiles
    # data-collection monitoring group
    token = '8cf7f38c63c5b4a' if ding_token == None else ding_token
    api = 'https://oapi.dingtalk.com/robot/send?access_token={}'.format(token)
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    msg = {'msgtype': 'text',
           'text': {'content': content},
           'at': {'atMobiles': atMobiles, 'isAtAll': isAtAll}}
    # return None
    data = requests.post(api, data=json.dumps(msg), headers=headers).json()
    return json.dumps(data)

logger = get_logger(log_name='_kds', date_str=None)


class Kafka_consumer():
    def __init__(self, kafka_host=None, kafka_port=None, kafka_topic=None, group_id=None,
                 bootstrap_servers=None, topics=('test', 'test0')):
        print("consumer kafka_topic:%s group_id:%s bootstrap_servers: %s" % (kafka_topic, group_id, str(bootstrap_servers)))
        logger.info("consumer kafka_topic:%s group_id:%s bootstrap_servers: %s" % (kafka_topic, group_id, str(bootstrap_servers)))
        self.consumer = KafkaConsumer(
            # kafka_topic,
            group_id=group_id,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='earliest',
            # enable_auto_commit=False
        )
        # subscribe to the topics to consume
        self.consumer.subscribe(topics=topics)
        print('all topics:', self.consumer.topics())
        # partition info for the topic
        partitions_for_topic = self.consumer.partitions_for_topic(kafka_topic)
        print('partitions_for_topic:%s' % str(partitions_for_topic))
        logger.info('partitions_for_topic:%s' % str(partitions_for_topic))
        assignment = self.consumer.assignment()
        print('assignment:', assignment)
        # print('beginning_offsets:', self.consumer.beginning_offsets(assignment))
        # print('end_offsets:', self.consumer.end_offsets(assignment))

    def consume_data(self, is_time=1):
        print('consumer_data start' + '=' * 50)
        logger.info('consumer_data start' + '=' * 50)
        for msg in self.consumer:
            topic = msg.topic
            key = msg.key
            value = msg.value
            partition = msg.partition
            offset = msg.offset
            if is_time == 1:
                data_json = eval(value.decode('utf-8'))
                point_time = data_json.get('pointTime')
                if point_time != None and point_time >= '2022-05-06':
                    producer.send(topic, key, value)
            else:
                producer.send(topic, key, value)
            # print('msg----->:%s' % str(msg))
            print('msg----->topic:%s k:%s, v:%s, offset:%s' % (topic, key, value.decode('utf-8'), offset))
            logger.info('msg----->:%s' % str(msg))
            logger.info('msg----->topic:%s k:%s, v:%s, offset:%s' % (topic, key, value.decode('utf-8'), offset))
            # yield msg


class Kafka_producer():
    def __init__(self, kafka_host=None, kafka_port=None, kafka_topic=None, bootstrap_servers=None):
        print("producer kafka_topic:%s bootstrap_servers: %s" % (kafka_topic, str(bootstrap_servers)))
        logger.info("producer kafka_topic:%s bootstrap_servers: %s" % (kafka_topic, str(bootstrap_servers)))
        self.kafka_topic = kafka_topic
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

    def send(self, topic, k, v):
        # k = k.encode('utf-8')
        # v = v.encode('utf-8')
        resp = self.producer.send(topic, key=k, value=v)
        # print('resp:', resp.succeeded())
        self.producer.flush()


if __name__ == '__main__':
    try:
        c_bootstrap_servers = [
            # test Kafka brokers (consumer)
            'cdp101:9092', 'cdp102:9092', 'cdp103:9092',
        ]
        p_bootstrap_servers = [
            # test Kafka brokers (producer)
            'cdp101:9092', 'cdp102:9092', 'cdp103:9092',
        ]
        consumer = Kafka_consumer(topics=('COMPASS', 'COMPASS_CHITU'), group_id='data.shift',
                                  bootstrap_servers=c_bootstrap_servers)
        producer = Kafka_producer(bootstrap_servers=p_bootstrap_servers)
        message = consumer.consume_data()
    except Exception as e:
        import traceback
        ex_msg = '{exception}'.format(exception=traceback.format_exc())
        print(ex_msg)
        logger.info("error: %s,%s" % (ex_msg, e))
        # raise Exception(ex_msg)
    finally:
        send_content = "Task: kafka_data_shift \n group_id: data.shift the program has exited, please handle it promptly"
        send_ding_msg(content=send_content)
References:
https://www.cnblogs.com/reblue520/p/8270412.html
https://blog.csdn.net/luanpeng825485697/article/details/81036028
https://www.cnblogs.com/small-office/p/9399907.html
https://blog.csdn.net/xiaofei2017/article/details/80924800
https://www.jianshu.com/p/bcab1d36ff92
Consuming Kafka from a specified timestamp in Python: https://blog.csdn.net/weixin_43597282/article/details/120995691
pykafka
pykafka:https://github.com/Parsely/pykafka
pip install pykafka
The obvious first step was to look for a standard Python library for connecting to Kafka. The candidates are kafka-python and pykafka: the former has more users and is the more mature library, the latter is the successor to Samsa. Connecting to ZooKeeper with Samsa and then using the Kafka cluster fit my needs well, and pykafka's examples also show ZooKeeper support, while kafka-python has none, so I chose pykafka as the client library.
- Conceptual notes
On Kafka and ZooKeeper clusters: with Samsa both producers and consumers connect through ZooKeeper, but the people I talked to have producers connect directly to the Kafka broker list and only consumers go through ZooKeeper. That also cleared up my confusion when reading the pykafka docs, where only the consumer connects to ZooKeeper, so the problem was solved and I simply followed the documentation.
- Producer
>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="192.168.1.1:9092, 192.168.1.2:9092")  # accepts multiple broker hosts; this is the key point
>>> client.topics  # list all topics
>>> topic = client.topics['my.test']  # pick a topic
>>> producer = topic.get_producer()
>>> producer.produce(['test message ' + str(i ** 2) for i in range(4)])  # str() added because the official example does not run on py2.7
- Consumer
>>> balanced_consumer = topic.get_balanced_consumer(
...     consumer_group='testgroup',
...     auto_commit_enable=True,  # when this is set to False you do not need to add consumer_group
...     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'  # connect to multiple zk nodes here
... )
Reference: http://opslinux.com/2015/07/14/python%E8%BF%9E%E6%8E%A5kafka/
Consuming a batch of topic data within a specified time range
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class JavaConsumerTool {
    /**
     * Create a consumer
     */
    public static KafkaConsumer<String, String> getConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        return consumer;
    }

    /**
     * Look up an offset by timestamp
     * @param partition   partition number
     * @param datetimeStr message time
     */
    public static Long getOffsetByDateTime(KafkaConsumer<String, String> consumer, String topic, int partition, String datetimeStr) throws ParseException {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long timestamp = df.parse(datetimeStr).getTime();
        Map<TopicPartition, Long> map = new HashMap<>();
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        map.put(topicPartition, timestamp);
        Map<TopicPartition, OffsetAndTimestamp> offset = null;
        try {
            offset = consumer.offsetsForTimes(map, Duration.ofSeconds(10));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        return offset.get(topicPartition).offset();
    }

    /**
     * Consume a batch of data within a time range
     * @param partition partition number
     * @param startTime start of the range
     * @param endTime   end of the range
     */
    public static void consumerOnTimeBatch(KafkaConsumer<String, String> consumer, String topic, int partition, String startTime, String endTime) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        // assign the topic partition explicitly
        consumer.assign(Arrays.asList(topicPartition));
        long startOffset = 0L;
        long endOffset = 0L;
        try {
            startOffset = getOffsetByDateTime(consumer, topic, partition, startTime);
            endOffset = getOffsetByDateTime(consumer, topic, partition, endTime);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        consumer.seek(topicPartition, startOffset);
        long offset = 0L;
        while (offset <= endOffset) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
            for (ConsumerRecord<String, String> record : records) {
                offset = record.offset();
                System.out.println("time:" + new Date(record.timestamp()) + ", offset:" + record.offset() + ", value:" + record.value());
            }
        }
        consumer.close();
    }

    /* entry point */
    public static void main(String[] args) throws Exception {
        KafkaConsumer<String, String> consumer = getConsumer();
        String topic = "test";
        int partition = 0;
        String startTime = "1997-01-17 00:00:00";
        String endTime = "1997-01-18 00:00:00";
        // consume a batch of topic data within the given time range
        consumerOnTimeBatch(consumer, topic, partition, startTime, endTime);
    }
}
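For reference, a minimal kafka-python sketch of the same idea (the broker address, topic, partition and date format are assumptions, not from the original post; offsets_for_times() requires a broker >= 0.10.1):

import datetime
from kafka import KafkaConsumer, TopicPartition

def consume_time_range(bootstrap_servers, topic, partition, start_str, end_str):
    # map both timestamps to offsets with offsets_for_times(), then read [start, end)
    tp = TopicPartition(topic, partition)
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, enable_auto_commit=False)
    consumer.assign([tp])

    def to_ms(s):  # "YYYY-MM-DD HH:MM:SS" -> epoch milliseconds
        return int(datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S').timestamp() * 1000)

    # offsets_for_times returns {TopicPartition: OffsetAndTimestamp or None}
    start = consumer.offsets_for_times({tp: to_ms(start_str)})[tp]
    end = consumer.offsets_for_times({tp: to_ms(end_str)})[tp]
    if start is None:  # no message at or after the start time
        consumer.close()
        return
    end_offset = end.offset if end is not None else consumer.end_offsets([tp])[tp]

    consumer.seek(tp, start.offset)
    for msg in consumer:
        if msg.offset >= end_offset:
            break
        print(msg.offset, msg.timestamp, msg.value)
    consumer.close()

# consume_time_range(['127.0.0.1:9092'], 'test', 0, '2022-05-01 00:00:00', '2022-05-02 00:00:00')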
References: https://blog.csdn.net/qq_32068809/article/details/122562478
https://blog.csdn.net/tianshishangxin1/article/details/120139470
