Apache Curator + Spring Boot:简单的观察者模式示例

问题描述

我正在尝试启动一个基本的项目结构,其中多个 Spring Boot 应用程序将使用 apache curator 共享资源。

我遵循文档中指定的指南,但更改节点不会触发任何事件

请,任何帮助将不胜感激

pom.xml

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

docker-compose.yaml

version: '3.1'

services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181

创作者

package com.training.zoo.sss;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.System.out;

@Service
public class Client {
    String connectionInfo = "127.0.0.1:2181";
    String ZK_PATH = "/someapp/somemodule/someroute";

    public Client() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectionInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .namespace("base")
                        .build();
        client.start();

        Stat stat1 = client.checkExists().creatingParentContainersIfNeeded().forPath(ZK_PATH);
        if (stat1 == null) {
            client.create().forPath(ZK_PATH,"sometdata".getBytes());
        }

        byte[] bytes = client.getData().forPath(ZK_PATH);
        out.println(new String(bytes,StandardCharsets.UTF_8));

        // Update value every half second
        final AtomicInteger i = new AtomicInteger(0);
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        exec.scheduleAtFixedRate(new Runnable(){
            @Override
            public void run(){
                i.set(i.get()+1);
                System.out.println(i);
                try {
                    client.setData().forPath(ZK_PATH,("init_" + i ).getBytes());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },500,TimeUnit.MILLISECONDS);
    }
}

听众

package com.training.bookstore.request;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.stereotype.Service;

@Service
public class Watcher2 {
    String connectionInfo = "127.0.0.1:2181";
    String ZK_PATH = "/someapp/somemodule/someroute";

    public Watcher2() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectionInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .namespace("base")
                        .build();
        client.start();

        PathChildrenCache watcher = new PathChildrenCache(
                client,ZK_PATH,true    // if cache data
        );

        watcher.getListenable().addListener((client1,event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ",path=[" + data.getPath() + "]"
                        + ",data=[" + new String(data.getData()) + "]"
                        + ",stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(PathChildrenCache.StartMode.NORMAL);
        System.out.println("Register zk watcher successfully!");
    }
}

谢谢

解决方法

所以是的,类名 PathChildrenCache 是一个死的赠品。 听起来很奇怪https://www.youtube.com/watch?v=nZcRU0Op5P4

如果我要发布到 /path1/path2 我正在听路径 /path1/path2 我真的在听path1还是path2? 剧透警告:您正在收听 path2,这是一个文件夹,而不是您认为自己创建的节点

解决方案是 如果生产者在指定路径上生产

    String connectionInfo = "127.0.0.1:2181";
    String PATH = "/someapp/somemodule/whatever";

在 Watcher 类中将路径设置为该节点的“父”

    String connectionInfo = "127.0.0.1:2181";
    String PATH = "/someapp/somemodule";

如果您需要监听生产者路径的子节点/子文件夹, 而不是使用 PathChildrenCache 使用 TreeCache

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...