Redis使用Java为每个使用者流化一条消息

问题描述

我正在尝试使用Redis流实现Java应用程序,其中每个consomer都消耗一条消息。就像流水线/队列一样,每个使用者都只接收一条消息,对其进行处理,然后在使用者结束后,接收下一条消息,该消息在流中到目前为止尚未处理。 有效的方法是,每条消息仅由一个使用者(使用xreadgroup)消耗。

我从this tutorial from redislabs开始

代码

    RedisClient redisClient = RedisClient.create("redis://pw@host:port");
    StatefulRedisConnection<String,String> connection = redisClient.connect();
    RedisCommands<String,String> syncCommands = connection.sync();

    try {
        syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(STREAM_KEY,"0-0"),ID_READ_GROUP);
    } catch (RedisBusyException redisBusyException) {
        System.out.println(String.format("\t Group '%s' already exists",ID_READ_GROUP));
    }

    System.out.println("Waiting for new messages ");

    while (true) {
        List<StreamMessage<String,String>> messages = syncCommands.xreadgroup(
                Consumer.from(ID_READ_GROUP,ID_WORKER),ReadArgs.StreamOffset.lastConsumed(STREAM_KEY));

        if (!messages.isEmpty()) {
            System.out.println(messages.size()); // 
            for (StreamMessage<String,String> message : messages) {
                System.out.println(message.getId());
                Thread.sleep(5000);
                syncCommands.xack(STREAM_KEY,ID_READ_GROUP,message.getId());
            }
        }

    }

我当前的问题是,一个使用者从队列中获取一条消息以上,并且在某些情况下,其他使用者正在等待,而一个使用者一次要处理10条消息。

谢谢!

解决方法

请注意,XREADGROUP可以获取DoRequest参数。

通过传递XReadArgs,了解JavaDoc如何在COUNT xreadgroup中进行操作。