问题描述
我正在开发PoC,以证明使用Azure Service Bus的Azure Java功能之间的请求/响应。一切正常,但当我收到消息时,锁令牌为null,因此无法完成消息。从下面的代码中,您可以看到我的接收器支持会话,并且处于PEEK_LOCK模式。我想念什么?
供参考,我正在使用:
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.0.0-beta.5</version>
</dependency>
Earnie!
package com.ferguson.mn;
import java.time.Duration;
import java.util.UUID;
import javax.annotation.postconstruct;
import javax.annotation.PreDestroy;
import com.azure.messaging.servicebus.LockrenewalOperation;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
@Controller("/hello")
public class HelloController {
private ServiceBusSenderClient sender;
private ServiceBusReceiverClient receiver;
private final String sessionId = UUID.randomUUID().toString();
@postconstruct
public void init() throws Exception {
sender = new ServiceBusClientBuilder().connectionString(System.getenv("locationReqQueuePublisher")).sender()
.queueName("location-req").buildClient();
receiver = new ServiceBusClientBuilder().connectionString(System.getenv("locationRepQueueSubscriber"))
.sessionReceiver().sessionId(sessionId).queueName("location-rep").receiveMode(ReceiveMode.PEEK_LOCK)
.buildClient();
System.out.println("postconstruct complete");
}
@PreDestroy
public void cleanup() {
System.out.println("Closing sender and receiver");
if (sender != null) {
sender.close();
}
if (receiver != null) {
receiver.close();
}
}
@Post
public HttpResponse<String> post(final HttpRequest<LocationRequest> request)
throws InterruptedException,JsonProcessingException {
final ObjectMapper mapper = new ObjectMapper();
final String json = mapper.writeValueAsstring(request.getBody(LocationRequest.class).get());
final ServiceBusMessage msg = new ServiceBusMessage(json);
msg.setSessionId(sessionId);
// msg.setReplyToSessionId(sessionId);
msg.setContentType(MediaType.APPLICATION_JSON);
msg.setMessageId(sessionId);
msg.getProperties().put("p1","v1");
sender.sendMessage(msg);
long execTime = System.currentTimeMillis();
System.out.println("Sent " + json);
String payload = null;
boolean done = false;
while (!done) {
try {
// System.out.println("Session state: " + new
// String(receiver.getSessionState(sessionId)));
final ServiceBusReceivedMessage resp = receiver.peekMessage(sessionId);
payload = new String(resp.getBody());
System.out.println("Payload: " + payload + " " + resp.getLockToken() + " " + resp.getSessionId() + " "
+ resp.getLockedUntil());
receiver.complete(resp.getLockToken(),sessionId);
done = true;
} catch (Exception e) {
System.out.println("Consumer error: " + e.getMessage() + " " + e.getClass().getName());
if (System.currentTimeMillis() - execTime > 5000) {
return HttpResponse.serverError("Yikes");
}
if (e instanceof NullPointerException) {
done = true;
} else {
System.out.println("Looping...");
try {
Thread.sleep(100);
} catch (InterruptedException e2) {
}
}
}
}
System.out.println("Payload2: " + payload);
return HttpResponse.ok(payload).header("x-earnie","cool");
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)