Rust:从 Azure 事件中心接收 AMQP 消息

问题描述

我想使用 Rust 的事件中心。

我发现了一些很有前途的板条箱,但没有什么令人满意的:

我还没有发现任何使用这些板条箱的实际应用程序。

我已经创建了 the-hub-namespacethe-hubthe-consumer-groupthe-policy 并获得了收听权限并获得了 the-key

到目前为止,我使用 ntex-amqp 取得了最大的成功。我不得不hotfix the rustls connector,因为它因无效 DNS 而失败 - 当只需要 DNS 名称时,主机名包含端口号 (name:port)。

Cargo.toml:

[package]
name = "tmp"
version = "0.1.0"
authors = ["me"]
edition = "2018"

[dependencies]
env_logger = "0.8"
ntex-amqp = { version="0.4",git="https://github.com/BrightOpen/ntex-amqp",branch="master" }
ntex = { version="0.3",features=["rustls"],git="https://github.com/BrightOpen/ntex",branch="master" }
rustls = "0.19"
futures = "0.3"

src/main.rs:

use ntex::connect::rustls::RustlsConnector;
use ntex_amqp::client::{self,SaslAuth};
use ntex_amqp::codec::types::{Descriptor,Symbol,Variant};
use rustls::ClientConfig;
use std::sync::Arc;
use futures::StreamExt;

#[ntex::main]
async fn main() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG","ntex=trace,ntex_amqp=trace,basic=trace");
    env_logger::init();

    let mut tlsconfig = ClientConfig::new();
    let certs = tlsconfig
        .root_store
        .add_pem_file(
            // This is the Azure CA cert
            &mut "-----BEGIN CERTIFICATE-----
MIIF8zCCBNugAwIBAgIQCq+mxcpjxFFB6jvh98dTFzANBgkqhkiG9w0BAQwFADBh
MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3
d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBH
MjAeFw0yMDA3MjkxMjMwMDBaFw0yNDA2MjcyMzU5NTlaMFkxCzAJBgNVBAYTAlVT
MR4wHAYDVQQKExVNaWNyb3NvZnQgQ29ycG9yYXRpb24xKjAoBgNVBAMTIU1pY3Jv
c29mdCBBenVyZSBUTFMgSXNzdWluZyBDQSAwMTCCAiIwDQYJKoZIhvcNAQEBBQAD
ggIPADCCAgocggIBAMedcDrkXufP7pxVm1FHLDNA9IjwHaMoaY8arqqZ4Gff4xyr
RygnavXL7g12Mpax8Q6Dd9hfBzrfWxkF0Br2wIvlvkzW01naNVSkHp+OS3hL3W6n
l/jYvZnVeJXjtsKYcXIf/6WtspcF5awlQ9LZJcjwaH7KoZuK+THpXCMtzD8XNVdm
GW/JI0C/7U/E7evXn9Xdio8SYkGSM63aLO5BtLCv092+1d4GGBSQYolRq+7Pd1kR
EkWBPm0ywZ2Vb8GIS5DLrjelEkBnKCyy3B0yQud9dpVsiUeE7F5sY8Me96WVxQcb
OyYdEY/j/9UpDlOG+vA+YgOvBhkKEjiqygVpP8EZoMMijephzg43b5Qi9r5UrvYo
o19oR/8pf4HJNDPF0/FJwFVMW8PmCBLGstin3NE1+NeWTkGt0TzpHjgKyfaDP2tO
4bCk1G7pP2kDFT7SYfc8xbgCkFQ2UCEXsaH/f5YmpLn4YPiNFCeeIida7xnfTvc4
7IxyVccHHq1FzGygOqemrxEETKh8hvDR6eBdrBwmCHVgZrnAqnn93JtGyPLi6+cj
WGVGtMZHwzVvX1HvSFG771sskcEjJxiQNQDQRWHEh3NxvNb7kFlAXnVdRkkvhjpR
GchFhTAzqmwltdWhWDEyCMKC2x/mSZvZtlZGY+g37Y72qHzidwtyW7rBetZJAgMB
AAGjggGtMIIBqTAdBgNVHQ4EFgQUDyBd16FXlduSzyvQx8J3BM5ygHYwHwYDVR0j
BBgwFoAUTiJUIBiV5uNu5g/6+rkS7QYXjzkwDgYDVR0PAQH/BAQDAgGGMB0GA1Ud
JQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjASBgNVHRMBAf8ECDAGAQH/AgEAMHYG
CCsGAQUFBwEBBGowaDAkBggrBgEFBQcwAYYYaHR0cDovL29jc3AuZGlnaWNlcnQu
Y29tMEAGCCsGAQUFBzAChjRodHRwOi8vY2FjZXJ0cy5kaWdpY2VydC5jb20vRGln
aUNlcnRHbG9iYWxSb290RzIuY3J0MHsGA1UdHwR0MHIwN6A1odoGMWh0dHA6Ly9j
cmwzLmRpZ2ljZXJ0LmNvbS9EaWdpQ2VydEdsb2JhbFJvb3RHMi5jcmwwN6A1odoG
MWh0dHA6Ly9jcmw0LmRpZ2ljZXJ0LmNvbS9EaWdpQ2VydEdsb2JhbFJvb3RHMi5j
cmwwHQYDVR0gBBYwFDAIBgZngQwBAgEwCAYGZ4EMAQICMBAGCSsGAQQBgjcVAQQD
AgEAMA0GCSqGSIb3DQEBDAUAA4IBAQAlFvNh7QgXVLAZSsNR2XRmIn9iS8OHFCBA
WxKJoi8YYQafpMTkMqeuzoL3HWb1pYEipsDkhiMnrpfeYZEA7Lz7yqEEtfgHcEBs
K9KcStQGGZRfmWU07hPXHnFz+5gTXqzCE2PBMlRgVUYJiA25mJPXfB00gDvGhtYa
+mENwM9Bq1B9YYLyLjRtUz8cyGsdyTIG/bBM/Q9jcV8JGqMU/UjAdh1pFyTnnHEl
Y59Npi7F87ZqYYJEHJM2LGD+le8VsHjgeWX2CJQko7klXvcizuZvUEDTjHaQcs2J
+kPgfyMIOY1DMJ21NxOJ2xPRC/wAh/hzSBRVtoAnyuxtkZ4VjIOh
-----END CERTIFICATE-----"
                .as_bytes(),)
        .unwrap();
    assert_eq!(certs,(1,0));

    let driver = client::Connector::new()
        .connector(RustlsConnector::new(Arc::new(tlsconfig)))
        .hostname("...the-hub-namespace...servicebus.windows.net")
        .connect_sasl(
            "...the-hub-namespace....servicebus.windows.net:5671",SaslAuth {
                authz_id: "".into(),authn_id: "...the-policy...".into(),password: "...the-key...".into(),},)
        .await
        .unwrap();
    let sink = driver.sink();

    ntex::rt::spawn(driver.start_default());

    let mut session = sink.open_session().await.unwrap();

    let mut links = vec![];

    // I have 32 partitions
    for i in 0..32u8 {
        let link = session
            .build_receiver_link(
                format!("mylink{}",i),format!("...the-hub.../ConsumerGroups/...the-consumer-group.../Partitions/{}",)
            .max_message_size(65535)
.property(
                Symbol::from_slice("com.microsoft:entity-type"),Some("8".into()),)
            .property(
                Symbol::from_slice("apache.org:selector-filter:string"),Some(Variant::Described((
                    Descriptor::Symbol(Symbol::from_slice("apache.org:selector-filter:string")),Box::new(Variant::from("amqp.annotation.x-opt-offset > '@latest'")),))),)
            .open()
            .await
            .unwrap();
        link.set_link_credit(20);
        links.push(link);
    }

    let mut links = futures::stream::select_all(links);

    use futures::StreamExt;
    while let Some(msg) = links.next().await {
        eprintln!("Message: {:#?}",msg);
    }

    Ok(())
}

我不知道在哪里设置集线器和消费者组,更不用说分区号了。我如何实际接收消息或对消息做出反应?我如何处理我的偏移量?

更新! @JesseSquire 帮助找出了合适的资源路径。更多的研究终于揭示了 ReceiverLink 上的 Stream 实现——我在看哪里?所以现在我实际上收到了一些消息。上面的一些问题不存在。

当我运行这段代码时,我得到了输出

[2021-06-22T18:21:49Z TRACE ntex::connect::resolve] DNS resolver: resolving host "...the-hub-namespace....servicebus.windows.net:5671"
[2021-06-22T18:21:49Z TRACE ntex::connect::resolve] DNS resolver: host "...the-hub-namespace....servicebus.windows.net:5671" resolved to [13.69.64.2:5671]
[2021-06-22T18:21:49Z TRACE ntex::connect::service] TCP connector - connecting to "...the-hub-namespace....servicebus.windows.net:5671" port:5671
[2021-06-22T18:21:49Z TRACE ntex::connect::service] TCP connector - successfully connected to connecting to "...the-hub-namespace....servicebus.windows.net:5671" - Ok(13.69.64.2:5671)
[2021-06-22T18:21:49Z TRACE ntex::connect::rustls] SSL Handshake start for: "...the-hub-namespace....servicebus.windows.net"
[2021-06-22T18:21:49Z TRACE ntex::connect::rustls] SSL Handshake success: DNSNameRef("...the-hub-namespace....servicebus.windows.net")
[2021-06-22T18:21:49Z TRACE ntex_amqp::client::connector] Negotiation client protocol id: AmqpSasl
[2021-06-22T18:21:49Z TRACE ntex_amqp::client::connector] Negotiation client protocol id: Amqp
[2021-06-22T18:21:50Z TRACE ntex_amqp::client::connector] Open client amqp connection: Open { container_id: "42b4f39ce51f46bf85cf8631f64d8cca",hostname: Some("...the-hub-namespace....servicebus.windows.net"),max_frame_size: 65535,channel_max: 1024,idle_time_out: Some(120000),outgoing_locales: None,incoming_locales: None,offered_capabilities: None,desired_capabilities: None,properties: None }
[2021-06-22T18:21:50Z TRACE ntex_amqp::client::connector] Open confirmed: Open { container_id: "f11d7639f6454b819bbf7b1e1150c101_G6",hostname: None,properties: None }
[2021-06-22T18:21:50Z TRACE ntex_amqp::connection] Session opened: local 0 remote 0
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink0" 0 -> 0
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink1" 1 -> 1
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink2" 2 -> 2
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink3" 3 -> 3
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink4" 4 -> 4
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink5" 5 -> 5
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink6" 6 -> 6
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink7" 7 -> 7
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink8" 8 -> 8
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink9" 9 -> 9
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink10" 10 -> 10
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink11" 11 -> 11
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink12" 12 -> 12
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink13" 13 -> 13
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink14" 14 -> 14
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink15" 15 -> 15
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink16" 16 -> 16
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink17" 17 -> 17
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink18" 18 -> 18
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink19" 19 -> 19
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink20" 20 -> 20
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink21" 21 -> 21
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink22" 22 -> 22
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink23" 23 -> 23
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink24" 24 -> 24
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink25" 25 -> 25
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink26" 26 -> 26
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink27" 27 -> 27
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink28" 28 -> 28
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink29" 29 -> 29
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink30" 30 -> 30
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink31" 31 -> 31
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
Message: Ok(
    Transfer {
        handle: 0,delivery_id: Some(
            0,),delivery_tag: Some(
            b"",message_format: Some(
            0,settled: Some(
            true,more: false,rcv_settle_mode: None,state: None,resume: false,aborted: false,batchable: true,body: Some(
            Data(
                b"\0Sr\xc1I\x06\xa3\x15x-opt-sequence-numberU\0\xa3\x0cx-opt-offset\xa1\x010\xa3\x13x-opt-enqueued-time\x83\0\0\x01z4\xcd\xfc \0Su\xa0\x14s6df54s65df4s6d5f4sf",)
...
[2021-06-22T18:23:37Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:24:09Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
Message: Ok(
    Transfer {
        handle: 13,delivery_id: Some(
            27,body: Some(
            Data(
                b"\0Sr\xc1J\x06\xa3\x15x-opt-sequence-numberU\x01\xa3\x0cx-opt-offset\xa1\x0256\xa3\x13x-opt-enqueued-time\x83\0\0\x01z4\xf7\"\xfb\0Su\xa0\x15999999999999999999999",)
[2021-06-22T18:24:50Z TRACE ntex_amqp::session] Session received credit None. window: 4294967295,pending: 0
[2021-06-22T18:24:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:26:35Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task
[2021-06-22T18:27:50Z TRACE ntex_amqp::session] Session received credit None. window: 4294967295,pending: 0
[2021-06-22T18:27:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame,register dispatch task


好的,主要目标实现了:我可以使用 AMQP 在 Rust 中接收事件中心消息。尽管如此,它还有很多值得期待的地方:

  1. 过滤似乎没有任何效果,我该如何管理偏移量?
  2. 更可靠、维护良好、经过实战考验的板条箱?
  3. 文档、示例?
  4. 至少交付一次(目前看起来最多一次)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)