Can't get testcontainers Kafka with sasl.jaas.config to test that ACLs are working

Problem Description

I'm trying to use testcontainers to test Kafka locally in some automated unit tests. I'm running into problems testing authorization.

My goal is to test:

(1) that when no ACL exists in this test container, a KafkaProducer is not allowed to write (currently, even when the producer is not configured correctly, it can send to the topic as long as the ACL has been created; I thought setting the broker's allow.everyone.if.no.acl.found env variable to false would take care of this, but it does not seem to);

(2) that a KafkaProducer using the wrong sasl.jaas.config (i.e. a bad apiKey and password) is denied access to the test topic, even though an ACL is set for all principals (see the sketch after my code below).

Below is my code. I can get it to "work", but I can't figure out how to test the two cases above. I suspect I may not actually be creating the ACLs at all, because adding a line after the ACL creation (adminClient.describeAcls(AclBindingFilter.ANY).values().get();) fails with a "No Authorizer is configured on the broker" error; looking at a post similar to this, I take that to mean the ACL bindings are never actually created.
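Based on that error, my guess (not verified) is that the broker inside the container never has an authorizer enabled at all, so the container definition below probably also needs an env entry along these lines. The env-var name assumes the cp-kafka image's usual mapping of KAFKA_* variables to broker properties, and kafka.security.authorizer.AclAuthorizer is the authorizer class that ships with Kafka 2.4+ (cp 5.5.1 is Kafka 2.5):

        // Guess, not verified: cp-kafka maps KAFKA_AUTHORIZER_CLASS_NAME to the broker's
        // authorizer.class.name; without an authorizer the broker cannot store or enforce ACL bindings,
        // and allow.everyone.if.no.acl.found=false has nothing to act on.
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + confluentVersion))
                .withNetwork(network)
                .withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer")
                .withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false")
                .withEnv("KAFKA_SUPER_USERS", "User:OnlySuperUser");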

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.serialization.StringSerializer;

        String topicName = "this-is-a-topic";
        String confluentVersion = "5.5.1";
        Network network = Network.newNetwork();

        // JAAS config used both as a broker env variable and for the producer below.
        String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required %s=\"%s\" %s=\"%s\";";
        String jaasConfig = String.format(jaasTemplate, "username", "apiKey", "password", "apiPassword");

        KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + confluentVersion))
                .withNetwork(network)
                .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
                .withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false")
                .withEnv("KAFKA_SUPER_USERS", "User:OnlySuperUser")
                .withEnv("KAFKA_SASL_MECHANISM", "PLAIN")
                .withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", "http")
                .withEnv("KAFKA_SASL_JAAS_CONFIG", jaasConfig);

        kafka.start();
        SchemaRegistryContainer schemaRegistryContainer = new SchemaRegistryContainer(confluentVersion).withKafka(kafka);
        schemaRegistryContainer.start();

        // Admin client used to create the test topic and the ACL binding.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        properties.put("input.topic.name", topicName);
        properties.put("input.topic.partitions", "1");
        properties.put("input.topic.replication.factor", "1");
        AdminClient adminClient = KafkaAdminClient.create(properties);

        // Allow WRITE on the topic for all principals from any host.
        AclBinding acl = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL),
                new AccessControlEntry("User:*", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        var acls = adminClient.createAcls(List.of(acl)).values();


        // Create the test topic explicitly since auto-creation is disabled.
        List<NewTopic> topics = new ArrayList<>();
        topics.add(new NewTopic(
                topicName,
                Integer.parseInt(properties.getProperty("input.topic.partitions")),
                Short.parseShort(properties.getProperty("input.topic.replication.factor"))));
        adminClient.createTopics(topics);

        // Producer under test. Note: security.protocol is still PLAINTEXT here,
        // so the sasl.jaas.config below is not actually used by this client.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        props.put("input.topic.name", topicName);
        props.put("security.protocol", "PLAINTEXT");
        props.put("input.topic.partitions", "1");
        props.put("input.topic.replication.factor", "1");
        props.put("metadata.fetch.timeout.ms", "10000");
        props.put("sasl.jaas.config", jaasConfig);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String key = "testContainers";
        String value = "AreAwesome";
        ProducerRecord<String, String> record = new ProducerRecord<>(
                (String) props.get("input.topic.name"), key, value);
        try {
            // send() returns Future<RecordMetadata>, so no cast is needed on get().
            RecordMetadata metadata = producer.send(record).get();
            System.out.println(metadata.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
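
For case (2), the negative test I eventually want looks roughly like the sketch below. It assumes the container's client listener has actually been switched to SASL_PLAINTEXT with the PLAIN mechanism (which my setup above does not do yet), so treat it as an illustration of the intent rather than working code:

        // Sketch only: assumes the broker's client listener is SASL_PLAINTEXT/PLAIN,
        // which the container setup above does not configure yet.
        Properties badProps = new Properties();
        badProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        badProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        badProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        badProps.put("security.protocol", "SASL_PLAINTEXT");
        badProps.put("sasl.mechanism", "PLAIN");
        // Deliberately wrong credentials; the expectation is that the broker rejects this client.
        badProps.put("sasl.jaas.config", String.format(jaasTemplate, "username", "wrongKey", "password", "wrongPassword"));

        try (KafkaProducer<String, String> badProducer = new KafkaProducer<>(badProps)) {
            badProducer.send(new ProducerRecord<>(topicName, "key", "value")).get();
            // Should not get here; the send is expected to fail
            // (e.g. with a SaslAuthenticationException wrapped in an ExecutionException).
        } catch (Exception expected) {
            expected.printStackTrace();
        }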
