kafka producer partitions分区器(七)

  消息在经过拦截器、序列化后,就需要确定它发往哪个分区,如果在ProducerRecord中指定了partition字段,那么就不再需要partitioner分区器进行分区了,如果没有指定,那么会根据key来将数据进行分区,如果partitioner和key都没有指定,那么就会采用认的方式进行数据分区。

  有没有指定partition可以从源码中看出:

 public ProducerRecord(String topic, Integer partition, K key, V value) {}

如果指定的partition,那就指定了数据发往哪个分区上,如果没有就会根据key来进行数据分区,如果2个都没有,那么会采用认的分区策略来进行数据分区

 Demo:

public class CustomPartitioner {
    
    private static final Logger LOG = LoggerFactory.getLogger(CustomPartitioner.class);
    
    public static void main(String[] args) {
        //1.加载配置信息
        Properties prop = loadProperties();
        
        //2.创建生产者
        KafkaProducer<Integer,String> producer = new KafkaProducer<>(prop);
        
        String sendContent = "hello_kafka";
        IntStream.range(0, 10).forEach(i ->{
            try {
                ProducerRecord<Integer,String> record = new ProducerRecord<>("test1",i,sendContent+"_"+i);  //topic key value
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata recordMetadata = future.get();
                LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
            } catch (Exception e) {
                e.printstacktrace();
            }
            
        });
        
    }
     //配置文件的设置
    public static Properties loadProperties() {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("acks", "all");    //发送到所有的ISR队列中
        return prop;
    }
}

 

 

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...