脉冲星消息侦听器的多线程

问题描述

我对java message listenersapache pulsar并不陌生。 假设我一直保持着这样的听众,

private MessageListener<byte[]> generateListener() {
        MessageListener<byte[]> listener = (consumer,respMsg) -> {
            String respM = new String(respMsg.getValue(),StandardCharsets.UTF_8);
            logger.info(respM);
            consumer.ackNowledgeAsync(respMsg);
        };
        return listener;
    }

还有一个Consumer实例,

Consumer<byte[]> c = consumerBuilder.messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync().get();

我想知道的是此侦听器将如何处理多个传入消息?是否像JMS侦听器一样,在单独的线程中处理每个消息?如果是这样,那么我该如何配置要使用的线程数-是通过使用ClientBuilder.listenerThreads()属性吗?

当维护多个使用者时,是否需要维护与每个使用者对应的多个侦听器对象,即类似的东西-

consumerBuilder.clone().messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync()吗?

解决方法

ClientBuilder#listenerThreads方法允许配置内部线程池的大小,该内部线程池将在从该客户端创建的所有ConsumersReaders之间共享和使用。

Pulsar客户将向您保证单个使用者的MessageListener将始终由同一线程调用,即所提供的MessageListener不必是线程安全的。因此,是的,每个MessageListener最好使用专用的Consumer对象。

请注意,这还可以确保订购。

因此,基本上,如果仅使用一个Consumer,则可以将listenerThreads保留为1(这是默认设置)。

这是一个完整的示例,可用于观察行为:

public class PulsarConsumerListenerExample {

    public static void main(String[] args) throws PulsarClientException {

        int numListenerThread = 2;

        PulsarClient client = PulsarClient
                .builder()
                .serviceUrl("pulsar://localhost:6650")
                .listenerThreads(numListenerThread)
                .build();

        final List<Consumer<?>> consumers = new ArrayList<>();
        for (int i = 0; i < numListenerThread; i++) {
            consumers.add(createConsumerWithLister(client,"my-topic","my-subscription","C" + i));
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            for (Consumer<?> consumer : consumers) {
                try {
                    consumer.close();
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                }
            }
        }));
    }

    private static Consumer<String> createConsumerWithLister(final PulsarClient client,final String topic,final String subscription,final String consumerName) throws PulsarClientException {
        return client.newConsumer(Schema.STRING)
            .topic(topic)
            .consumerName(consumerName)
            .subscriptionName(subscription)
            .subscriptionMode(SubscriptionMode.Durable)
            .subscriptionType(SubscriptionType.Failover)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .messageListener((MessageListener<String>) (consumer,msg) -> {
                System.out.printf(
                    "[%s/%s]Message received: key=%s,value=%s,topic=%s,id=%s%n",consumerName,Thread.currentThread().getName(),msg.getKey(),msg.getValue(),msg.getTopicName(),msg.getMessageId().toString());
                consumer.acknowledgeAsync(msg);
            })
            .subscribe();
    }
}