目录
- 🐬官网介绍
- 🐬负载均衡
- 🐠加权随机(random)
- 🐠平滑轮询(roundrobin)
- 🐠线性负载(lowerweight)
- 🐵其它
🐬官网介绍
官网资料,从1.3开始到3.1都是一样的,源码将以2.0.6版本为例,官网介绍如下:
🐬负载均衡
可以修改 master.properties,指定负载均衡算法:
MasterConfig.java
默认为线性负载 lowerweight:
HostManagerConfig.java
根据配置选择算法:
父类 CommonHostManager,获取有效的 worker 列表:
🐠加权随机(random)
RandomSelector
RandomHostManager
AbstractSelector
🐠平滑轮询(roundrobin)
用到了原子类
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
- AtomicBoolean
/* * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent.atomic; import sun.misc.Unsafe; /** * A {@code boolean} value that may be updated atomically. See the * {@link java.util.concurrent.atomic} package specification for * description of the properties of atomic variables. An * {@code AtomicBoolean} is used in applications such as atomically * updated flags, and cannot be used as a replacement for a * {@link java.lang.Boolean}. * * @since 1.5 * @author Doug Lea */ public class AtomicBoolean implements java.io.Serializable { private static final long serialVersionUID = 4654671469794556979L; // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicBoolean.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; /** * Creates a new {@code AtomicBoolean} with the given initial value. * * @param initialValue the initial value */ public AtomicBoolean(boolean initialValue) { value = initialValue ? 1 : 0; } /** * Creates a new {@code AtomicBoolean} with initial value {@code false}. */ public AtomicBoolean() { } /** * Returns the current value. * * @return the current value */ public final boolean get() { return value != 0; } /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(boolean expect, boolean update) { int e = expect ? 1 : 0; int u = update ? 
1 : 0; return unsafe.compareAndSwapInt(this, valueOffset, e, u); } /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * <p><a href="package-summary.html#weakCompareAndSet">May fail * spuriously and does not provide ordering guarantees</a>, so is * only rarely an appropriate alternative to {@code compareAndSet}. * * @param expect the expected value * @param update the new value * @return {@code true} if successful */ public boolean weakCompareAndSet(boolean expect, boolean update) { int e = expect ? 1 : 0; int u = update ? 1 : 0; return unsafe.compareAndSwapInt(this, valueOffset, e, u); } /** * Unconditionally sets to the given value. * * @param newValue the new value */ public final void set(boolean newValue) { value = newValue ? 1 : 0; } /** * Eventually sets to the given value. * * @param newValue the new value * @since 1.6 */ public final void lazySet(boolean newValue) { int v = newValue ? 1 : 0; unsafe.putOrderedInt(this, valueOffset, v); } /** * Atomically sets to the given value and returns the previous value. * * @param newValue the new value * @return the previous value */ public final boolean getAndSet(boolean newValue) { boolean prev; do { prev = get(); } while (!compareAndSet(prev, newValue)); return prev; } /** * Returns the String representation of the current value. * @return the String representation of the current value */ public String toString() { return Boolean.toString(get()); } }
- AtomicLong
/* * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent.atomic; import java.util.function.LongUnaryOperator; import java.util.function.LongBinaryOperator; import sun.misc.Unsafe; /** * A {@code long} value that may be updated atomically. See the * {@link java.util.concurrent.atomic} package specification for * description of the properties of atomic variables. An * {@code AtomicLong} is used in applications such as atomically * incremented sequence numbers, and cannot be used as a replacement * for a {@link java.lang.Long}. However, this class does extend * {@code Number} to allow uniform access by tools and utilities that * deal with numerically-based classes. * * @since 1.5 * @author Doug Lea */ public class AtomicLong extends Number implements java.io.Serializable { private static final long serialVersionUID = 1927816293512124184L; // setup to use Unsafe.compareAndSwapLong for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; /** * Records whether the underlying JVM supports lockless * compareAndSwap for longs. While the Unsafe.compareAndSwapLong * method works in either case, some constructions should be * handled at Java level to avoid locking user-visible locks. */ static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8(); /** * Returns whether underlying JVM supports lockless CompareAndSet * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS. */ private static native boolean VMSupportsCS8(); static { try { valueOffset = unsafe.objectFieldOffset (AtomicLong.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile long value; /** * Creates a new AtomicLong with the given initial value. 
* * @param initialValue the initial value */ public AtomicLong(long initialValue) { value = initialValue; } /** * Creates a new AtomicLong with initial value {@code 0}. */ public AtomicLong() { } /** * Gets the current value. * * @return the current value */ public final long get() { return value; } /** * Sets to the given value. * * @param newValue the new value */ public final void set(long newValue) { value = newValue; } /** * Eventually sets to the given value. * * @param newValue the new value * @since 1.6 */ public final void lazySet(long newValue) { unsafe.putOrderedLong(this, valueOffset, newValue); } /** * Atomically sets to the given value and returns the old value. * * @param newValue the new value * @return the previous value */ public final long getAndSet(long newValue) { return unsafe.getAndSetLong(this, valueOffset, newValue); } /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(long expect, long update) { return unsafe.compareAndSwapLong(this, valueOffset, expect, update); } /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * <p><a href="package-summary.html#weakCompareAndSet">May fail * spuriously and does not provide ordering guarantees</a>, so is * only rarely an appropriate alternative to {@code compareAndSet}. * * @param expect the expected value * @param update the new value * @return {@code true} if successful */ public final boolean weakCompareAndSet(long expect, long update) { return unsafe.compareAndSwapLong(this, valueOffset, expect, update); } /** * Atomically increments by one the current value. 
* * @return the previous value */ public final long getAndIncrement() { return unsafe.getAndAddLong(this, valueOffset, 1L); } /** * Atomically decrements by one the current value. * * @return the previous value */ public final long getAndDecrement() { return unsafe.getAndAddLong(this, valueOffset, -1L); } /** * Atomically adds the given value to the current value. * * @param delta the value to add * @return the previous value */ public final long getAndAdd(long delta) { return unsafe.getAndAddLong(this, valueOffset, delta); } /** * Atomically increments by one the current value. * * @return the updated value */ public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; } /** * Atomically decrements by one the current value. * * @return the updated value */ public final long decrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L; } /** * Atomically adds the given value to the current value. * * @param delta the value to add * @return the updated value */ public final long addAndGet(long delta) { return unsafe.getAndAddLong(this, valueOffset, delta) + delta; } /** * Atomically updates the current value with the results of * applying the given function, returning the previous value. The * function should be side-effect-free, since it may be re-applied * when attempted updates fail due to contention among threads. * * @param updateFunction a side-effect-free function * @return the previous value * @since 1.8 */ public final long getAndUpdate(LongUnaryOperator updateFunction) { long prev, next; do { prev = get(); next = updateFunction.applyAsLong(prev); } while (!compareAndSet(prev, next)); return prev; } /** * Atomically updates the current value with the results of * applying the given function, returning the updated value. The * function should be side-effect-free, since it may be re-applied * when attempted updates fail due to contention among threads. 
* * @param updateFunction a side-effect-free function * @return the updated value * @since 1.8 */ public final long updateAndGet(LongUnaryOperator updateFunction) { long prev, next; do { prev = get(); next = updateFunction.applyAsLong(prev); } while (!compareAndSet(prev, next)); return next; } /** * Atomically updates the current value with the results of * applying the given function to the current and given values, * returning the previous value. The function should be * side-effect-free, since it may be re-applied when attempted * updates fail due to contention among threads. The function * is applied with the current value as its first argument, * and the given update as the second argument. * * @param x the update value * @param accumulatorFunction a side-effect-free function of two arguments * @return the previous value * @since 1.8 */ public final long getAndAccumulate(long x, LongBinaryOperator accumulatorFunction) { long prev, next; do { prev = get(); next = accumulatorFunction.applyAsLong(prev, x); } while (!compareAndSet(prev, next)); return prev; } /** * Atomically updates the current value with the results of * applying the given function to the current and given values, * returning the updated value. The function should be * side-effect-free, since it may be re-applied when attempted * updates fail due to contention among threads. The function * is applied with the current value as its first argument, * and the given update as the second argument. * * @param x the update value * @param accumulatorFunction a side-effect-free function of two arguments * @return the updated value * @since 1.8 */ public final long accumulateAndGet(long x, LongBinaryOperator accumulatorFunction) { long prev, next; do { prev = get(); next = accumulatorFunction.applyAsLong(prev, x); } while (!compareAndSet(prev, next)); return next; } /** * Returns the String representation of the current value. 
* @return the String representation of the current value */ public String toString() { return Long.toString(get()); } /** * Returns the value of this {@code AtomicLong} as an {@code int} * after a narrowing primitive conversion. * @jls 5.1.3 Narrowing Primitive Conversions */ public int intValue() { return (int)get(); } /** * Returns the value of this {@code AtomicLong} as a {@code long}. */ public long longValue() { return get(); } /** * Returns the value of this {@code AtomicLong} as a {@code float} * after a widening primitive conversion. * @jls 5.1.2 Widening Primitive Conversions */ public float floatValue() { return (float)get(); } /** * Returns the value of this {@code AtomicLong} as a {@code double} * after a widening primitive conversion. * @jls 5.1.2 Widening Primitive Conversions */ public double doubleValue() { return (double)get(); } }
真正的源码RoundRobinSelector
…神马玩意!!!!
🐠线性负载(lowerweight)
默认的负载均衡算法,估计也是真正使用的,另外两个就是凑数的,select方法都是单独写的
ExecutorDispatcher:只有 LowerWeightHostManager 重写了 select(ExecutionContext context)
- LowerWeightRoundRobin
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; import com.google.common.collect.Lists; /** * lower weight round robin */ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> { /** * select * * @param sources sources * @return HostWeight */ @Override public HostWeight doSelect(Collection<HostWeight> sources) { double totalWeight = 0; double lowWeight = 0; HostWeight lowerNode = null; List<HostWeight> weights = canAssignTaskHost(sources); for (HostWeight hostWeight : weights) { totalWeight += hostWeight.getWeight(); hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight()); if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) { lowerNode = hostWeight; lowWeight = hostWeight.getCurrentWeight(); } } lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight); return lowerNode; } private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources) { List<HostWeight> zeroWaitingTask = sources.stream().filter(h -> h.getWaitingTaskCount() == 0).collect(Collectors.toList()); if 
(!zeroWaitingTask.isEmpty()) { return zeroWaitingTask; } HostWeight hostWeight = sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get(); List<HostWeight> waitingTask = Lists.newArrayList(hostWeight); List<HostWeight> equalWaitingTask = sources.stream().filter(h -> !h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() == hostWeight.getWaitingTaskCount()) .collect(Collectors.toList()); if (!equalWaitingTask.isEmpty()) { waitingTask.addAll(equalWaitingTask); } return waitingTask; } }
🐵其它
@Bean注解官网
Spring的@Bean注解用于标注一个方法,表示该方法会产生一个Bean对象,并把这个Bean对象交给Spring管理。默认(单例作用域)情况下,Spring只会调用一次产生这个Bean对象的方法,随后将这个Bean对象放在自己的IOC容器中。
Spring IOC容器管理一个或者多个bean。使用@Bean时,这些bean通常在带有@Configuration注解的类中创建;在一个方法上使用@Bean注解,就表明该方法返回的对象需要交给Spring进行管理。