在PEEK_LOCK模式下使用具有会话感知能力的接收者调用peekMessagesessionId时,Azure FunctionJava获得空的lockToken

问题描述

我正在开发PoC,以证明使用Azure Service Bus的Azure Java功能间的请求/响应。一切正常,但当我收到消息时,锁令牌为null,因此无法完成消息。从下面的代码中,您可以看到我的接收器支持会话,并且处于PEEK_LOCK模式。我想念什么?

供参考,我正在使用:

    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-servicebus</artifactId>
      <version>7.0.0-beta.5</version>
    </dependency>

Earnie!

package com.ferguson.mn;

import java.time.Duration;
import java.util.UUID;

import javax.annotation.postconstruct;
import javax.annotation.PreDestroy;

import com.azure.messaging.servicebus.LockrenewalOperation;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;

@Controller("/hello")
public class HelloController {
    private ServiceBusSenderClient sender;
    private ServiceBusReceiverClient receiver;
    private final String sessionId = UUID.randomUUID().toString();

    @postconstruct
    public void init() throws Exception {
        sender = new ServiceBusClientBuilder().connectionString(System.getenv("locationReqQueuePublisher")).sender()
                .queueName("location-req").buildClient();

        receiver = new ServiceBusClientBuilder().connectionString(System.getenv("locationRepQueueSubscriber"))
                .sessionReceiver().sessionId(sessionId).queueName("location-rep").receiveMode(ReceiveMode.PEEK_LOCK)
                .buildClient();

        System.out.println("postconstruct complete");
    }

    @PreDestroy
    public void cleanup() {
        System.out.println("Closing sender and receiver");
        if (sender != null) {
            sender.close();
        }
        if (receiver != null) {
            receiver.close();
        }
    }

    @Post
    public HttpResponse<String> post(final HttpRequest<LocationRequest> request)
            throws InterruptedException,JsonProcessingException {
        final ObjectMapper mapper = new ObjectMapper();
        final String json = mapper.writeValueAsstring(request.getBody(LocationRequest.class).get());
        final ServiceBusMessage msg = new ServiceBusMessage(json);

        msg.setSessionId(sessionId);
        // msg.setReplyToSessionId(sessionId);
        msg.setContentType(MediaType.APPLICATION_JSON);
        msg.setMessageId(sessionId);
        msg.getProperties().put("p1","v1");
        sender.sendMessage(msg);
        long execTime = System.currentTimeMillis();
        System.out.println("Sent " + json);
        String payload = null;
        boolean done = false;
        while (!done) {
            try {
                // System.out.println("Session state: " + new
                // String(receiver.getSessionState(sessionId)));
                final ServiceBusReceivedMessage resp = receiver.peekMessage(sessionId);
                payload = new String(resp.getBody());
                System.out.println("Payload: " + payload + " " + resp.getLockToken() + " " + resp.getSessionId() + " "
                        + resp.getLockedUntil());
                receiver.complete(resp.getLockToken(),sessionId);
                done = true;
            } catch (Exception e) {
                System.out.println("Consumer error: " + e.getMessage() + " " + e.getClass().getName());
                if (System.currentTimeMillis() - execTime > 5000) {
                    return HttpResponse.serverError("Yikes");
                }
                if (e instanceof NullPointerException) {
                    done = true;
                } else {
                    System.out.println("Looping...");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e2) {
                    }
                }
            }
        }
        System.out.println("Payload2: " + payload);
        return HttpResponse.ok(payload).header("x-earnie","cool");
    }
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...