ZeroMQ 与 Spring (spring-integration-zeromq)

问题描述

我正在使用 spring-integration-zeromq 并且我正在尝试使用身份验证设置进行设置。

    @Bean
    ZeroMqChannel zeroMqPubSubChannel(ZContext context,ObjectMapper objectMapper) {
        ZeroMqChannel channel = new ZeroMqChannel(context,true);
        channel.setConnectUrl("tcp://localhost:6001:6002");
        channel.setConsumeDelay(Duration.ofMillis(100));
        channel.setMessageConverter(new GenericmessageConverter());
        channel.setSendSocketConfigurer(socket -> {
            socket.setZAPDomain("global".getBytes());
            socket.setCurveServer(true);
            socket.setCurvePublicKey("my_public_key".getBytes());
            socket.setCurveSecretKey("my_secret_key".getBytes());
        });
        EmbeddedJsonHeadersMessageMapper mapper = new EmbeddedJsonHeadersMessageMapper(objectMapper);
        channel.setMessageMapper(mapper);
        channel.afterPropertiesSet();

        channel.subscribe(m -> System.out.println(m));
        return channel;
    }

然而,似乎忽略了 setSendSocketConfigurer 的结果。 在 org.springframework.integration.zeromq.channel.ZeroMqChannel 中,sendSocketConnectionConfigurer 被初始化为一个空的 lambda 并因此传递给 prepareSendSocketMono;所以我调用 setSendSocketConfigurer 因此似乎没有效果,因为它只替换 ZeroMqChannel 实例中的一个属性,而不是应用于已经创建的套接字单声道。如何正确设置身份验证?我错过了什么吗?


UPD1

在 Artem Bilan 提供的修复之后,套接配置器似乎已开始应用于通道,但通信停止工作。我已经应用了建议并尝试设置 ZeroMqProxy,希望它能为我提供一个解决方法,但仍然没有成功:即使我在同一配置中的日志订阅也没有通过身份验证(尽管如果我删除套接配置器,它仍然可以工作)电话)

@Configuration
public class ZeroMQConfig {
    @Bean
    ZeroMqProxy zeroMqProxy(ZContext context,@Value("${zmq.channel.port.frontend}") int frontendPort,@Value("${zmq.channel.port.backend}") int backendPort) {
        ZeroMqProxy proxy = new ZeroMqProxy(context,ZeroMqProxy.Type.SUB_PUB);
        proxy.setExposeCaptureSocket(true);
        proxy.setFrontendPort(frontendPort);
        proxy.setBackendPort(backendPort);
        ZCert cert = new ZCert();
        proxy.setFrontendSocketConfigurer(socket -> {
            socket.setCurvePublicKey(cert.getPublicKey());
            socket.setCurveSecretKey(cert.getSecretKey());
            socket.setCurveServerKey(Z85.decode("my_server_public_key"));
        });
        proxy.setBackendSocketConfigurer(socket -> {
            socket.setCurvePublicKey(cert.getPublicKey());
            socket.setCurveSecretKey(cert.getSecretKey());
            socket.setCurveServerKey(Z85.decode("my_server_public_key"));
        });
        return proxy;
    }

    @Bean
    public ZContext zContext() {
        ZContext context = new ZContext();
        ZAuth auth = new ZAuth(context);
        auth.configureCurve(ZAuth.CURVE_ALLOW_ANY);
        auth.setVerbose(true);
        return context;
    }

    @Bean(name = "zeroMqPubChannel")
    ZeroMqChannel zeroMqPubChannel(ZContext context,ObjectMapper objectMapper,ZeroMqProxy proxy){
        ZeroMqChannel channel = new ZeroMqChannel(context,true);
        channel.setZeroMqProxy(proxy);
        channel.setConsumeDelay(Duration.ofMillis(100));
        channel.setMessageConverter(new GenericmessageConverter());
        EmbeddedJsonHeadersMessageMapper mapper = new EmbeddedJsonHeadersMessageMapper(objectMapper);
        channel.setMessageMapper(mapper);
        return channel;
    }

    @Bean
    @ServiceActivator(inputChannel = "zeroMqPubChannel")
    public MessageHandler subscribe() {
        return message -> System.out.println(message);
    }
}

解决方法

是的...我明白你的意思。这是一个错误:我们必须推迟 his.sendSocketConfigurer 的使用,直到真正与 socket 发生交互。我会尽快解决这个问题。

现在关于您的配置的一些评论:

您不得自己调用 afterPropertiesSet()。让 Spring 应用程序上下文为您管理其回调!

您不得订阅其 bean 定义中的 MessageChannel。而是考虑使用 @ServiceActivator(inputChannel = "zeroMqPubSubChannel")。在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/html/messaging-endpoints.html#service-activator

不幸的是,无法将该自定义传递到内部 ZMQ.Socket 实例中...

更新

在 ZeroMQ 中使用 Curve auth 进行的工作测试:

@Test
void testPubSubWithCurve() throws InterruptedException {
    ZContext CONTEXT = new ZContext();
    new ZAuth(CONTEXT).configureCurve(ZAuth.CURVE_ALLOW_ANY).setVerbose(true);

    ZMQ.Curve.KeyPair frontendKeyPair = ZMQ.Curve.generateKeyPair();
    ZMQ.Curve.KeyPair backendKeyPair = ZMQ.Curve.generateKeyPair();

    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT,ZeroMqProxy.Type.SUB_PUB);
    proxy.setBeanName("subPubCurveProxy");
    proxy.setFrontendSocketConfigurer(socket -> {
        socket.setZAPDomain("global".getBytes());
        socket.setCurveServer(true);
        socket.setCurvePublicKey(frontendKeyPair.publicKey.getBytes());
        socket.setCurveSecretKey(frontendKeyPair.secretKey.getBytes());
    });
    proxy.setBackendSocketConfigurer(socket -> {
        socket.setZAPDomain("global".getBytes());
        socket.setCurveServer(true);
        socket.setCurvePublicKey(backendKeyPair.publicKey.getBytes());
        socket.setCurveSecretKey(backendKeyPair.secretKey.getBytes());
    });
    proxy.afterPropertiesSet();
    proxy.start();

    ZeroMqChannel channel = new ZeroMqChannel(CONTEXT,true);
    channel.setZeroMqProxy(proxy);
    channel.setBeanName("testChannelWithCurve");
    channel.setSendSocketConfigurer(socket -> {
        ZCert clientCert = new ZCert();
        socket.setCurvePublicKey(clientCert.getPublicKey());
        socket.setCurveSecretKey(clientCert.getSecretKey());
        socket.setCurveServerKey(frontendKeyPair.publicKey.getBytes());
    });
    channel.setSubscribeSocketConfigurer(socket -> {
                ZCert clientCert = new ZCert();
                socket.setCurvePublicKey(clientCert.getPublicKey());
                socket.setCurveSecretKey(clientCert.getSecretKey());
                socket.setCurveServerKey(backendKeyPair.publicKey.getBytes());
            }
    );
    channel.setConsumeDelay(Duration.ofMillis(10));
    channel.afterPropertiesSet();

    BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();

    channel.subscribe(received::offer);
    channel.subscribe(received::offer);

    await().until(() -> proxy.getBackendPort() > 0);

    // Give it some time to connect and subscribe
    Thread.sleep(1000);

    GenericMessage<String> testMessage = new GenericMessage<>("test1");
    assertThat(channel.send(testMessage)).isTrue();

    Message<?> message = received.poll(10,TimeUnit.SECONDS);
    assertThat(message).isNotNull().isEqualTo(testMessage);
    message = received.poll(10,TimeUnit.SECONDS);
    assertThat(message).isNotNull().isEqualTo(testMessage);

    channel.destroy();
    proxy.stop();
    CONTEXT.close();
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...