并发编程 - Event Bus 设计模式

news2025/1/15 6:48:35

文章目录

  • Pre
  • 设计
  • Code
    • Bus接口
    • 自定义注解 @Subscribe
    • 同步EventBus
    • 异步EventBus
    • Subscriber注册表Registry
    • Event广播Dispatcher
  • 测试
    • 简单的Subscriber
    • 同步Event Bus
    • 异步Event Bus
  • 小结

在这里插入图片描述

Pre

我们在日常的工作中,都会使用到MQ这种组件, 某subscriber在消息中间件上注册了某个topic(主题),当有消息发送到了该topic上之后,注册在该topic上的所有subscriber都将会收到消息。

在这里插入图片描述

如图所示【消息中间件的消息订阅与发布】

消息中间件的核心作用是提供系统之间的异步消息处理机制。它可以在一个系统完成操作后,通过提交消息到消息中间件,触发其他依赖系统的后续处理,而不需要等待后续处理完全结束。

使用消息中间件的好处有:

  • 提高系统处理效率,系统之间可以异步并行处理
  • 降低系统耦合,通过消息进行解耦
  • 提高系统故障隔离能力,一个系统故障不会影响其他系统

今天我们来实现一个Java进程内部的消息中间件Event Bus,它可以用于进程内不同组件之间的异步消息通信。


设计

在这里插入图片描述

  • Bus接口对外提供了几种主要的使用方式,比如post方法用来发送Event
  • register方法用来注册Event接收者(Subscriber)接受响应事件
  • EventBus采用同步的方式推送Event
  • AsyncEventBus采用异步的方式(Thread-Per-Message)推送Event
  • Registry注册表,主要用来记录对应的Subscriber以及受理消息的回调方法,回调方法用注解@Subscribe来标识。
  • Dispatcher主要用来将event广播给注册表中监听了topic的Subscriber

在这里插入图片描述


Code

在这里插入图片描述

Bus接口

package com.artisan.busevent.intf;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 * @desc: Bus接口定义了EventBus的所有使用方法
 */
public interface Bus {

    /**
     * 将某个对象注册到Bus上,从此之后该类就成为Subscriber了
     */
    void register(Object subscriber);

    /**
     * 将某个对象从Bus上取消注册,取消注册之后就不会再接收到来自Bus的任何消息
     */
    void unregister(Object subscriber);

    /**
     * 提交Event到默认的topic
     */
    void post(Object event);

    /**
     * 提交Event到指定的topic
     */
    void post(Object Event, String topic);

    /**
     * 关闭该bus
     */
    void close();

    /**
     * 返回Bus的名称标识
     */
    String getBusName();
}
    

Bus接口中定义了注册topic的方法和Event发送的方法,具体如下。

  • register(Object subscriber):将某个对象实例注册给Event Bus

  • unregister(Object subscriber):取消对该对象实例的注册,会在Event Bus的注册表(Registry)中将其移除

  • post(Object event):提交Event到Event Bus中,如果未指定topic则会将event广播给Event Bus默认的topic

  • post(Object Event,String topic):提交Event的同时指定了topic

  • close():销毁该Event Bus

  • getBusName():返回该Event Bus的名称


自定义注解 @Subscribe

注册对象给Event Bus的时候需要指定接收消息时的回调方法,采用注解的方式进行Event回调

package com.artisan.busevent.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
    String topic() default "default-topic";
}

@Subscribe要求注解在类中的方法,注解时可指定topic,不指定的情况下为默认的topic(default-topic


同步EventBus

同步EventBus是最核心的一个类,它实现了Bus的所有功能,但是该类对Event的广播推送采用的是同步的方式.

package com.artisan.busevent.impl;

import com.artisan.busevent.intf.Bus;
import com.artisan.busevent.intf.EventExceptionHandler;

import java.util.concurrent.Executor;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class EventBus implements Bus {

    /**
     * 用于维护Subscriber的注册表
     */
    private final Registry registry = new Registry();

    /**
     * Event Bus的名字
     */
    private String busName;

    /**
     * 默认的Event Bus的名字
     */
    private final static String DEFAULT_BUS_NAME = "default";

    /**
     * 默认的topic的名字
     */
    private final static String DEFAULT_TOPIC = "default-topic";

    /**
     * 用于分发广播消息到各个Subscriber的类
     */
    private final Dispatcher dispatcher;


    public EventBus() {
        this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
    }

    /**
     * @param busName
     */
    public EventBus(String busName) {
        this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
    }

    EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor) {
        this.busName = busName;
        this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor);
    }

    public EventBus(EventExceptionHandler exceptionHandler) {
        this(DEFAULT_BUS_NAME, exceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);
    }

    /**
     * 将注册Subscriber的动作直接委托给Registry
     *
     * @param subscriber
     */
    @Override
    public void register(Object subscriber) {
        this.registry.bind(subscriber);
    }

    /**
     * 解除注册同样委托给Registry
     *
     * @param subscriber
     */
    @Override
    public void unregister(Object subscriber) {
        this.registry.unbind(subscriber);
    }

    /**
     * 提交Event到默认的topic
     *
     * @param event
     */
    @Override
    public void post(Object event) {
        this.post(event, DEFAULT_TOPIC);
    }

    /**
     * 提交Event到指定的topic,具体的动作是由Dispatcher来完成的
     *
     * @param event
     * @param topic
     */
    @Override
    public void post(Object event, String topic) {
        this.dispatcher.dispatch(this, registry, event, topic);
    }

    /**
     * 关闭销毁Bus
     */
    @Override
    public void close() {
        this.dispatcher.close();
    }

    /**
     * 返回Bus的名称
     *
     * @return
     */
    @Override
    public String getBusName() {
        return this.busName;
    }
}
    
  • EventBus的构造除了名称之外,还需要有ExceptionHandler和Executor,后两个主要是给Dispatcher使用的。

  • registry和unregister都是通过Subscriber注册表来完成的。

  • Event的提交则是由Dispatcher来完成的。

  • Executor是使用JDK中的Executor接口,自定义的ThreadPool天生就是多线程并发执行任务的线程池,自带异步处理能力,但是无法做到同步任务处理,因此我们使用Executor可以任意扩展同步、异步的任务处理方式。


异步EventBus

如果想要使用异步的方式进行推送,可使用EventBus的子类AsyncEventBus 。

异步的EventBus比较简单,继承自同步Bus,然后将Thread-Per-Message用异步处理任务的Executor替换EventBus中的同步Executor即可

package com.artisan.busevent.impl;

import com.artisan.busevent.intf.EventExceptionHandler;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author 小工匠
 * @version 1.0 
 * @mark: show me the code , change the world
 */
public class AsyncEventBus extends EventBus {

    AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
        super(busName, exceptionHandler, executor);
    }

    public AsyncEventBus(String busName, ThreadPoolExecutor executor) {
        this(busName, null, executor);
    }

    public AsyncEventBus(ThreadPoolExecutor executor) {
        this("default-async", null, executor);
    }

    public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
        this("default-async", exceptionHandler, executor);
    }
}
    

重写了父类EventBus的构造函数,使用ThreadPoolExecutor替代Executor 。


Subscriber注册表Registry

注册表维护了topic和subscriber之间的关系,当有Event被post之后,Dispatcher需要知道该消息应该发送给哪个Subscriber的实例和对应的方法,Subscriber对象没有任何特殊要求,就是普通的类不需要继承任何父类或者实现任何接口

package com.artisan.busevent.impl;

import com.artisan.busevent.annotations.Subscribe;
import com.artisan.busevent.relations.Subscriber;

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;


/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
class Registry {

    /**
     * 存储Subscriber集合和topic之间关系的map
     */
    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>();

    public void bind(Object subscriber) {
        //获取Subscriber Object的方法集合然后进行绑定
        List<Method> subscribeMethods = getSubscribeMethods(subscriber);
        subscribeMethods.forEach(m -> tierSubscriber(subscriber, m));
    }

    public void unbind(Object subscriber) {
        //unbind为了提高速度,只对Subscriber进行失效操作
        subscriberContainer.forEach((key, queue) ->
                queue.forEach(s ->
                {
                    if (s.getSubscribeObject() == subscriber) {
                        s.setDisable(true);
                    }
                }));
    }

    public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) {
        return subscriberContainer.get(topic);
    }

    private void tierSubscriber(Object subscriber, Method method) {
        final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);
        String topic = subscribe.topic();
        //当某topic没有Subscriber Queue的时候创建一个
        subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>());

        //创建一个Subscriber并且加入Subscriber列表中
        subscriberContainer.get(topic).add(new Subscriber(subscriber, method));
    }

    private List<Method> getSubscribeMethods(Object subscriber) {
        final List<Method> methods = new ArrayList<>();
        Class<?> temp = subscriber.getClass();
        //不断获取当前类和父类的所有@Subscribe方法
        while (temp != null) {
            //获取所有的方法
            Method[] declaredMethods = temp.getDeclaredMethods();
            //只有public方法 &&有一个入参 &&最重要的是被@Subscribe标记的方法才符合回调方法
            Arrays.stream(declaredMethods)
                    .filter(m -> m.isAnnotationPresent(Subscribe.class)
                            && m.getParameterCount() == 1
                            && m.getModifiers() == Modifier.PUBLIC)
                    .forEach(methods::add);
            temp = temp.getSuperclass();
        }
        return methods;
    }
}

由于Registry是在Bus中使用的,不能暴露给外部,因此Registry被设计成了包可见的类,所设计的EventBus对Subscriber没有做任何限制,但是要接受event的回调则需要将方法使用注解@Subscribe进行标记(可指定topic),同一个Subscriber的不同方法通过@Subscribe注解之后可接受来自两个不同的topic消息,比如

/**
*非常普通的对象
*/
public class SimpleObject
{
    /**
    *subscribe方法,比如使用@Subscribe标记,并且是void类型且有一个参数
    */
    @Subscribe(topic = "artisan-topic")
    public void test2(Integer x)
    {

    }
    @Subscribe(topic = "test-topic")
    public void test3(Integer x)
    {
    }
}

SimpleObject的实例被注册到了Event Bus之后,test2和test3这两个方法将会被加入到注册表中,分别用来接受来自artisan-topic和test-topic的event


Event广播Dispatcher

Dispatcher的主要作用是将EventBus post的event推送给每一个注册到topic上的subscriber上,具体的推送其实就是执行被@Subscribe注解的方法.

package com.artisan.busevent.impl;

import com.artisan.busevent.relations.Subscriber;
import com.artisan.busevent.intf.Bus;
import com.artisan.busevent.intf.EventContext;
import com.artisan.busevent.intf.EventExceptionHandler;

import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class Dispatcher {
    private final Executor executorService;
    private final EventExceptionHandler exceptionHandler;

    public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;

    public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;

    private Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) {
        this.executorService = executorService;
        this.exceptionHandler = exceptionHandler;
    }

    public void dispatch(Bus bus, Registry registry, Object event, String topic) {
        //根据topic获取所有的Subscriber列表
        ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic);
        if (null == subscribers) {
            if (exceptionHandler != null) {
                exceptionHandler.handle(new IllegalArgumentException("The topic " + topic + " not bind yet"),
                        new BaseEventContext(bus.getBusName(), null, event));
            }
            return;
        }

        //遍历所有的方法,并且通过反射的方式进行方法调用
        subscribers.stream()
                .filter(subscriber -> !subscriber.isDisable())
                .filter(subscriber ->
                {
                    Method subscribeMethod = subscriber.getSubscribeMethod();
                    Class<?> aClass = subscribeMethod.getParameterTypes()[0];
                    return (aClass.isAssignableFrom(event.getClass()));
                }).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));
    }

    private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) {
        Method subscribeMethod = subscriber.getSubscribeMethod();
        Object subscribeObject = subscriber.getSubscribeObject();
        executorService.execute(() ->
        {
            try {
                subscribeMethod.invoke(subscribeObject, event);
            } catch (Exception e) {
                if (null != exceptionHandler) {
                    exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event));
                }
            }
        });
    }

    public void close() {
        if (executorService instanceof ExecutorService) {
            ((ExecutorService) executorService).shutdown();
        }
    }

    static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) {
        return new Dispatcher(executor, exceptionHandler);
    }

    static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {
        return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);
    }

    static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) {
        return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);
    }

    /**
     * 顺序执行的ExecutorService
     */
    private static class SeqExecutorService implements Executor {

        private final static SeqExecutorService INSTANCE = new SeqExecutorService();

        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    /**
     * 每个线程负责一次消息推送
     */
    private static class PreThreadExecutorService implements Executor {
        private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();

        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    /**
     * 默认的EventContext实现
     */
    private static class BaseEventContext implements EventContext {
        private final String eventBusName;

        private final Subscriber subscriber;

        private final Object event;

        private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) {
            this.eventBusName = eventBusName;
            this.subscriber = subscriber;
            this.event = event;
        }

        @Override
        public String getSource() {
            return this.eventBusName;
        }

        @Override
        public Object getSubscriber() {
            return subscriber != null ? subscriber.getSubscribeObject() : null;
        }

        @Override
        public Method getSubscribe() {
            return subscriber != null ? subscriber.getSubscribeMethod() : null;
        }

        @Override
        public Object getEvent() {
            return this.event;
        }
    }
}
    

在Dispatcher中,除了从Registry中获取对应的Subscriber执行之外,我们还定义了几个静态内部类,其主要是实现了Executor接口和EventContent接口。


测试

简单的Subscriber

 package com.artisan.busevent.consumer;

import com.artisan.busevent.annotations.Subscribe;
import com.artisan.busevent.entity.Artisan;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
@Slf4j
public class SubscriberA {

    /**
     * 消费默认主题的数据 ,接受的数据类型为 String类型
     *
     * @param message
     */
    @SneakyThrows
    @Subscribe
    public void consumerDefaultTopic(String message) {
        log.info("consumerDefaultTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()),
                Thread.currentThread().getName());
        log.info("SubscriberA-->consumerDefaultTopic(模拟执行5秒)-->收到Message(字符串)--> {}", message);
        TimeUnit.SECONDS.sleep(5);
        log.info("-----------------------consumerDefaultTopic OVER---------------------------------");
    }


    /**
     * 消费 test 的数据 , 接受的数据类型 为 artisan对象
     *
     * @param artisan
     */
    @SneakyThrows
    @Subscribe(topic = "test")
    public void consumerSpecTopic(Artisan artisan) {
        log.info("consumerSpecTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()),
                Thread.currentThread().getName());
        log.info("SubscriberA-->consumerSpecTopic(模拟执行10秒)-->收到Message-(对象)-> {}", artisan);
        TimeUnit.SECONDS.sleep(10);
        log.info("-----------------------consumerSpecTopic OVER---------------------------------");

    }
}
    
    
 package com.artisan.busevent.consumer;

import com.artisan.busevent.annotations.Subscribe;
import com.artisan.busevent.entity.Artisan;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
@Slf4j
public class SubscriberB {


    /**
     * 消费默认主题的数据
     * @param message
     */
    @SneakyThrows
    @Subscribe
    public void consumerDefaultTopic(String message) {
        log.info("consumerDefaultTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()),
                Thread.currentThread().getName());
        log.info("SubscriberB-->consumerDefaultTopic(模拟执行3秒)-->收到Message(字符串)--> {}", message);
        TimeUnit.SECONDS.sleep(3);
        log.info("-----------------------consumerDefaultTopic OVER---------------------------------");
    }


    /**
     * 消费 test  主题的数据
     * @param artisan
     */
    @SneakyThrows
    @Subscribe(topic = "test")
    public void consumerSpecTopic(Artisan artisan) {
        log.info("consumerSpecTopic开始执行时间:{}, 执行线程: {}", new SimpleDateFormat("HH:mm:ss").format(new Date()),
                Thread.currentThread().getName());
        log.info("SubscriberB-->consumerSpecTopic(模拟执行5秒)-->收到Message(对象)--> {} ", artisan);
        TimeUnit.SECONDS.sleep(5);
        log.info("-----------------------consumerSpecTopic OVER---------------------------------");

    }
}
    
package com.artisan.busevent.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */

@Data
@NoArgsConstructor
@AllArgsConstructor()
public class Artisan {

    private String name;
    private Integer age;
    private List<String> hobbies;
}
    

同步Event Bus

package com.artisan.busevent.producer;

import com.artisan.busevent.consumer.SubscriberA;
import com.artisan.busevent.consumer.SubscriberB;
import com.artisan.busevent.entity.Artisan;
import com.artisan.busevent.impl.EventBus;
import com.artisan.busevent.intf.Bus;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.util.Arrays;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
@Slf4j
public class EventBus_SyncTest {


    @Test
    public void testSync() {


        Bus bus = new EventBus("TestBus");
        bus.register(new SubscriberA());
        bus.register(new SubscriberB());

        log.info("--------默认 主题 ----");

        // 默认主题
        bus.post("Hello 小工匠");

        log.info("--------test 主题 ----");
        // 指定topic
        bus.post(new Artisan("artisan", 18, Arrays.asList("JAVA", "AIGC")), "test");

    }
}
    

在这里插入图片描述

异步Event Bus

package com.artisan.busevent.producer;

import com.artisan.busevent.consumer.SubscriberA;
import com.artisan.busevent.consumer.SubscriberB;
import com.artisan.busevent.entity.Artisan;
import com.artisan.busevent.impl.AsyncEventBus;
import com.artisan.busevent.impl.EventBus;
import com.artisan.busevent.intf.Bus;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */

@Slf4j
public class EventBus_AsyncTest {

    @Test
    public void testSync() throws InterruptedException {

        Bus bus = new AsyncEventBus("TestBus", (ThreadPoolExecutor) Executors.newFixedThreadPool(10));
        bus.register(new SubscriberA());
        bus.register(new SubscriberB());

        log.info("--------默认 主题 ----");

        // 默认主题
        bus.post("Hello 小工匠");

        log.info("--------test 主题 ----");
        // 指定topic
        bus.post(new Artisan("artisan", 18, Arrays.asList("JAVA", "AIGC")), "test");



        TimeUnit.SECONDS.sleep(20);
    }
}
    

在这里插入图片描述

小结

EventBus有点类似于GOF设计模式中的监听者模式,但是EventBus提供的功能更加强大,使用起来也更加灵活,EventBus中的Subscriber不需要继承任何类或者实现任何接口,在使用EventBus时只需要持有Bus的引用即可。

在EventBus的设计中有三个非常重要的角色(Bus、Registry和Dispatcher),

  • Bus主要提供给外部使用的操作方法,
  • Registry注册表用来整理记录所有注册在EventBus上的Subscriber,
  • Dispatcher主要负责对Subscriber消息进行推送(用反射的方式执行方法),但是考虑到程序的灵活性,Dispatcher方法中又提供了Executor的多态方式。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/735459.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PillarNext论文解读

这篇文章是轻舟智航23年的一篇论文&#xff0c;是对pillarNet进行改进。 改进方面&#xff1a; 1.训练更长的时间在检测头增加IOU预测score&#xff0c;这个iou分数预测不太清楚&#xff0c;不知道是不是iouloss 2.扩大感受野&#xff0c;包括Neck部分使用FPN或者BiFPN.使用…

3.zabbix操作二

文章目录 zabbix操作二部署zabbix代理服务器安装zabbix_proxy安装数据库配置代理服务器配置文件web端添加agent代理并连接主机 部署zabbix高可用群集zabbix监控Windows系统zabbix监控java应用zabbix监控SNMP zabbix操作二 部署zabbix代理服务器 分布式监控的作用&#xff1a;…

Flink web UI配置账号密码,权限控制

由于Flink自带的web UI界面没有账号密码&#xff0c;需要通过nginx实现该效果。 1.安装httpd-tools工具 yum install httpd-tools -y 2.生成用户名密码文件 htpasswd -c /usr/local/nginx/conf/flinkuser username passwd flinkuser&#xff1a;为生成的用户名密码文件名称 …

Apache Doris (二十一) :Doris Rollup物化索引创建与操作

目录 1. 创建测试表 2. 创建Rollup物化索引表 3. 查看Rollup物化索引表 4. 删除Rollup物化索引表 5. 验证Rollup物化索引使用 进入正文之前&#xff0c;欢迎订阅专题、对博文点赞、评论、收藏&#xff0c;关注IT贫道&#xff0c;获取高质量博客内容&#xff01; 宝子们点…

open3d 通过vscode+ssh连接远程服务器将可视化界面本地显示

当使用远程服务器时&#xff0c;我们希望能像在本地一样写完代码后能立刻出现一些gui窗口。但是目前网络上的资料都不能很好的解决这个问题。本文尝试尽可能简短地解决这个问题。 步骤 1、在服务器上安装open3d 已经非常简化了&#xff0c;可以使用一行代码完成 pip3 insta…

【Java从入门到大牛】方法详解

&#x1f525; 本文由 程序喵正在路上 原创&#xff0c;CSDN首发&#xff01; &#x1f496; 系列专栏&#xff1a;Java从入门到大牛 &#x1f320; 首发时间&#xff1a;2023年7月9日 &#x1f98b; 欢迎关注&#x1f5b1;点赞&#x1f44d;收藏&#x1f31f;留言&#x1f43e…

【计算机组成与体系结构Ⅰ】实验7 IP核的使用、D触发器

一、实验目的 1&#xff1a;学会设计用IP核和原理图的方式设计电路&#xff0c;完成涉及1位数据的2选1多路选择器。 2&#xff1a;设计带异步置零和写使能端的D触发器。 二、实验环境 软件&#xff1a;Vivado 2015.4操作系统&#xff1a;Windows 10 三、实验内容 2.2.1 多路…

49天精通Java,第38天,类加载器,双亲委派机制

目录 一、类加载器子系统的作用1、加载2、链接3、初始化 二、验证【虚拟机必须保证一个类的<clinit>()方法在多线程下被同步加锁】的代码实例三、类加载器的分类1、启动类加载器&#xff08;引导类加载器&#xff09;2、扩展类加载器3、应用程序类加载器&#xff08;系统…

字典dict的get和setdefault,以及collections的defaultdict

一&#xff1a;dict的get和setdefault 首先&#xff0c;字典dict的get和setdefault的用法都是xxx(key, value)&#xff0c; 都是在字典中查找指定的键并返回值&#xff0c;当查找的key键在字典中存在时&#xff0c;两者作用相同。 参考&#xff1a;https://blog.csdn.net/any1…

「深度学习之优化算法」(十)烟花算法

1. 烟花算法简介 (以下描述,均不是学术用语,仅供大家快乐的阅读)   烟花算法(Firework Algorithm,FWA)是一种受烟花爆炸产生火星,并继续分裂爆炸这一过程启发而得出的算法。算法的思想简单,但具体实现复杂。算法提出时间并不长,但是已经有了不少的改进研究和较为全…

【动手学习深度学习--逐行代码解析合集】11实战Kaggle比赛:预测房价

【动手学习深度学习】逐行代码解析合集 11实战Kaggle比赛&#xff1a;预测房价 视频链接&#xff1a;动手学习深度学习–实战Kaggle比赛&#xff1a;预测房价 课程主页&#xff1a;https://courses.d2l.ai/zh-v2/ 教材&#xff1a;https://zh-v2.d2l.ai/ 1、下载和缓存数据集 …

【JavaEE初阶】JavaScript(WebAPI)

文章目录 1.WebAPI背景知识1.1什么是WebAPI1.2什么是API 2.DOM基本概念2.1什么是DOM2.2常用的DOMAPI2.2.1.选中页面元素2.2.2操作元素的属性1. 事件概念2.获取/修改元素内容3. 获取/修改元素属性4.获取/修改表单元素属性5.获取修改样式属性 2.2.3.操作页面节点1.新增节点2.删除…

关联数组不是线性表

百度百科给的关联数组的解释是&#xff1a; “关联数组”是一种具有特殊索引方式的数组。不仅可以通过整数来索引它&#xff0c;还可以使用字符串或者其他类型的值&#xff08;除了NULL&#xff09;来索引它。 关联数组类似于哈希表&#xff0c;有键-索引&#xff0c;它包含标量…

nunittest如何生成测试报告?我来告诉你

目录 HTMLTestRunner 小试牛刀 1、在unittest中编写测试用例 2、添加报告路径已经报告内容 3、批量执行用例&#xff0c;导入测试报告内容中 4、当然是赶快执行查看报告内容啊 4、添加用例注释&#xff0c;增加报告完整性 总结&#xff1a; 我们做测试的人员们都知道测…

【mysql】—— 数据库基础

序言&#xff1a; 在上期&#xff0c;我们已经安装好了【mysql】。在本期&#xff0c;我将给大家介绍关于数据库的基本知识。 目录 &#xff08;一&#xff09;登陆选项 &#xff08;二&#xff09;基本介绍 1、什么是数据库 2、主流数据库 3、见一见数据库 4、服务器管…

使用matplotlib画图时,建立双坐标轴

在进行数据的可视化分析时&#xff0c;可能我们分析的两个指标&#xff0c;他们的取值区间相差很大&#xff0c;如果采用一个y轴&#xff0c;就不利于我们观察另一个指标。所以&#xff0c;记录一下我在项目的处理过程中采用的方法&#xff1a; 首先观察一下&#xff0c;我用到…

MAYA粒子碰撞颜色collisionU,collisionV

if (nParticleShape1.collisionU >0) { float $UnParticleShape1.collisionU; float $VnParticleShape1.collisionV; vector $colRGB colorAtPoint -o RGB -u $U -v $V ramp2; nParticleShape1.rgbPP $colRGB; } 获取 UV 向量处的颜色 vector $colRGB colorAtPoin…

【C51 介绍发光二极管LED】

51单片机项目基础篇 前篇&#xff1a;介绍发光二极管LED1、认识发光二极管LED1.1、二极管1.2、二极管的特性 2、LED简介3、结束语 前篇&#xff1a;介绍发光二极管LED 前言&#xff1a; (1).我们已经认识了 51 单片机芯片和 51 单片机最小系统。 (2).现在进入一些基础的实验阶…

23.07.09

完善对话框功能 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent) :QWidget(parent),ui(new Ui::Widget) {ui->setupUi(this);// this->setWindowOpacity(0); } Widget::~Widget() {delete ui; } //字体对应的槽 voi…