消息在经过拦截器、序列化后,就需要确定它发往哪个分区,如果在ProducerRecord中指定了partition字段,那么就不再需要partitioner分区器进行分区了,如果没有指定,那么会根据key来将数据进行分区,如果partitioner和key都没有指定,那么就会采用默认的方式进行数据分区。
有没有指定partition可以从源码中看出:
public ProducerRecord(String topic, Integer partition, K key, V value) {}
如果指定的partition,那就指定了数据发往哪个分区上,如果没有就会根据key来进行数据分区,如果2个都没有,那么会采用默认的分区策略来进行数据分区
Demo:
public class CustomPartitioner { private static final Logger LOG = LoggerFactory.getLogger(CustomPartitioner.class); public static void main(String[] args) { //1.加载配置信息 Properties prop = loadProperties(); //2.创建生产者 KafkaProducer<Integer,String> producer = new KafkaProducer<>(prop); String sendContent = "hello_kafka"; IntStream.range(0, 10).forEach(i ->{ try { ProducerRecord<Integer,String> record = new ProducerRecord<>("test1",i,sendContent+"_"+i); //topic key value Future<RecordMetadata> future = producer.send(record); RecordMetadata recordMetadata = future.get(); LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition()); } catch (Exception e) { e.printstacktrace(); } }); } //配置文件的设置 public static Properties loadProperties() { Properties prop = new Properties(); prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092"); prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("acks", "all"); //发送到所有的ISR队列中 return prop; } }