Azure Web Pub Sub Service - JAVA

概述

使用 Azure Web PubSub 开发具有实时消息传递的 Web 应用程序,这是一种支持本机和无服务器 WebSocket 的完全托管服务。 使用发布-订阅消息模式创建松散耦合、可扩展的应用程序 - 包括聊天、直播和 IoT 仪表板。 当 Web PubSub 管理数据和内容流向您的网页和移动应用程序时,开发者可以专注于功能。

概念

连接

连接,也称为客户端或客户端连接,表示连接到 Web PubSub 服务的单个 WebSocket 连接。成功连接后,Web PubSub 服务会为此连接分配一个唯一的连接 ID。

中心

hub是一组客户端连接的逻辑概念。通常您将一个hub用于一个目的,例如,聊天hub或通知hub。当客户端连接时,它连接到一个hub,并且在其生命周期内,它属于该hub。一旦客户端连接到hub,hub就存在。不同的应用程序可以使用不同的hub名称共享一个 Azure Web PubSub 服务。

组是到hub的连接子集。您可以随时将客户端连接添加到组,或从组中删除客户端连接。例如,当客户加入聊天室时,或者当客户离开聊天室时,该聊天室可以被认为是一个群组。一个客户端可以加入多个组,一个组可以包含多个客户端。群组就像一个群组“会话”,一旦有人加入群组就会创建群组会话,当群组中没有人时会话消失。

用户

与 Web PubSub 的连接可以属于一个用户。一个用户可能有多个连接,例如当一个用户跨多个设备或多个浏览器选项卡连接时。

留言

当客户端连接时,它可以通过 WebSocket 连接向上游应用程序发送消息,或从上游应用程序接收消息。

工作流程

客户端使用 WebSocket 传输连接到服务/客户端端点。服务将每个 WebSocket 帧转发到配置的上游(服务器)。 WebSocket 连接可以与任何自定义子协议连接以供服务器处理,也可以与服务支持的子协议 json.webpubsub.azure.v1 连接,这使客户端能够直接进行 pub/sub。详细信息在客户端协议中进行了描述。

服务在不同的客户端事件上使用 CloudEvents HTTP 协议调用服务器。 CloudEvents 是由云原生计算基金会 (CNCF) 托管的事件的结构和元数据描述的标准化且与协议无关的定义。详细信息在服务器协议中描述。

服务器可以使用 REST API 调用服务向客户端发送消息或管理连接的客户端。详细信息在服务器协议中描述

创建一个web pub sub service

连接 webPubSubServiceClient

导入maven

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-webpubsub</artifactId>
    <version>1.1.4</version>
</dependency>

从Azure获取connectionString并确定Hub

WebPubSubServiceClient webPubSubServiceClient = new WebPubSubServiceClientBuilder()
        .connectionString(" ")
        .hub(" ")
        .buildClient();

连接webSocket

创建token

GetClientAccessTokenOptions getClientAccessTokenOptions = new GetClientAccessTokenOptions();
getClientAccessTokenOptions.addRole("webpubsub.sendToGroup");
getClientAccessTokenOptions.addRole("webpubsub.joinLeaveGroup");
WebPubSubClientAccessToken token = webPubSubServiceClient.getClientAccessToken(getClientAccessTokenOptions);

创建WebSocket并确定使用的数据传输协议

String url = token.getUrl();
ws = HttpClient.newHttpClient().newWebSocketBuilder().subprotocols("json.webpubsub.azure.v1")
        .buildAsync(URI.create(url), new WebSocketClient()).join();

实现WebSocket message listener(基于使用的WebSocket依赖不同,实现方式也不同)

private static final class WebSocketClient implements WebSocket.Listener {
    private WebSocketClient() {
    }

    @Override
    public void onOpen(WebSocket webSocket) {
        log.info("subscriber open");
        WebSocket.Listener.super.onOpen(webSocket);
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        log.info("Message received:{}", data);
        return WebSocket.Listener.super.onText(webSocket, data, last);
    }

    @Override
    public void onError(WebSocket webSocket, Throwable error) {
        System.out.println("Bad day! " + webSocket.toString());
        WebSocket.Listener.super.onError(webSocket, error);
    }
}

信息

AckId

使用 ackId 时,你可以在你的请求被处理时收到确认响应消息。 可以在即发即弃场景中选择省略 ackId。
Web PubSub 服务将为具有 ackId 的每个请求发送确认响应。

public class AckResponseMessage {
    private String type;
    private String ackId;
    private boolean success;
    private Error error;

    public static class Error{
        private String name;
        private String message;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }
    }
}

发送至组

public class SendGroupMessage {
    public final String type = "sendToGroup";
    public String data;
    public int ackId;
    public String group;
    public boolean noEcho;
    public DataType dataType;

    public SendGroupMessage(String data, int ackId, String group, boolean noEcho, DataType dataType) {
        this.data = data;
        this.ackId = ackId;
        this.group = group;
        this.noEcho = noEcho;
        this.dataType = dataType;
    }
}

加入组

public class JoinGroupMessage {
    public int ackId;
    public final String type = "joinGroup";
    public String group;

    public JoinGroupMessage(int ackId, String group){
        this.ackId = ackId;
        this.group = group;
    }
}

退出组

public class LeaveGroupMessage {
    public int ackId;
    public final String type = "leaveGroup";
    public String group;

    public LeaveGroupMessage(int ackId, String group){
        this.ackId = ackId;
        this.group = group;
    }
}

收到来自组的消息

public class ReceivedGroupMessage {
    private String type;
    private String from;
    private String fromUserId;
    private String group;
    private DataType dataType;
    private String data;
}

收到来自服务器的消息

public class ReceivedServerMessage {
    private String type;
    private String from;
    private DataType dataType;
    private String data;
}

系统响应

public class ConnectedSystemMessage {
    private String type;
    private String event;
    private String userId;
    private String connectionId;
}

public class DisConnectedSystemMessage {
    private String type;
    private String event;
    private String message;
}

发布与订阅

发布

public void sendToGroup(String data,String group) {
    ++ackId;
    GroupMessage groupMessage = new GroupMessage(data, ackId, group);
    String string = null;
    try {
        string = objectMapper.writeValueAsString(groupMessage);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    ws.sendText(string,true);
}

订阅

@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
    try {
        String message = String.valueOf(data);
        handleData(message);
    } catch (Exception e) {
        log.warn("e:{}", e.getMessage());
    }
    return WebSocket.Listener.super.onText(webSocket, data, last);
}

相关文章

学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习...
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面...
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生...
Can’t connect to local MySQL server through socket \'/v...
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 ...
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服...