Hystrix传递ThreadLocal范围对象的问题(最为细致的分析)

news2025/1/12 3:53:34

场景

在springcloud微服务体系下,从网关层开始要在request请求头放置一些重要参数,比如traceId,并要求在fegin之间的调用时,也能够一直传递下去,由于实际项目使用中,都是fegin集成了hystrix一起配合使用的,而hystrix有两种模式,一种信号量,一种线程池,我们业务中需要使用线程池模式,而且hystrix也是推荐这种。

问题

使用线程池模式就会存在问题,因为Tomcat中的HttpServletRequest是会复用的,当请求从发送到结束后此request就会被回收,如果在此开启线程就会出现获取request中参数为null的问题,hystrix的线程池同样会遇到此问题。详细的request与线程池的关系查看这篇文章,分析的很全面 千万不要把Request传递到异步线程里面!有坑!

思路

我们可以自定义线程池来解决,先从官网的github入手,有没有提供类似的方案 hystrix官方wiki

重点是HystrixConcurrencyStrategygetThreadPool()wrapCallable(),尤其是wrapCallable()正是我们想实现的功能,那么到底具体怎么使用呢。我们需要从源码来入手

寻找关键点

因为从fegin动态代理生成的,所以直接从HystrixInvocationHandlerinvoke入手,来查找关键点就是线程池的创建

public Object invoke(final Object proxy, final Method method, final Object[] args)
    throws Throwable {
  
  HystrixCommand<Object> hystrixCommand =
      new HystrixCommand<Object>(setterMethodMap.get(method)) {
        /**
         * 省略
         * */
      };

  return hystrixCommand.execute();
}

HystrixCommand是核心的执行类,继续分析
HystrixCommand构造方法

protected HystrixCommand(Setter setter) {
    // use 'null' to specify use the default
    this(setter.groupKey, setter.commandKey, setter.threadPoolKey, null, null, setter.commandPropertiesDefaults, setter.threadPoolPropertiesDefaults, null, null, null, null, null);
}

注意,第五个参数是HystrixThreadPool类型,这里传入的是null,

HystrixCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
    super(group, key, threadPoolKey, circuitBreaker, threadPool, commandPropertiesDefaults, threadPoolPropertiesDefaults, metrics, fallbackSemaphore, executionSemaphore, propertiesStrategy, executionHook);
}

调用父类AbstractCommand的构造方法,threadpool传入的依然是null
AbstractCommand

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    this.commandGroup = initGroupKey(group);
    this.commandKey = initCommandKey(key, getClass());
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
    //这里就是在创建hystrix的threadpool,入参依旧为null
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

    //Strategies from plugins
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
    this.executionHook = initExecutionHook(executionHook);

    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;

    /* execution semaphore override if applicable */
    this.executionSemaphoreOverride = executionSemaphore;
}

this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);就是真正创建线程池的方法


private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
    //fromConstructor为null,进入if逻辑
    if (fromConstructor == null) {
        // get the default implementation of HystrixThreadPool
        // 官方注释直接说明了这里就是实现自己线程池的关键
        return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
    } else {
        return fromConstructor;
    }
}

HystrixThreadPool.Factory#getInstance

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // 获取要使用的键,而不是使用对象本身,这样如果人们忘记实现equals/hashcode,事情仍然可以工作
    // 这里是@FeignClient的value值
    String key = threadPoolKey.name();

    // 查找缓存,根据线程池的key,查找对应的线程池
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            //第一次创建缓存中肯定没有,这里进行创建
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}

下面线程池的真正创建逻辑了
HystrixThreadPoolDefault构造方法

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    //获取线程池的相关参数
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
    //从HystrixPlugins.getInstance()获取一个HystrixConcurrencyStrategy类型的对象
    HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    this.queueSize = properties.maxQueueSize().get();

    this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
            //这里通过concurrencyStrategy.getThreadPool的这个操作去创建线程池
            concurrencyStrategy.getThreadPool(threadPoolKey, properties),
            properties);
    this.threadPool = this.metrics.getThreadPool();
    this.queue = this.threadPool.getQueue();

    /* strategy: HystrixMetricsPublisherThreadPool */
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
  • HystrixPlugins.getInstance().getConcurrencyStrategy()得到的HystrixConcurrencyStrategy就是在开始时提到的官网提供的插件,可见此方法是特别的重要
  • concurrencyStrategy.getThreadPool(threadPoolKey, properties)就是真正创建线程池的逻辑

HystrixPlugins.getInstance().getConcurrencyStrategy()

public HystrixConcurrencyStrategy getConcurrencyStrategy() {
    if (concurrencyStrategy.get() == null) {
        // check for an implementation from Archaius first
        //获取自定义的HystrixConcurrencyStrategy如果找到则设置,否则设置为HystrixConcurrencyStrategyDefault默认
        Object impl = getPluginImplementation(HystrixConcurrencyStrategy.class);
        if (impl == null) {
            // nothing set via Archaius so initialize with default
            concurrencyStrategy.compareAndSet(null, HystrixConcurrencyStrategyDefault.getInstance());
            // we don't return from here but call get() again in case of thread-race so the winner will always get returned
        } else {
            // we received an implementation from Archaius so use it
            concurrencyStrategy.compareAndSet(null, (HystrixConcurrencyStrategy) impl);
        }
    }
    return concurrencyStrategy.get();
}

下面进入getPluginImplementation(HystrixConcurrencyStrategy.class)看看是怎么获取自定义HystrixConcurrencyStrategy
HystrixPlugins#getPluginImplementation

private <T> T getPluginImplementation(Class<T> pluginClass) {
        //从一个动态属性中获取,如果集成了Netflix Archaius就可以动态获取属性,类似于一个配置中心,所以这个不是我们想要的
        T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);
        if (p != null) return p;
        //利用spi机制进行查找
        return findService(pluginClass, classLoader);
    }
private static <T> T findService(
        Class<T> spi, 
        ClassLoader classLoader) throws ServiceConfigurationError {

    ServiceLoader<T> sl = ServiceLoader.load(spi,
            classLoader);
    for (T s : sl) {
        if (s != null)
            return s;
    }
    return null;
}
  • 获取自定义的HystrixConcurrencyStrategy,通过spi机制进行查找
  • 如果找不到则设置为默认的HystrixConcurrencyStrategyDefault

到这里我们知道了,肯定是要在此方法中通过spi机制来实现我们自定义的HystrixConcurrencyStrategy从而的得到自己定义的线程池或者对线程进行包装,但我们先接着分析,如果是默认的会怎么执行。

获取到了默认的HystrixConcurrencyStrategy也就是HystrixConcurrencyStrategyDefault后,接下来就是获取线程池了

concurrencyStrategy.getThreadPool

HystrixConcurrencyStrategyDefault继承了HystrixConcurrencyStrategygetThreadPool是在HystrixConcurrencyStrategy中执行的

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

    final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    final int dynamicCoreSize = threadPoolProperties.coreSize().get();
    final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
    final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
    final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

    if (allowMaximumSizeToDivergeFromCoreSize) {
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                    dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

到这里知道了默认就是创建的JDK的线程池

方案

我们知道了要继承HystrixConcurrencyStrategy并利用spi机制来实现自定义,看下结构
HystrixConcurrencyStrategy

public abstract class HystrixConcurrencyStrategy {

    private final static Logger logger = LoggerFactory.getLogger(HystrixConcurrencyStrategy.class);

    /**
     * Factory method to provide {@link ThreadPoolExecutor} instances as desired.
     * <p>
     * Note that the corePoolSize, maximumPoolSize and keepAliveTime values will be dynamically set during runtime if their values change using the {@link ThreadPoolExecutor#setCorePoolSize},
     * {@link ThreadPoolExecutor#setMaximumPoolSize} and {@link ThreadPoolExecutor#setKeepAliveTime} methods.
     * <p>
     * <b>Default Implementation</b>
     * <p>
     * Implementation using standard java.util.concurrent.ThreadPoolExecutor
     * 
     * @param threadPoolKey
     *            {@link HystrixThreadPoolKey} representing the {@link HystrixThreadPool} that this {@link ThreadPoolExecutor} will be used for.
     * @param corePoolSize
     *            Core number of threads requested via properties (or system default if no properties set).
     * @param maximumPoolSize
     *            Max number of threads requested via properties (or system default if no properties set).
     * @param keepAliveTime
     *            Keep-alive time for threads requested via properties (or system default if no properties set).
     * @param unit
     *            {@link TimeUnit} corresponding with keepAliveTime
     * @param workQueue
     *            {@code BlockingQueue<Runnable>} as provided by {@link #getBlockingQueue(int)}
     * @return instance of {@link ThreadPoolExecutor}
     */
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final int dynamicCoreSize = corePoolSize.get();
        final int dynamicMaximumSize = maximumPoolSize.get();

        if (dynamicCoreSize > dynamicMaximumSize) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                    dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
        }
    }

    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }

    private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
        if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
            return new ThreadFactory() {
                private final AtomicInteger threadNumber = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }

            };
        } else {
            return PlatformSpecific.getAppEngineThreadFactory();
        }
    }

    /**
     * Factory method to provide instance of {@code BlockingQueue<Runnable>} used for each {@link ThreadPoolExecutor} as constructed in {@link #getThreadPool}.
     * <p>
     * Note: The maxQueueSize value is provided so any type of queue can be used but typically an implementation such as {@link SynchronousQueue} without a queue (just a handoff) is preferred as
     * queueing is an anti-pattern to be purposefully avoided for latency tolerance reasons.
     * <p>
     * <b>Default Implementation</b>
     * <p>
     * Implementation returns {@link SynchronousQueue} when maxQueueSize <= 0 or {@link LinkedBlockingQueue} when maxQueueSize > 0.
     * 
     * @param maxQueueSize
     *            The max size of the queue requested via properties (or system default if no properties set).
     * @return instance of {@code BlockingQueue<Runnable>}
     */
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        /*
         * We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
         * <p>
         * SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
         * <p>
         * Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
         * and rejecting is the preferred solution.
         */
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<Runnable>();
        } else {
            return new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }
    }

    /**
     * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
     * <p>
     * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
     * <p>
     * <b>Default Implementation</b>
     * <p>
     * Pass-thru that does no wrapping.
     * 
     * @param callable
     *            {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
     * @return {@code Callable<T>} either as a pass-thru or wrapping the one given
     */
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return callable;
    }

    /**
     * Factory method to return an implementation of {@link HystrixRequestVariable} that behaves like a {@link ThreadLocal} except that it
     * is scoped to a request instead of a thread.
     * <p>
     * For example, if a request starts with an HTTP request and ends with the HTTP response, then {@link HystrixRequestVariable} should
     * be initialized at the beginning, available on any and all threads spawned during the request and then cleaned up once the HTTP request is completed.
     * <p>
     * If this method is implemented it is generally necessary to also implemented {@link #wrapCallable(Callable)} in order to copy state
     * from parent to child thread.
     * 
     * @param rv
     *            {@link HystrixRequestVariableLifecycle} with lifecycle implementations from Hystrix
     * @return {@code HystrixRequestVariable<T>}
     */
    public <T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv) {
        return new HystrixLifecycleForwardingRequestVariable<T>(rv);
    }
    
}

其中wrapCallable方法就是我们想要实现的,官方注释说的很详细了,意思是提供在执行前包装可调用对象的机会。这可用于注入其他行为,例如复制线程状态(例如 ThreadLocal),简单看一下wrapCallable是怎么被调用的

HystrixContexSchedulerAction

public class HystrixContexSchedulerAction implements Action0 {

    private final Action0 actual;
    private final HystrixRequestContext parentThreadState;
    private final Callable<Void> c;

    public HystrixContexSchedulerAction(Action0 action) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), action);
    }

    public HystrixContexSchedulerAction(final HystrixConcurrencyStrategy concurrencyStrategy, Action0 action) {
        this.actual = action;
        this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();

        this.c = concurrencyStrategy.wrapCallable(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
                try {
                    // set the state of this thread to that of its parent
                    HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
                    // execute actual Action0 with the state of the parent
                    actual.call();
                    return null;
                } finally {
                    // restore this thread back to its original state
                    HystrixRequestContext.setContextOnCurrentThread(existingState);
                }
            }
        });
    }

    @Override
    public void call() {
        try {
            c.call();
        } catch (Exception e) {
            throw new RuntimeException("Failed executing wrapped Action0", e);
        }
    }

}

HystrixContextScheduler.HystrixContextSchedulerWorker#schedule(rx.functions.Action0)

public Subscription schedule(Action0 action) {
    if (threadPool != null) {
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
  • 调用concurrencyStrategy.wrapCallable来对wrapCallable进行包装,也就是我们需要做的就是这步
  • 当hystrix执行调用,就是执行包装后的wrapCallable

实现自己的concurrencyStrategy对wrapCallable进行包装

spi机制

spi文件

ExtraHystrixConcurrencyStrategy

@Log4j2
public class ExtraHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        return new WrappedCallable<>(callable, requestAttributes);
    }

    static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        @Override
        public T call() throws Exception {
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                return target.call();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}

spring容器启动就进行设置

这里是参考sleuth的原理org.springframework.cloud.sleuth.instrument.hystrix.SleuthHystrixConcurrencyStrategy

@Configuration(proxyBeanMethods = false)
public class ExtraHystrixAutoConfiguration {

   @Bean
   public ExtraHystrixConcurrencyStrategy extraHystrixConcurrencyStrategy(){
       return new ExtraHystrixConcurrencyStrategy();
   }
}
public class ExtraHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    private HystrixConcurrencyStrategy delegate;

    public ExtraHystrixConcurrencyStrategy() {
        try {
            this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.delegate instanceof ExtraHystrixConcurrencyStrategy) {
                // Welcome to singleton hell...
                return;
            }
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
                    .getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
                    .getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
                    .getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
                    .getPropertiesStrategy();
            this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
                    propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance()
                    .registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        }
        catch (Exception e) {
            log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
        }
    }

    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
                                                 HystrixMetricsPublisher metricsPublisher,
                                                 HystrixPropertiesStrategy propertiesStrategy) {
        if (log.isDebugEnabled()) {
            log.debug("Current Hystrix plugins configuration is ["
                    + "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
                    + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
                    + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
            log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
        }
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        return new WrappedCallable<>(callable, requestAttributes);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixProperty<Integer> corePoolSize,
                                            HystrixProperty<Integer> maximumPoolSize,
                                            HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue) {
        return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                keepAliveTime, unit, workQueue);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixThreadPoolProperties threadPoolProperties) {
        return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
    }

    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        return this.delegate.getBlockingQueue(maxQueueSize);
    }

    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(
            HystrixRequestVariableLifecycle<T> rv) {
        return this.delegate.getRequestVariable(rv);
    }

    static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        @Override
        public T call() throws Exception {
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                return target.call();
            } finally {
                MDC.clear();
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}

RequestContextHolder本质就是一个ThreadLocal,但不建议将request放进入,可以替换为真正ThreadLocal来实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/436282.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

cloud-canal的部署使用

一&#xff0c;官网参考&#xff1a; https://www.clougence.com/ https://www.clougence.com/cc-doc/quick/quick_start 二&#xff0c;点击下载私有部署版 返回数据&#xff1a; 版本号: 2.5.0.7 MD5值: 18e2502xxxxxxx 下载地址: https://tgzdownload.clougence.com/lates…

华为OD机试(Java),分班

一、题目描述 幼儿园两个班的小朋友在排队时混在了一起&#xff0c;每位小朋友都知道自己是否与前面一位小朋友是否同班&#xff0c;请你帮忙把同班的小朋友找出来。 小朋友的编号为整数&#xff0c;与前一位小朋友同班用Y表示&#xff0c;不同班用N表示。 二、输入描述 输…

PYQT5学习笔记01——PYQT5初体验以及PYQT5程序基本结构分析

一、PYQT5初体验 我们首先用代码编写一个窗口&#xff0c;窗口里面有一个标签控件&#xff0c;标签内的文本是 Hello World&#xff0c;代码如下&#xff1a; # -*- coding: UTF-8 -*- # 导入需要的包 from PyQt5.Qt import * import sys# 创建应用程序对象 app QApplicatio…

【c++初阶】:

c入门 一.概念二.使用三.应用四.常引用五.引用与指针 一.概念 c语言中我们常用指针找地址&#xff0c;但在c中&#xff0c;忽略了指针&#xff08;当然也可以使用指针&#xff09;。常用引用这个概念。 二.使用 可以看到这里的b和c本质上都是a&#xff0c;只是不同的称呼罢了。…

手把手教你将项目部署到服务器!

一、导入centos7虚拟机&#xff1a; 打开VMWare&#xff0c;点击“打开虚拟机”&#xff0c;选择centos7.ova之后&#xff0c;选择存储路径&#xff1a; 点击导入&#xff1a; 选择“不再显示此消息”&#xff0c;点击“重试”按钮&#xff1a; 点击“编辑虚拟机设置”&#x…

【数据结构】二叉树OJ题

&#x1f63d;PREFACE &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐ 评论&#x1f4dd; &#x1f4e2;系列专栏&#xff1a;数据结构 &#x1f50a;本专栏主要更新的是数据结构部分知识点 &#x1f4aa;种一棵树最好是十年前其次是现在 目录 1.单值二叉树 2.相同的树 …

Hadoop之Hive

文章目录 一、Hive简介1.1 Hive 基本概念1.2 Hive架构图1.3 Hive数据模型 二、Hive安装配置2.1 内嵌模式2.2 配置元数据到mysql2.3本地模式2.4远程模式2.5 Hive JDBC Hiverserver22.5.1远程模式下使用Beeline CLI2.5.2 DataGrip图形化客户端 2.6 Hive常见属性配置 一、Hive简介…

Vue基础入门(上)

<script src"https://unpkg.com/vuenext"></script> 从面向dom编程到面向数据编程 输入显示列表 const appVue.createApp({data(){return{inputValue:,list:[]}},methods:{handleAddItem(){this.list.push(this.inputValue);this.inputValue;}},templ…

(一) nvidia jetson orin nvcsi tegra-capture-vi camera相关内容梳理 之 vi相关代码分析

背景:对于nvidia 的jetson orin 的camera,其内部是如何实现的尼?硬件方面的pipeline是怎么关联的,其内部有哪些camera相关的modules?对于这些modules,软件上又是怎么去实现?设备树如何去抽象这些modules?分析完后,给我们一个camera sensor,如何进行bring up?本文将会…

什么是 AUTOSAR C++14?

总目录链接>> AutoSAR入门和实战系列总目录 总目录链接>> AutoSAR BSW高阶配置系列总目录 文章目录 什么是 AUTOSAR C14&#xff1f;AUTOSAR C14 规则和偏差静态分析工具可以完全支持自动 什么是 AUTOSAR C14&#xff1f; 它是 C 版本 14 (ISO/IEC 14882:2014…

Mac安装Stable Diffusion教程【超详细教程】附带安装包

Mac安装Stable Diffusion教程 本机配置Mac安装Stable Diffusion教程 配带官方说明重要注意事项安装所需文件已上传网盘自动安装新安装&#xff1a; 自动安装现有安装&#xff1a; 下载稳定扩散模型故障排除Web UI无法启动&#xff1a;性能不佳&#xff1a; 本机配置 电脑&…

DJ4-3 路由器的工作原理

目录 一、路由器的整体结构 二、输入端口的功能 1. 三大模块 2. 查找与转发模块 三、交换结构 1. 经内存的交换结构 2. 经总线的交换结构 3. 经交换矩阵交换结构 四、输出端口的功能 五、排队 1. 输入端口排队 2. 输出端口排队 一、路由器的整体结构 路由器的两个…

一秒钟给硬盘文件做个树状结构目录

一秒钟给硬盘文件做个树状结构目录 一、背景 对于长时间坐在电脑前的打工人来说&#xff0c;若没有养成良好文件分类习惯的话&#xff0c;年终整理电脑文件绝对是件头疼的事情。 给磁盘文件做个目录&#xff0c;一目了然文件都在哪里&#xff1f;想想都是件头疼的事情。 对于…

golang 实现 ldif 数据转成 json 初探

theme: Chinese-red 「这是我参与11月更文挑战的第 8 天&#xff0c;活动详情查看&#xff1a;2021最后一次更文挑战」 上一篇我们分享了如何将 ldif 格式的数据&#xff0c;转换成 json 数据的思路并画相应的简图 这一次&#xff0c;我们就来实现一下 实现方式如下&#xff…

P1829 [国家集训队]Crash的数字表格 / JZPTAB(莫比乌斯反演)

[国家集训队]Crash的数字表格 / JZPTAB 题目描述 今天的数学课上&#xff0c;Crash 小朋友学习了最小公倍数&#xff08;Least Common Multiple&#xff09;。对于两个正整数 a a a 和 b b b&#xff0c; lcm ( a , b ) \text{lcm}(a,b) lcm(a,b) 表示能同时整除 a a a 和…

『pyqt5 从0基础开始项目实战』10.日志记录 鼠标右键打开(保姆级图文)

目录 导包和框架代码实现右键功能实现日志展示弹窗编写一个日志文件用于测试日志展示完整代码main.pythreads.pydialog.py 总结 欢迎关注 『pyqt5 从0基础开始项目实战』 专栏&#xff0c;持续更新中 欢迎关注 『pyqt5 从0基础开始项目实战』 专栏&#xff0c;持续更新中 导包和…

Python常用练习小例子

Python常用练习小例子 1、输出九九乘法表 源码如下&#xff1a; # 九九乘法表 for i in range(1, 10):for j in range(1, i1):print({}x{}{}\t.format(i, j, i*j), end)print() # 换行&#xff0c;相当于print(end\n) 其中&#xff0c;rint({}x{}{}\t.format(i, j, i*j), e…

Kubespray v2.21.0 离线部署 Kubernetes v1.25.6 集群

文章目录 1. 前言2. 预备条件3. 配置代理4. 下载介质5. 初始化配置6. 安装部署工具6.1 配置 venv 部署环境6.2 配置容器部署环境 7. 配置互信8. 编写 inventory.ini9. 编写 offline.yml10. 部署 offline repo11. 部署 kubernetes 1. 前言 Kubespray 是 Kubernetes incubator 中…

【Python合集】程序员系列代码之“这么好的天气应该去放风筝,而不是在搬砖,好想去放风筝哦~”(附完整代码)

导语 ☽ ☽ ☽ ☽ ☽ ☽ 文案丨April 19th, 2023 ☆ ☽ ☽☽ ☽☽ ☽ 江滩边摇摇晃晃的风筝 是春日越冬归来的信号 风筝蹦蹦跳跳 看盎然春意四处热闹阿姨路过菜摊子 带把香椿回家炒蛋细子摘桑 被酸得直口水嗲嗲裹着棉袄 托起霸缸到处晒大阳妹子没管倒春寒 提前换上短…

HttpServletRequest

1、HttpServletRequest对象 在Servlet API中&#xff0c;定义了一个HttpServletRequest接口&#xff0c;它继承自ServletRequest接口&#xff0c;专门用于封装HTTP请求消息 1.1 获取请求行信息的相关方法 当访问Servlet时&#xff0c;请求消息的请求行中会包含请求方法、请求…