Python kafka操作实例

一、基本概念

  • Topic:一组消息数据的标记符;
  • Producer:生产者,用于生产数据,可将生产后的消息送入指定的Topic;
  • Consumer:消费者,获取数据,可消费指定的Topic;
  • Group:消费者组,同一个group可以有多个消费者,一条消息在一个group中,只会被一个消费者获取
  • Partition:分区,为了保证kafka的吞吐量,一个Topic可以设置多个分区。在同一个消费者组(Group)内,同一分区只能被一个消费者消费;不同消费者组之间互不影响

二、本地安装与启动(基于Docker)

  1. 下载zookeeper镜像与kafka镜像:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

2. 本地启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper  

3. 本地启动kafka

docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=localhost \
--env KAFKA_ADVERTISED_PORT=9092 \
wurstmeister/kafka:latest 

注意:上述命令将kafka启动在9092端口,并且对外公布的地址为localhost;按此方式启动时,下文示例代码中的bootstrap_servers应填写localhost:9092(示例中的1.1.1.1:9092仅为占位地址)

4. 进入kafka bash

docker exec -it kafka bash
cd /opt/kafka/bin

5. 创建Topic,分区为2,Topic name为'kafka_demo'

kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo

6. 查看当前所有topic

kafka-topics.sh --zookeeper zookeeper:2181 --list

7. 安装kafka-python

pip install kafka-python

三、生产者(Producer)与消费者(Consumer)

个人封装

生产者和消费者的简易Demo,这里一起演示:

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import json
import traceback

from kafka import KafkaConsumer,KafkaProducer,TopicPartition


"""
kafka 生产者
"""
class KProducer(object):
    """Small wrapper around ``KafkaProducer`` that sends JSON-encoded values.

    Offers a blocking send (``sync_producer``), a fire-and-flush send
    (``asyn_producer``) and a send with success/error callbacks
    (``asyn_producer_callback``).
    """

    def __init__(self, bootstrap_servers):
        """
        Create the underlying ``KafkaProducer``.

        :param bootstrap_servers: broker address(es), passed through unchanged
        """
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # Values are serialized as JSON. json.dumps escapes non-ASCII
            # characters by default, so encoding the result as ASCII is safe.
            value_serializer=lambda m: json.dumps(m).encode("ascii"),
            # Optional: pass compression_type="gzip" to compress messages.
        )

    def sync_producer(self, topic, data):
        """
        Send one message and block until the send is confirmed.

        :param topic: topic to publish to
        :param data: JSON-serializable payload
        :return: None (partition and offset of the stored record are printed)
        :raises: whatever ``future.get`` raises when the send fails or does
                 not complete within 10 seconds
        """
        future = self.producer.send(topic, data)
        # Wait (at most 10 s) for the result of the send.
        record_metadata = future.get(timeout=10)
        partition = record_metadata.partition  # partition the record landed in
        offset = record_metadata.offset        # position inside that partition
        print("save success,partition: {},offset: {}".format(partition, offset))

    def asyn_producer(self, topic, data):
        """
        Send one message without waiting for a per-message result.

        :param topic: topic to publish to
        :param data: JSON-serializable payload
        :return: None
        """
        self.producer.send(topic, data)
        self.producer.flush()  # push out everything still buffered

    def asyn_producer_callback(self, topic, data):
        """
        Send one message and report the outcome through callbacks.

        ``send_success`` is registered as the success callback and
        ``send_error`` as the error callback of the returned future.

        :param topic: topic to publish to
        :param data: JSON-serializable payload
        :return: None
        """
        future = self.producer.send(topic, data)
        future.add_callback(self.send_success).add_errback(self.send_error)
        self.producer.flush()  # push out everything still buffered

    def send_success(self, *args, **kwargs):
        """Success callback for asynchronous sends; arguments are ignored."""
        print('save success')
        return

    def send_error(self, *args, **kwargs):
        """Error callback for asynchronous sends; arguments are ignored.

        Accepts positional arguments as well, because the failure is handed
        to the errback positionally.
        """
        print('save error')
        return

    def close_producer(self):
        """Close the underlying producer; failures are reported, not raised."""
        try:
            self.producer.close()
        except Exception as exc:
            # Closing is best-effort, but a failure should not pass silently.
            print("close producer failed: {}".format(exc))


"""
kafka 消费者
"""
class PConsumers(object):
    """Helper that reads JSON messages from Kafka and reports unread counts.

    A new ``KafkaConsumer`` is created for every call and closed afterwards.
    """

    def __init__(self, bootstrap_servers, group_id):
        """
        :param bootstrap_servers: broker address(es) used for every consumer
        :param group_id: consumer group the consumers are created with
        """
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id

    def get_message(self, topic, count=1):
        """
        Read up to ``count`` messages from ``topic``.

        Iterates the consumer until ``count`` messages have been collected,
        so the call waits while fewer messages are available.

        :param topic: topic to read from
        :param count: number of messages to fetch; values <= 0 yield ``[]``
        :return: list of decoded message values, or None if an error occurred
        """
        if count <= 0:
            # Nothing requested; avoids iterating the consumer forever.
            return []

        consumer = None
        messages = []
        try:
            consumer = KafkaConsumer(
                topic,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
                # Decode each value from JSON so callers get Python objects.
                value_deserializer=lambda m: json.loads(m.decode("ascii")),
                auto_offset_reset="earliest",
            )

            for message in consumer:
                print(
                    "%s:%d:%d: key=%s value=%s header=%s" % (
                        message.topic, message.partition, message.offset,
                        message.key, message.value, message.headers
                    )
                )
                messages.append(message.value)
                if len(messages) >= count:
                    break
        except Exception as e:
            # format_exc() returns the traceback text; print_exc() returns None.
            print("{0},{1}".format(e, traceback.format_exc()))
            return None
        finally:
            # Release the consumer on the error path as well.
            if consumer is not None:
                consumer.close()
        return messages

    @staticmethod
    def _remaining(end_offsets, committed_offsets):
        """Return sum(end_offsets) minus sum of the non-None committed offsets."""
        total = sum(end_offsets)
        consumed = sum(off for off in committed_offsets if off is not None)
        return total - consumed

    def get_count(self, topic):
        """
        Count the messages of ``topic`` not yet committed by this group.

        The result is the sum of the end offsets of all partitions minus the
        sum of the committed offsets; partitions without a committed offset
        contribute nothing to the committed sum.

        :param topic: topic to inspect
        :return: number of remaining messages, or None if the topic's
                 partitions are unknown or an error occurred
        """
        consumer = None
        try:
            consumer = KafkaConsumer(
                topic,
                # Use the configured brokers instead of the library default.
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
            )
            partition_ids = consumer.partitions_for_topic(topic)
            if partition_ids is None:
                print("no partitions found for topic: {}".format(topic))
                return None

            partitions = [TopicPartition(topic, p) for p in partition_ids]
            end_offsets = consumer.end_offsets(partitions)
            committed = [consumer.committed(tp) for tp in partitions]
            return self._remaining(end_offsets.values(), committed)
        except Exception as e:
            print("{0},{1}".format(e, traceback.format_exc()))
            return None
        finally:
            if consumer is not None:
                consumer.close()
        



if __name__ == "__main__":
    # Demo run: publish one JSON message with callbacks, then read one back.
    broker_address = "1.1.1.1:9092"
    demo_topic = "test"

    # Producer side. sync_producer / asyn_producer are the alternative
    # send methods; close_producer() shuts the producer down.
    producer_client = KProducer(bootstrap_servers=broker_address)
    producer_client.asyn_producer_callback(topic=demo_topic, data={"test": 1})

    # Consumer side. get_count(topic=...) reports the remaining messages
    # instead of fetching them.
    consumer_client = PConsumers(bootstrap_servers=broker_address, group_id="Boxer")
    received = consumer_client.get_message(topic=demo_topic)
    print(received)

 

相关文章

功能概要:(目前已实现功能)公共展示部分:1.网站首页展示...
大体上把Python中的数据类型分为如下几类: Number(数字) ...
开发之前第一步,就是构造整个的项目结构。这就好比作一幅画...
源码编译方式安装Apache首先下载Apache源码压缩包,地址为ht...
前面说完了此项目的创建及数据模型设计的过程。如果未看过,...
python中常用的写爬虫的库有urllib2、requests,对于大多数比...