MessageListenerContainer

问题描述

嗨,我正在尝试从 mongo oplog 实现 MessageListener,它应该从上次停止的流文档中检索文档。 目前我的代码设置如下。不知道如何从最后一个文档中检索 resumetoken 并设置它,以便如果侦听器应用程序关闭并重新联机,它应该在最后一次读取后读取。

    @Bean
    MessageListenerContainer candidateMessageListenerContainer(MongoTemplate mongoTemplate,@Qualifier("candidateMessageListener") MessageListener documentMessageListener)
    {
    Executor executor = Executors.newSingleThreadExecutor();
    MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate,executor)
    {
        @Override
        public boolean isAutoStartup()
        {
        return true;
        }
    };
    ChangeStreamRequest<Candidate> request = ChangeStreamRequest.builder(documentMessageListener)
                    .collection("candidate") // The name of the collection to listen to,Do not specify the default listening database
                    .filter(newAggregation(match(where("operationType").in("insert","update","replace","delete")))) // Filter the types of operations that need to be monitored,You can specify filter conditions according to your needs
                    .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
                    // When not set,When the document is updated,Only information about the changed fields will be sent,Set up UPDATE_LOOKUP All information of the document will be returned
                    .build();
    messageListenerContainer.register(request,Candidate.class);
    return messageListenerContainer;
    }

解决方法

您可以在 getResumeToken() 上使用 getTimestamp()ChangeStreamEvent 方法来获取此信息。有关详细信息,请参阅 Reference Documentation - Resuming Change Streams