Resilience4j 上下文传播器无法传播线程本地值

问题描述

我正在尝试将我的断路器代码从 Hystrix 迁移到 Resilience4j。通信是在两个应用程序之间进行的,其中一个应用程序包含 Java 代码本身中的所有弹性 4j 配置,第二个应用程序作为微服务直接使用它。

一个 RequestId 在微服务中生成并传播到工件上下文,并在该上下文中打印在日志中。使用 Hystrix,它运行得非常好,但自从我转向弹性后,我的请求 ID 为空。

以下是我的隔板和上下文传播器的配置:

ThreadPoolBulkheadConfig bulkheadConfig = ThreadPoolBulkheadConfig.custom()
            .maxThreadPoolSize(maxThreadPoolSize)
            .coreThreadPoolSize(coreThreadPoolSize)
            .queueCapacity(queueCapacity)
            .contextPropagator(new DummyContextPropagator())
            .build();

    // Bulk Head Registry
    ThreadPoolBulkheadRegistry bulkheadRegistry = ThreadPoolBulkheadRegistry.of(bulkheadConfig);

    // Create Bulk Head
    ThreadPoolBulkhead bulkhead = bulkheadRegistry.bulkhead(name,bulkheadConfig);

虚拟上下文传播器:

public class  DummyContextPropagator implements ContextPropagator {
private static final Logger log = LoggerFactory.getLogger( DummyContextPropagator.class);

@Override
public supplier<Optional<Object>> retrieve() {
    return () -> (Optional<Object>) get();
}

@Override
public Consumer<Optional<Object>> copy() {
    return (t) -> t.ifPresent(e -> {
        clear();
        put(e);
    });
}

@Override
public Consumer<Optional<Object>> clear() {
    return (t) ->  DummyContextHolder.clear();
}

public static class  DummyContextHolder {

    private static final ThreadLocal threadLocal = new ThreadLocal();

    private  DummyContextHolder() {
    }

    public static void put(Object context) {
        if (threadLocal.get() != null) {
            clear();
        }
        threadLocal.set(context);
    }

    public static void clear() {
        if (threadLocal.get() != null) {
            threadLocal.set(null);
            threadLocal.remove();
        }
    }

    public static Optional<Object> get() {
        return Optional.ofNullable(threadLocal.get());
    }
}
}

但是,似乎没有任何工作可以让我获得 RequestId。

我做的一切都正确还是有其他方法可以做到这一点?

解决方法

我认为当你在子线程中时,你想从父线程的线程局部获取参数,在 hystrix 中它使用命令模型来装饰可调用任务

在resilience4j中,我认为你可以像这样修复它:

@Resource
DispatcherServlet dispatcherServlet;

@PostConstruct
public void changeThreadLocalModel() {
    dispatcherServlet.setThreadContextInheritable(true);
}
,

我发现我的最后一个答案可能会导致一些问题,当您使用“dispatcherServlet.setThreadContextInheritable(true);”时

它可能会污染您的自定义线程池的线程本地映射;

所以这是我的最终决定,它只适用于resilience4j;

@Resource
Resilience4jBulkheadProvider resilience4jBulkheadProvider;

@PostConstruct
public void concurrentThreadContextStrategy() {
    ThreadPoolBulkheadConfig threadPoolBulkheadConfig = ThreadPoolBulkheadConfig.custom().contextPropagator(new CustomInheritContextPropagator()).build();
    resilience4jBulkheadProvider.configureDefault(id -> new Resilience4jBulkheadConfigurationBuilder()
            .bulkheadConfig(BulkheadConfig.ofDefaults()).threadPoolBulkheadConfig(threadPoolBulkheadConfig)
            .build());
}


private static class CustomInheritContextPropagator implements ContextPropagator<RequestAttributes> {

    @Override
    public Supplier<Optional<RequestAttributes>> retrieve() {
        //  give requestcontext to reference from threadlocal;
        //  this method call by web-container thread,such as tomcat,jetty,or undertow,depends on what you used;
        return () -> Optional.ofNullable(RequestContextHolder.getRequestAttributes());
    }

    @Override
    public Consumer<Optional<RequestAttributes>> copy() {
        // load requestcontex into real-call thread
        // this method call by resilience4j bulkhead thread;
        return requestAttributes -> requestAttributes.ifPresent(context -> {
            RequestContextHolder.resetRequestAttributes();
            RequestContextHolder.setRequestAttributes(context);
        });
    }

    @Override
    public Consumer<Optional<RequestAttributes>> clear() {
        // clean requestcontext finally ;
        // this method call by resilience4j bulkhead thread;
        return requestAttributes -> RequestContextHolder.resetRequestAttributes();
    }
}