Apache SeaTunnel Zeta 引擎源码解析(一)Server端的初始化

news2025/1/12 3:56:56

引入

本系列文章是基于 Apache SeaTunnel 2.3.6版本,围绕Zeta引擎给大家介绍其任务是如何从提交到运行的全流程,希望通过这篇文档,对刚刚上手SeaTunnel的朋友提供一些帮助。

file

我们整体的文章将会分成三篇,从以下方向给大家介绍:

  1. SeaTunnel Server端的初始化
  2. Client端的任务提交流程
  3. Server端的接收到任务的执行流程

由于涉及源码解析,涉及篇幅较大,所以分成系列文章来记录下一个任务的整体流程。

参考

  • [ST-Engine][Design] The Design of LogicalPlan to PhysicalPlan:https://github.com/apache/seatunnel/issues/2269

作者介绍

大家好,我是刘乃杰,一名大数据开发工程师,参与Apache SeaTunnel的开发也有一年多的时间了,不仅给SeaTunnel提交了一些PR,而且添加的一些功能也非常有意思,欢迎大家来找我交流,其中包括支持Avro格式文件,SQL Transform中支持嵌套结构查询,给节点添加Tag达到资源隔离等。

近期推送SeaTunnel在公司内部的落地,需要跟同事,老板介绍SeaTunnel的技术架构,也需要详细的运行流程,帮助同事更好的上手开发,维护。

但是发现目前好像没有这样的一篇文章,能从整体来分析一个任务的执行流程,从而帮助开发者更加容易的定位问题,添加功能。

所以花了一些时间来写了这篇文章,希望抛砖引玉让其他的大佬们也多多写一些源码解析的文章出来。

集群拓扑

首先请大家先从整体了解下SeaTunnel的Zeta引擎架构, SeaTunnel是基于hazelcast来实现的分布式集群通信。

在2.3.6版本之后, 集群中的节点可以被分配为Master或Worker节点,从而将调度与执行分开,避免Master节点的负载过高从而出现问题。

并且2.3.6版本还添加了一个功能,可以对每个节点添加tag属性,当提交任务时可以通过tag来选择任务将要运行的节点, 从而达到资源隔离的目的。

file

集群的服务端分为Master或Worker节点, Master节点负责接收请求、逻辑计划生成、分配任务等(与之前的版本相比,会多了几个Backup节点,但是对于集群稳定性来说是一个挺大的提升)。

而Worker节点则只负责执行任务, 也就是数据的读取和写入。

提交任务时可以创建Hazelcast的客户端连接集群来进行通信,或者使用Restapi来进行通信。

服务端启动

当我们对集群的整体架构有个大致的了解后,再来具体了解下具体的流程。

首先看下Server端的启动过程。

Server端的启动命令为:


sh bin/seatunnel-cluster.sh -d -r <node role type>

查看这个脚本的内容后就会发现, 这个脚本最终的执行命令为:

java -cp seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer <other_java_jvm_config_and_args>

我们查看这个starter.seatunnel.SeaTunnelServer的代码

public class SeaTunnelServer {
    public static void main(String[] args) throws CommandException {
        ServerCommandArgs serverCommandArgs =
                CommandLineUtils.parse(
                        args,
                        new ServerCommandArgs(),
                        EngineType.SEATUNNEL.getStarterShellName(),
                        true);
        SeaTunnel.run(serverCommandArgs.buildCommand());
    }
}

这个代码是使用了JCommander来解析用户传递的参数并构建并运行CommandserverCommandArgs.buildCommand返回的类为:

public class ServerExecuteCommand implements Command<ServerCommandArgs> {

    private final ServerCommandArgs serverCommandArgs;

    public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
        this.serverCommandArgs = serverCommandArgs;
    }

    @Override
    public void execute() {
        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        String clusterRole = this.serverCommandArgs.getClusterRole();
        if (StringUtils.isNotBlank(clusterRole)) {
            if (EngineConfig.ClusterRole.MASTER.toString().equalsIgnoreCase(clusterRole)) {
                seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
            } else if (EngineConfig.ClusterRole.WORKER.toString().equalsIgnoreCase(clusterRole)) {
                seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);

                // in hazelcast lite node will not store IMap data.
                seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
            } else {
                throw new SeaTunnelEngineException("Not supported cluster role: " + clusterRole);
            }
        } else {
            seaTunnelConfig
                    .getEngineConfig()
                    .setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
        }

        HazelcastInstanceFactory.newHazelcastInstance(
                seaTunnelConfig.getHazelcastConfig(),
                Thread.currentThread().getName(),
                new SeaTunnelNodeContext(seaTunnelConfig));
    }
}

在这里会根据配置的角色类型来修改配置信息。

当是Worker节点时,将Hazelcast节点的类型设置为lite member,在Hazelcast中lite member是不进行数据存储的

然后会创建了一个hazelcast实例, 并且传递了SeaTunnelNodeContext实例以及读取并修改的配置信息

public class SeaTunnelNodeContext extends DefaultNodeContext {

    private final SeaTunnelConfig seaTunnelConfig;

    public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
        this.seaTunnelConfig = seaTunnelConfig;
    }

    @Override
    public NodeExtension createNodeExtension(@NonNull Node node) {
        return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
    }

    @Override
    public Joiner createJoiner(Node node) {
        JoinConfig join =
                getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
        join.verify();

        if (node.shouldUseMulticastJoiner(join) && node.multicastService != null) {
            super.createJoiner(node);
        } else if (join.getTcpIpConfig().isEnabled()) {
            log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
            return new LiteNodeDropOutTcpIpJoiner(node);
        } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
                || isAnyAliasedConfigEnabled(join)
                || join.isAutoDetectionEnabled()) {
            super.createJoiner(node);
        }
        return null;
    }

    private static boolean isAnyAliasedConfigEnabled(JoinConfig join) {
        return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
    }

    private boolean usePublicAddress(JoinConfig join, Node node) {
        return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
                || allUsePublicAddress(
                        AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
    }
}

SeaTunnelNodeContext中覆盖了createNodeExtension方法, 将使用engine.server.NodeExtension类,

这个类的代码为:

public class NodeExtension extends DefaultNodeExtension {
    private final NodeExtensionCommon extCommon;

    public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
        super(node);
        extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(seaTunnelConfig));
    }

    @Override
    public void beforeStart() {
        // TODO Get Config from Node here
        super.beforeStart();
    }

    @Override
    public void afterStart() {
        super.afterStart();
        extCommon.afterStart();
    }

    @Override
    public void beforeClusterStateChange(
            ClusterState currState, ClusterState requestedState, boolean isTransient) {
        super.beforeClusterStateChange(currState, requestedState, isTransient);
        extCommon.beforeClusterStateChange(requestedState);
    }

    @Override
    public void onClusterStateChange(ClusterState newState, boolean isTransient) {
        super.onClusterStateChange(newState, isTransient);
        extCommon.onClusterStateChange(newState);
    }

    @Override
    public Map<String, Object> createExtensionServices() {
        return extCommon.createExtensionServices();
    }

    @Override
    public TextCommandService createTextCommandService() {
        return new TextCommandServiceImpl(node) {
            {
                register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this));
                register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this));
                register(HTTP_GET, new RestHttpGetCommandProcessor(this));
                register(HTTP_POST, new RestHttpPostCommandProcessor(this));
            }
        };
    }

    @Override
    public void printNodeInfo() {
        extCommon.printNodeInfo(systemLogger);
    }
}

在这个代码中, 我们可以看到在构造方法中, 初始化了SeaTunnelServer这个类, 而这个类与最开始的类是同名的,

是在不同的包下, 这个类的完整类名为: org.apache.seatunnel.engine.server.SeaTunnelServer

我们看下这个类的代码:

public class SeaTunnelServer
        implements ManagedService, MembershipAwareService, LiveOperationsTracker {

    private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);

    public static final String SERVICE_NAME = "st:impl:seaTunnelServer";

    private NodeEngineImpl nodeEngine;
    private final LiveOperationRegistry liveOperationRegistry;

    private volatile SlotService slotService;
    private TaskExecutionService taskExecutionService;
    private ClassLoaderService classLoaderService;
    private CoordinatorService coordinatorService;
    private ScheduledExecutorService monitorService;

    @Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;

    private final SeaTunnelConfig seaTunnelConfig;

    private volatile boolean isRunning = true;

    public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
        this.liveOperationRegistry = new LiveOperationRegistry();
        this.seaTunnelConfig = seaTunnelConfig;
        LOGGER.info("SeaTunnel server start...");
    }



    @Override
    public void init(NodeEngine engine, Properties hzProperties) {
         ...
        if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal()
                == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
            startWorker();
            startMaster();

        } else if (EngineConfig.ClusterRole.WORKER.ordinal()
                == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
            startWorker();
        } else {
            startMaster();
        }
        ...
    }

    ....
}

这个类是SeaTunnel Server端的核心代码, 在这个类中会根据节点的角色来启动相关的组件。

稍微总结下seatunnel的流程:

SeaTunnel是借助于Hazelcast的基础能力,来实现集群端的组网, 并调用启动核心的代码。

对于这一块有想深入了解的朋友可以去看下Hazelcast的相关内容,这里仅仅列出了调用路径。

按照顺序所加载调用的类为

  1. starter.SeaTunnelServer
  2. ServerExecutreCommand
  3. SeaTunnelNodeContext
  4. NodeExtension
  5. server.SeaTunnelServer

file

接下来再来详细看下Master节点以及Worker节点中所创建的组件

Worker节点

private void startWorker() {
    taskExecutionService =
            new TaskExecutionService(
                    classLoaderService, nodeEngine, nodeEngine.getProperties());
    nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
    taskExecutionService.start();
    getSlotService();
}

public SlotService getSlotService() {
    if (slotService == null) {
        synchronized (this) {
            if (slotService == null) {
                SlotService service =
                        new DefaultSlotService(
                                nodeEngine,
                                taskExecutionService,
                                seaTunnelConfig.getEngineConfig().getSlotServiceConfig());
                service.init();
                slotService = service;
            }
        }
    }
    return slotService;
}

我们可以看到在startWorker方法中, 会初始化两个组件, taskExectutionServiceslotService 这两个组件, 都与任务执行相关。

SlotService

先来看下SlotService的初始化

@Override
public void init() {
    initStatus = true;
    slotServiceSequence = UUID.randomUUID().toString();
    contexts = new ConcurrentHashMap<>();
    assignedSlots = new ConcurrentHashMap<>();
    unassignedSlots = new ConcurrentHashMap<>();
    unassignedResource = new AtomicReference<>(new ResourceProfile());
    assignedResource = new AtomicReference<>(new ResourceProfile());
    scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(
                    r ->
                            new Thread(
                                    r,
                                    String.format(
                                            "hz.%s.seaTunnel.slotService.thread",
                                            nodeEngine.getHazelcastInstance().getName())));
    if (!config.isDynamicSlot()) {
        initFixedSlots();
    }
    unassignedResource.set(getNodeResource());
    scheduledExecutorService.scheduleAtFixedRate(
            () -> {
                try {
                    LOGGER.fine(
                            "start send heartbeat to resource manager, this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                    sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();
                } catch (Exception e) {
                    LOGGER.warning(
                            "failed send heartbeat to resource manager, will retry later. this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                }
            },
            0,
            DEFAULT_HEARTBEAT_TIMEOUT,
            TimeUnit.MILLISECONDS);
}

在SeaTunnel中会有一个动态Slot的概念,如果设置为true, 则每个节点就没有Slot的这样一个概念,可以提交任意数量的任务到此节点上,如果设置为固定数量的Slot, 那么该节点仅能接受这些Slot数量的Task运行。

在初始化时,会根据是否为动态Slot来进行数量的初始化。

private void initFixedSlots() {
    long maxMemory = Runtime.getRuntime().maxMemory();
    for (int i = 0; i < config.getSlotNum(); i++) {
        unassignedSlots.put(
                i,
                new SlotProfile(
                        nodeEngine.getThisAddress(),
                        i,
                        new ResourceProfile(
                                CPU.of(0), Memory.of(maxMemory / config.getSlotNum())),
                        slotServiceSequence));
    }
}

同时也可以看到初始化时会启动一个线程,定时向Master节点发送心跳,心跳信息中则包含了当前节点的信息, 包括已经分配的、未分配的Slot数量等属性,Worker节点通过心跳将信息定时更新给Master。

@Override
public synchronized WorkerProfile getWorkerProfile() {
    WorkerProfile workerProfile = new WorkerProfile(nodeEngine.getThisAddress());
    workerProfile.setProfile(getNodeResource());
    workerProfile.setAssignedSlots(assignedSlots.values().toArray(new SlotProfile[0]));
    workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new SlotProfile[0]));
    workerProfile.setUnassignedResource(unassignedResource.get());
    workerProfile.setAttributes(nodeEngine.getLocalMember().getAttributes());
    workerProfile.setDynamicSlot(config.isDynamicSlot());
    return workerProfile;
}

private ResourceProfile getNodeResource() {
    return new ResourceProfile(CPU.of(0), Memory.of(Runtime.getRuntime().maxMemory()));
}

TaskExecutionService

这个组件与任务提交相关, 这里先简单看下,与任务提交的相关代码在后续再深入查看。

在Worker节点初始化时, 会新建一个TaskExecutionService对象,并调用其start方法

private final ExecutorService executorService =
        newCachedThreadPool(new BlockingTaskThreadFactory());

public TaskExecutionService(
        ClassLoaderService classLoaderService,
        NodeEngineImpl nodeEngine,
        HazelcastProperties properties) {
        // 加载配置信息
    seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
    this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
    this.nodeEngine = nodeEngine;
    this.classLoaderService = classLoaderService;
    this.logger = nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
    // 指标相关
    MetricsRegistry registry = nodeEngine.getMetricsRegistry();
    MetricDescriptor descriptor =
            registry.newMetricDescriptor()
                    .withTag(MetricTags.SERVICE, this.getClass().getSimpleName());
    registry.registerStaticMetrics(descriptor, this);
    // 定时任务更新指标到IMAP中
    scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    scheduledExecutorService.scheduleAtFixedRate(
            this::updateMetricsContextInImap,
            0,
            seaTunnelConfig.getEngineConfig().getJobMetricsBackupInterval(),
            TimeUnit.SECONDS);

    serverConnectorPackageClient =
            new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig);

    eventBuffer = new ArrayBlockingQueue<>(2048);
    // 事件转发服务
    eventForwardService =
            Executors.newSingleThreadExecutor(
                    new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
    eventForwardService.submit(
            () -> {
                List<Event> events = new ArrayList<>();
                RetryUtils.RetryMaterial retryMaterial =
                        new RetryUtils.RetryMaterial(2, true, e -> true);
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        events.clear();

                        Event first = eventBuffer.take();
                        events.add(first);

                        eventBuffer.drainTo(events, 500);
                        JobEventReportOperation operation = new JobEventReportOperation(events);

                        RetryUtils.retryWithException(
                                () ->
                                        NodeEngineUtil.sendOperationToMasterNode(
                                                        nodeEngine, operation)
                                                .join(),
                                retryMaterial);

                        logger.fine("Event forward success, events " + events.size());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.info("Event forward thread interrupted");
                    } catch (Throwable t) {
                        logger.warning(
                                "Event forward failed, discard events " + events.size(), t);
                    }
                }
            });
}

public void start() {
    runBusWorkSupplier.runNewBusWork(false);
}

在这个类中,有一个成员变量,创建了一个线程池。

在构造方法中创建了一个定时任务来更新IMAP里面的任务状态。创建了一个任务来将Event信息发送给Master节点,由Master节点将这些Event发送给外部服务。

file

Master节点

private void startMaster() {
    coordinatorService =
            new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
    monitorService = Executors.newSingleThreadScheduledExecutor();
    monitorService.scheduleAtFixedRate(
            this::printExecutionInfo,
            0,
            seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
            TimeUnit.SECONDS);
}

我们可以看到在Master节点中,启动了两个组件:协调器组件和监控组件。

监控组件的任务比较简单, 就是周期性的打印集群信息。

CoordinatorService

public CoordinatorService(
        @NonNull NodeEngineImpl nodeEngine,
        @NonNull SeaTunnelServer seaTunnelServer,
        EngineConfig engineConfig) {
    this.nodeEngine = nodeEngine;
    this.logger = nodeEngine.getLogger(getClass());
    this.executorService =
            Executors.newCachedThreadPool(
                    new ThreadFactoryBuilder()
                            .setNameFormat("seatunnel-coordinator-service-%d")
                            .build());
    this.seaTunnelServer = seaTunnelServer;
    this.engineConfig = engineConfig;
    masterActiveListener = Executors.newSingleThreadScheduledExecutor();
    masterActiveListener.scheduleAtFixedRate(
            this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
}


private void checkNewActiveMaster() {
    try {
        if (!isActive && this.seaTunnelServer.isMasterNode()) {
            logger.info(
                    "This node become a new active master node, begin init coordinator service");
            if (this.executorService.isShutdown()) {
                this.executorService =
                        Executors.newCachedThreadPool(
                                new ThreadFactoryBuilder()
                                        .setNameFormat("seatunnel-coordinator-service-%d")
                                        .build());
            }
            initCoordinatorService();
            isActive = true;
        } else if (isActive && !this.seaTunnelServer.isMasterNode()) {
            isActive = false;
            logger.info(
                    "This node become leave active master node, begin clear coordinator service");
            clearCoordinatorService();
        }
    } catch (Exception e) {
        isActive = false;
        logger.severe(ExceptionUtils.getMessage(e));
        throw new SeaTunnelEngineException("check new active master error, stop loop", e);
    }
}

在初始化时, 会启动一个线程来定时检查当前阶段是否为Master节点, 当节点当前不是Master节点但在集群中成为Master节点时, 会调用initCoordinatorService()来进行状态的初始化, 并将状态修改为True。

当节点自身标记为Master节点,但在集群中已不再是Master节点时,进行状态清理。

private void initCoordinatorService() {
    // 从hazelcast中获取分布式IMAP
    runningJobInfoIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
    runningJobStateIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
    runningJobStateTimestampsIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
    ownedSlotProfilesIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
    metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
    // 初始化JobHistoryService
    jobHistoryService =
            new JobHistoryService(
                    runningJobStateIMap,
                    logger,
                    runningJobMasterMap,
                    nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE),
                    nodeEngine
                            .getHazelcastInstance()
                            .getMap(Constant.IMAP_FINISHED_JOB_METRICS),
                    nodeEngine
                            .getHazelcastInstance()
                            .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO),
                    engineConfig.getHistoryJobExpireMinutes());
    // 初始化EventProcess, 用于发送事件到其他服务
    eventProcessor =
            createJobEventProcessor(
                    engineConfig.getEventReportHttpApi(),
                    engineConfig.getEventReportHttpHeaders(),
                    nodeEngine);

    // If the user has configured the connector package service, create it  on the master node.
    ConnectorJarStorageConfig connectorJarStorageConfig =
            engineConfig.getConnectorJarStorageConfig();
    if (connectorJarStorageConfig.getEnable()) {
        connectorPackageService = new ConnectorPackageService(seaTunnelServer);
    }
    // 集群恢复后, 尝试恢复之前的历史任务
    restoreAllJobFromMasterNodeSwitchFuture =
            new PassiveCompletableFuture(
                    CompletableFuture.runAsync(
                            this::restoreAllRunningJobFromMasterNodeSwitch, executorService));
}

Coordinatorservice中,会拉取分布式MAP,这个数据结构是Hazelcast的一个数据结构,可以认为是在集群中数据一致的一个MAP。

在SeaTunnel中, 使用这个结构来存储任务信息、Slot信息等。

在这里还会创建EventProcessor, 这个类是用来将事件通知到其他服务,比如任务失败,可以发送信息到配置的接口中,实现事件推送。

最后,由于节点启动,可能是集群异常重启,或者节点切换,这时需要恢复历史运行的任务,那么就会从刚刚获取到的IMAP中获取到之前正在跑的任务列表,然后尝试进行恢复。

这里的IMAP信息可以开启持久化将信息存储到HDFS等文件系统中, 这样可以在系统完全重启后仍然能够读取到之前的任务状态并进行恢复。

CoordinatorService中运行的组件有:

  • executorService (所有可能被选举为master的节点)
  • jobHistoryService (master节点)
  • eventProcessor (master节点) file

Master节点与备选节点上会:

  1. 定时检查自己是否为Master节点, 如果是则进行相应的状态转化

Master节点上会:

  1. 定时打印集群的状态信息。
  2. 启动转发服务, 将要推送的事件转发到外边服务

在Worker节点上, 启动后会:

  1. 定时将状态信息上报到Master节点
  2. 将任务信息更新到IMAP里面。
  3. 将在Worker产生的要推送给外部服务的事件转发到Master节点上。

至此, 服务端所有服务组件都已启动完成,本文完!

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2094057.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

掌握数据利器:AWS Glue与数据基盘概览

引言 随着数字化进程的不断推进&#xff0c;企业现在能够积累并分析海量且多样化的数据。这一优势使得许多企业开始采用数据驱动型经营&#xff08;即基于数据的经营策略&#xff09;。通过基于数据的客观判断&#xff0c;企业及其管理者可以获得诸多好处。 然而&#xff0c;…

DeepMind 机器人学习打乒乓球,朝着「专业运动员水平的速度和性能」发展

这几天全球各界最火热的话题非奥运会莫属&#xff0c;而其中乒乓球比赛更是引起了互联网的讨论热潮&#xff0c;无论是欢呼也好、争议也罢&#xff0c;在现实世界人类的乒乓球大赛风生水起的同时&#xff0c;AI已经偷偷在乒乓球上“出师”了—— ——DeepMind近日发布一项新工作…

机器学习 第7章 贝叶斯分类器

目录 7.1 贝叶斯决策论7.2 极大似然估计7.3 朴素贝叶斯分类器7.4 半朴素贝叶斯分类器7.5 贝叶斯网7.5.1 结构7.5.2 学习7.5.3 推断 7.6 EM算法 7.1 贝叶斯决策论 对分类任务来说&#xff0c;在所有相关概率都己知的理想情形下&#xff0c;贝叶斯决策论考虑如何基于这些概率和误…

如何删除浏览器每次登录自动保存的密码,以防自动登录泄露自己的隐私

今天小编以 Microsoft edge 浏览器为例&#xff0c;如何在自己离职或毕业以后留给他人的电脑是干净的&#xff0c;不会在任何网页登录时显示已保存的密码&#xff0c;让他人自动登录。 ①在电脑上打开 Microsoft edge 浏览器后&#xff0c;点击“设置” ②进入设置界面后&…

基于SSM的咖啡馆管理系统

基于SSM的咖啡馆管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringSpringMVCMyBatisJSP工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 前台界面 后台界面 摘要 在当前这个信息爆炸的时代&#xff0c;众多行业正经历着…

Python酷库之旅-第三方库Pandas(114)

目录 一、用法精讲 501、pandas.DataFrame.mode方法 501-1、语法 501-2、参数 501-3、功能 501-4、返回值 501-5、说明 501-6、用法 501-6-1、数据准备 501-6-2、代码示例 501-6-3、结果输出 502、pandas.DataFrame.pct_change方法 502-1、语法 502-2、参数 502…

[知识分享]华为铁三角工作法

在通信技术领域&#xff0c;尤其是无线通信和物联网领域&#xff0c;“华为铁三角”是华为公司内部的一种销售、交付和服务一体化的运作模式。这种模式强调的是以客户为中心&#xff0c;通过市场、销售、交付和服务三个关键环节的紧密协作&#xff0c;快速响应客户需求&#xf…

2.12 滑动条事件

目录 实验原理 实验代码 运行结果 实验原理 在 OpenCV 中&#xff0c;滑动条设计的主要目的是在视频播放帧中选择特定帧&#xff0c;而在调节图像参数时也会经常用到。在使用滑动条前&#xff0c;需要给滑动条赋予一个名字&#xff08;通常是一个字符串&#xff09;&#x…

Java | Leetcode Java题解之第388题文件的最长绝对路径

题目&#xff1a; 题解&#xff1a; class Solution {public int lengthLongestPath(String input) {int n input.length();int pos 0;int ans 0;int[] level new int[n 1];while (pos < n) {/* 检测当前文件的深度 */int depth 1;while (pos < n && inpu…

Mamba:超越Transformer的新一代神经网络架构

在过去的七年里&#xff0c;Transformer一直在语言建模领域占据着主导地位。然而&#xff0c;现在有一个新兴的神经网络架构Mamba&#xff0c;正在挑战Transformer的霸主地位。虽然目前Mamba仅在规模较小的模型上进行了测试&#xff08;参数量达到数十亿&#xff09;&#xff0…

华为OD机试真题 - 构成正方形的数量(Java/Python/JS/C/C++ 2024 B卷 100分)

华为OD机试 2024E卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;E卷D卷A卷B卷C卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;私信哪吒&#xff0c;备注华为OD&#xff0c;加…

MySQL密码策略更改(临时+永久)

目录 1、查看数据库当前密码策略 2、查看密码插件&#xff1a; 3、官方文档策略定义 4、更改密码策略 临时修改 &#xff08;1&#xff09;更改密码策略为LOW&#xff0c;改为LOW或0 &#xff08;2&#xff09;更改密码长度 &#xff08;3&#xff09;设置大小写、数字…

【操作系统】操作系统运行环境——中断与异常

中断与异常 导读一、中断机制1.1 中断机制的重要性 二、中断与异常的基本概念2.1 中断与异常的个人理解2.2 内中断与外中断 三、中断与异常的分类四、中断与异常的处理过程结语 导读 大家好&#xff0c;很高兴又和大家见面啦&#xff01;&#xff01;&#xff01; 在上一篇内…

【C++ | 设计模式】简单工厂模式的详解与实现

1.简单工厂模式概述 简单工厂模式&#xff08;Simple Factory Pattern&#xff09;是一种创建型设计模式&#xff0c;它定义了一个工厂类&#xff0c;由这个类根据提供的参数决定创建哪种具体的产品对象。简单工厂模式将对象的创建逻辑集中到一个工厂类中&#xff0c;从而将对…

认知杂谈32

今天分享 有人说的一段争议性的话 I I 《恋爱中的价值难题》 咱就认识个31岁的哥们&#xff0c;事业有成&#xff0c;一年能挣35 万。他现在正为找对象的事儿犯愁呢。他想找个年轻漂亮的小姑娘谈对象&#xff0c;可又不想在感情上投入太多&#xff0c;就想一边乐呵着&#x…

Linux(CentOS)同步服务器时间之~ntpd

NTP 是 Network Time Protocol&#xff08;网络时间协议&#xff09;的缩写&#xff0c;它是一种用于在计算机系统之间同步时间的协议。NTP 允许网络中的设备通过与一个或多个时间服务器进行通信&#xff0c;来校正自身的系统时钟&#xff0c;确保所有设备上的时间保持高度一致…

演示:基于WPF的DrawingVisual和谷歌地图瓦片开发的地图(完全独立不依赖第三方库)

一、目的&#xff1a;基于WPF的DrawingVisual和谷歌地图瓦片开发的地图 二、预览 三、环境 VS2022&#xff0c;Net7,DrawingVisual&#xff0c;谷歌地图瓦片 四、主要功能 地图缩放&#xff0c;平移&#xff0c;定位 真实经纬度 显示瓦片信息 显示真实经纬度和经纬线 省市县…

[环境配置]Pycharm手动安装汉化插件

在Pycharm-file-setting-Plugins中&#xff0c;搜索chinese&#xff0c;就会出现汉化包 点击install后&#xff0c;在安装时出现这种报错&#xff1a;Plugin "Chinese (Simplified) Language Pack / 中文语言包" was not installed: Invalid filename returned by a …

用 jsPDF 让 PDF 生成触手可及

jsPDF &#xff1a;在浏览器中生成 PDF&#xff0c;从未如此简单- 精选真开源&#xff0c;释放新价值。 概览 jsPDF 是一个开源的 JavaScript 库&#xff0c;专为在浏览器端生成 PDF 文档而设计。它通过提供一个直观且易于使用的 API&#xff0c;使得开发者能够快速地将 PDF 生…

【Kubernetes】持久卷 PV

持久卷 PV 1.什么是持久卷2.创建一个持久卷3.持久卷的访问模式4.持久卷的回收策略 数据卷是在创建 Pod 时通过 挂载目录 来实现数据的共享和持久化的。但是在一个大型系统中&#xff0c;这种方式是非常不利于管理的&#xff0c;因为数据卷把数据的 持久存储 和 供应使用 封装在…