你了解Java中的ForkJoin吗?

news2025/3/13 7:35:25

什么是ForkJoin?

ForkJoin 从字面上看Fork是分岔的意思,Join是结合的意思,我们可以理解为将大任务拆分成小任务进行计算求解,最后将小任务的结果进行结合求出大任务的解,这些裂变出来的小任务,我们就可以交给不同的线程去进行计算,这也就是分布式计算的一种思想。这与大数据中的分布式离线计算MapReduce类似,对ForkJoin最经典的一个应用就是Java8中的Stream,我们知道Stream分为串行流和并行流,其中并行流parallelStream就是依赖于ForkJoin来实现并行处理的。

下面我们一起来看一下最为核心的ForkJoinTask和ForkJoinPool。

ForkJoinTask 任务

ForkJoinTask本身的依赖关系并不复杂,它与异步任务计算FutureTask一样均实现了Future接口,FutureTask我们在之前的文章中有讲到感兴趣的可以阅读一下——从源码看异步任务计算FutureTask。

下面我们就ForkJoinTask的核心源码来研究一下,该任务是如何通过分治法进行计算。

ForkJoinTask最核心的莫过于fork()和join()方法了。

  • fork()

    • 判断当前线程是不是ForkJoinWorkerThread线程

      • 是 直接将当前线程push到工作队列中

      • 否 调用ForkJoinPool 的externalPush方法

        在ForkJoinPool构建了一个静态的common对象,这里调用的就是common的externalPush()

  • join()

    • 调用doJoin()方法,等待线程执行完成
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

	// 获取结果的方法由子类实现
	public abstract V getRawResult();	
复制代码

RecursiveTask 是ForkJoinTask的一个子类主要对获取结果的方法进行了实现,通过泛型约束结果。我们如果需要自己创建任务,仍需要实现RecursiveTask,并去编写最为核心的计算方法compute()。

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    V result;

    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }
    
    protected final boolean exec() {
        result = compute();
        return true;
    }

}
复制代码

ForkJoinPool 线程池

ForkJoinTask 中许多功能都依赖于ForkJoinPool线程池,所以说ForkJoinTask运行离不开ForkJoinPool,ForkJoinPool与ThreadPoolExecutor有许多相似之处,他是专门用来执行ForkJoinTask任务的线程池,我之前也有文章对线程池技术进行了介绍,感兴趣的可以进行阅读——从源码分析线程池(池化技术)的实现原理。

ForkJoinPool与ThreadPoolExecutor的继承关系几乎是相同的,他们相当于兄弟关系。

工作窃取算法

ForkJoinPool中采取工作窃取算法,如果每次fork子任务如果都去创建新线程去处理的话,对系统资源的开销是巨大的,所以必须采取线程池。一般的线程池只有一个任务队列,但是对于ForkJoinPool来说,由于同一个任务Fork出的各个子任务是平行关系,为了提高效率,减少线程的竞争,需要将这些平行的任务放到不同的队列中,由于线程处理不同任务的速度不同,这样就可能存在某个线程先执行完了自己队列中的任务,这时为了提升效率,就可以让该线程去“窃取”其它任务队列中的任务,这就是所谓的“工作窃取算法”。

对于一般的队列来说,入队元素都是在队尾,出队元素在队首,要满足“工作窃取”的需求,任务队列应该支持从“队尾”出队元素,这样可以减少与其它工作线程的冲突(因为其它工作线程会从队首获取自己任务队列中的任务),这时就需要使用双端阻塞队列来解决。

构造方法

首先我们来看ForkJoinPool线程池的构造方法,他为我们提供了三种形式的构造,其中最为复杂的是四个入参的构造,下面我们看一下它四个入参都代表什么?

  • int parallelism 可并行级别(不代表最多存在的线程数量)
  • ForkJoinWorkerThreadFactory factory 线程创建工厂
  • UncaughtExceptionHandler handler 异常捕获处理器
  • boolean asyncMode 先进先出的工作模式 或者 后进先出的工作模式
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

	public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }

	public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
复制代码

提交方法

下面我们看一下提交任务的方法

externalPush这个方法我们很眼熟,它正是在fork的时候如果当前线程不是ForkJoinWorkerThread,新提交任务也是会通过这个方法去执行任务。由此可见,fork就是新建一个子任务进行提交。

externalSubmit是最为核心的一个方法,它可以首次向池提交第一个任务,并执行二次初始化。它还可以检测外部线程的首次提交,并创建一个新的共享队列。

signalWork(ws, q)是发送工作信号,让工作队列进行运转。

    public ForkJoinTask<?> submit(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
            job = (ForkJoinTask<?>) task;
        else
            job = new ForkJoinTask.AdaptedRunnableAction(task);
        externalPush(job);
        return job;
    }

    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                U.putOrderedInt(q, QLOCK, 0);
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        externalSubmit(task);
    }

    private void externalSubmit(ForkJoinTask<?> task) {
        int r;                                    // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            WorkQueue[] ws; WorkQueue q; int rs, m, k;
            boolean move = false;
            if ((rs = runState) < 0) {
                tryTerminate(false, false);     // help terminate
                throw new RejectedExecutionException();
            }
            else if ((rs & STARTED) == 0 ||     // initialize
                     ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                int ns = 0;
                rs = lockRunState();
                try {
                    if ((rs & STARTED) == 0) {
                        U.compareAndSwapObject(this, STEALCOUNTER, null,
                                               new AtomicLong());
                        // create workQueues array with size a power of two
                        int p = config & SMASK; // ensure at least 2 slots
                        int n = (p > 1) ? p - 1 : 1;
                        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                        workQueues = new WorkQueue[n];
                        ns = STARTED;
                    }
                } finally {
                    unlockRunState(rs, (rs & ~RSLOCK) | ns);
                }
            }
            else if ((q = ws[k = r & m & SQMASK]) != null) {
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a = q.array;
                    int s = q.top;
                    boolean submitted = false; // initial submission or resizing
                    try {                      // locked version of push
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {
                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            U.putOrderedInt(q, QTOP, s + 1);
                            submitted = true;
                        }
                    } finally {
                        U.compareAndSwapInt(q, QLOCK, 1, 0);
                    }
                    if (submitted) {
                        signalWork(ws, q);
                        return;
                    }
                }
                move = true;                   // move on failure
            }
            else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                q = new WorkQueue(this, null);
                q.hint = r;
                q.config = k | SHARED_QUEUE;
                q.scanState = INACTIVE;
                rs = lockRunState();           // publish index
                if (rs > 0 &&  (ws = workQueues) != null &&
                    k < ws.length && ws[k] == null)
                    ws[k] = q;                 // else terminated
                unlockRunState(rs, rs & ~RSLOCK);
            }
            else
                move = true;                   // move if busy
            if (move)
                r = ThreadLocalRandom.advanceProbe(r);
        }
    }
复制代码

创建工人(线程)

提交任务后,通过signalWork(ws, q)方法,发送工作信号,当符合没有执行完毕,且没有出现异常的条件下,循环执行任务,根据控制变量尝试添加工人(线程),通过线程工厂,生成线程,并且启动线程,也控制着工人(线程)的下岗。

    final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
        while ((c = ctl) < 0L) {                       // too few active
            if ((sp = (int)c) == 0) {                  // no idle workers
                if ((c & ADD_WORKER) != 0L)            // too few workers
                    tryAddWorker(c);
                break;
            }
            if (ws == null)                            // unstarted/terminated
                break;
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            if ((v = ws[i]) == null)                   // terminating
                break;
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            int d = sp - v.scanState;                  // screen CAS
            long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                if ((p = v.parker) != null)
                    U.unpark(p);
                break;
            }
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }

    private void tryAddWorker(long c) {
        boolean add = false;
        do {
            long nc = ((AC_MASK & (c + AC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c) {
                int rs, stop;                 // check if terminating
                if ((stop = (rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                if (stop != 0)
                    break;
                if (add) {
                    createWorker();
                    break;
                }
            }
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }

    private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            if (fac != null && (wt = fac.newThread(this)) != null) {
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
        deregisterWorker(wt, ex);
        return false;
    }

   final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        if (wt != null && (w = wt.workQueue) != null) {
            WorkQueue[] ws;                           // remove index from array
            int idx = w.config & SMASK;
            int rs = lockRunState();
            if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
                ws[idx] = null;
            unlockRunState(rs, rs & ~RSLOCK);
        }
        long c;                                       // decrement counts
        do {} while (!U.compareAndSwapLong
                     (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                           (TC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & c))));
        if (w != null) {
            w.qlock = -1;                             // ensure set
            w.transferStealCount(this);
            w.cancelAll();                            // cancel remaining tasks
        }
        for (;;) {                                    // possibly replace
            WorkQueue[] ws; int m, sp;
            if (tryTerminate(false, false) || w == null || w.array == null ||
                (runState & STOP) != 0 || (ws = workQueues) == null ||
                (m = ws.length - 1) < 0)              // already terminating
                break;
            if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
                if (tryRelease(c, ws[sp & m], AC_UNIT))
                    break;
            }
            else if (ex != null && (c & ADD_WORKER) != 0L) {
                tryAddWorker(c);                      // create replacement
                break;
            }
            else                                      // don't need replacement
                break;
        }
        if (ex == null)                               // help clean on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                                          // rethrow
            ForkJoinTask.rethrow(ex);
    }

    public static interface ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }
    static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }

    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);                           // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int i = 0;                                    // assign a pool index
        int mode = config & MODE_MASK;
        int rs = lockRunState();
        try {
            WorkQueue[] ws; int n;                    // skip if no array
            if ((ws = workQueues) != null && (n = ws.length) > 0) {
                int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                int m = n - 1;
                i = ((s << 1) | 1) & m;               // odd-numbered indices
                if (ws[i] != null) {                  // collision
                    int probes = 0;                   // step by approx half n
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[i = (i + step) & m] != null) {
                        if (++probes >= n) {
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.hint = s;                           // use as random seed
                w.config = i | mode;
                w.scanState = i;                      // publication fence
                ws[i] = w;
            }
        } finally {
            unlockRunState(rs, rs & ~RSLOCK);
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
        return w;
    }
复制代码

例:ForkJoinTask实现归并排序

这里我们就用经典的归并排序为例,构建一个我们自己的ForkJoinTask,按照归并排序的思路,重写其核心的compute()方法,通过ForkJoinPool.submit(task)提交任务,通过get()同步获取任务执行结果。

package com.zhj.interview;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Test16 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int[] bigArr = new int[10000000];
        for (int i = 0; i < 10000000; i++) {
            bigArr[i] = (int) (Math.random() * 10000000);
        }
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        MyForkJoinTask task = new MyForkJoinTask(bigArr);
        long start = System.currentTimeMillis();
        forkJoinPool.submit(task).get();
        long end = System.currentTimeMillis();
        System.out.println("耗时:" + (end-start));
	}

}
class MyForkJoinTask extends RecursiveTask<int[]> {

    private int source[];

    public MyForkJoinTask(int source[]) {
        if (source == null) {
            throw new RuntimeException("参数有误!!!");
        }
        this.source = source;
    }

    @Override
    protected int[] compute() {
        int l = source.length;
        if (l < 2) {
            return Arrays.copyOf(source, l);
        }
        if (l == 2) {
            if (source[0] > source[1]) {
                int[] tar = new int[2];
                tar[0] = source[1];
                tar[1] = source[0];
                return tar;
            } else {
                return Arrays.copyOf(source, l);
            }
        }
        if (l > 2) {
            int mid = l / 2;
            MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid));
            task1.fork();
            MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l));
            task2.fork();
            int[] res1 = task1.join();
            int[] res2 = task2.join();
            int tar[] = merge(res1, res2);
            return tar;
        }
        return null;
    }
	// 合并数组
    private int[] merge(int[] res1, int[] res2) {
        int l1 = res1.length;
        int l2 = res2.length;
        int l = l1 + l2;
        int tar[] = new int[l];
        for (int i = 0, i1 = 0, i2 = 0; i < l; i++) {
            int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1];
            int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2];
            // 如果条件成立,说明应该取数组array1中的值
            if(v1 < v2) {
                tar[i] = v1;
                i1++;
            } else {
                tar[i] = v2;
                i2++;
            }
        }
        return tar;
    }
}
复制代码

ForkJoin计算流程

通过ForkJoinPool提交任务,获取结果流程如下,拆分子任务不一定是二分的形式,可参照MapReduce的模式,也可以按照具体需求进行灵活的设计。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/90517.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新智慧杂志新智慧杂志社新智慧编辑部2022年第30期目录

杏坛潮_热点关注 特色劳动教育模式探究——基于“劳动周”项目的设计与实践 王胜; 1-3《新智慧》投稿&#xff1a;cn7kantougao163.com “双减”背景下小学数学大单元深度学习的实践与探索——以小学高年级为例 王晓雪; 4-6 杏坛潮_创新教育 问题教学法在高中物理教…

20221215英语学习

今日新词&#xff1a; rich adj.富有的, 富裕的, 油腻的 luck n.运气, 幸运, 好运, 侥幸 live adj.活的, 现场直播的, 实况转播的, 现场演出的 tutor n.家庭教师&#xff1b;私人教师&#xff1b;导师&#xff1b;助教 swipe v.挥拳打, 扬起巴掌打, 挥起&#xff08;物体&…

刷屏的AI 绘画,你成功驯服了吗?其背后的AIGC模型你可能还不知道

文章目录前言基于 CLIP Latents 的条件文本图像生成BLIPHugging Face奇点智源中文-CLIP百度昆仑万维之AI绘画前言 随着人工智能技术的发展与完善&#xff0c;AIGC&#xff08;AI generated content&#xff09;在内容的创作上为人们的工作和生活带来前所未有的帮助&#xff0c…

SpringCloud(7)— ElasticSearch基础

SpringCloud&#xff08;7&#xff09;— Elasticsearch基础 一 初识Elasticsearch elasticserach是一个强大的开源搜索引擎&#xff0c;可以从海量数据中迅速找到想要的内容。 elasticsearch结合了 Kibana, Logstach, Beats,也就是 elastic stack。主要应用于日志数据分析&…

【信管4.1】范围与需求

范围与需求范围其实说白了就是我们要做的东西都包括哪些内容&#xff0c;这些内容的边界在哪里&#xff0c;范围其实从另一个角度来说的话&#xff0c;也可以看成是一个产品的约束。为什么要有一个约束呢&#xff1f;你见过一个即是电商&#xff0c;又是社交&#xff0c;还能兼…

Python还好就业吗?30多岁转行晚吗?

最近不少人在微信问我现在Python还好就业不好就业&#xff1f;发展前景怎么样&#xff1f;我30多岁了&#xff0c;还能不能转行编程&#xff1f;Python该怎么学&#xff1f;如果做Python到底该做爬虫还是数据分析还是web&#xff1f;…等等这样的问题&#xff0c;现在逐一谈下我…

将市场部与整个组织联系起来,协调每个人的利益,来达成业务目标

让我们来谈谈业务目标&#xff0c;以及这些目标如何将营销部门与整个组织联系起来&#xff0c;并帮助协调每个人的利益。什么是有效的目标&#xff0c;什么是无效的目标?举一两个例子就好了。 当然。我们目前在高管层看到的情况是——甚至在新冠疫情之前我们就已经看到了——首…

【软件测试】你遇到的随机的bug?出现的原因......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 随机性bug? 为什么…

学习过程中遇到的问题总结(持续更新~~~)

问题总结问题1&#xff1a;Access denied for user 1234localhost (using password: YES)问题2&#xff1a;启动tomcat问题&#xff0c;报错Failed to execute goal org.apache.tomcat.maven:tomcat7-maven-plugin:2.2:run (default-cli) on project springMvc_03_request_mapp…

Linux 6.2:华为代码加速核心功能 715 倍!

整理 | 王启隆出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;作为送给全球开发者的圣诞礼物&#xff0c;Linux 在前日发布了 Linux 6.1 内核的稳定版&#xff0c;并开启了 Linux 6.2 的合并窗口。这次更新不仅为广大用户带来了不少新功能与改进&#xff0c;还让许…

竣达技术 | 智能机房动力环境监控主机、多功能监控服务器

专为现代各类计算机及网络通信机房、通信行业基站而设计的远程多功能监控服务器&#xff0c;系统具备3个RS485通讯接口可监控机房环境温湿度、门禁、烟雾、漏水、市电断电等各类传感器检测对应的告警状态&#xff0c;同时支持15个开关量检测。6路继电器输出控制&#xff0c;支持…

MySQL——表数据删了一半了,表文件大小还是不变

本篇文章针对 InnoDB 引擎展开讨论。一个 InnoDB 表包含两部分&#xff0c;即&#xff1a;表结构定义和数据。在 MySQL 8.0 版本以前&#xff0c;表结构是存在以.frm 为后缀的文件里。而 MySQL 8.0 版本&#xff0c;则已经允许把表结构定义放在系统数据表中了。因为表结构定义占…

Windows安装Jenkins

文章目录1.下载Jenkins2.安装Jenkins1.下载Jenkins 进入jenkins官网下载 https://www.jenkins.io/download/ 要下载不同版本的话可以去下面链接看看 https://mirrors.tuna.tsinghua.edu.cn/jenkins/ 2.安装Jenkins 1.点击下载下来的 jenkins.msi 文件 进行安装 2.选择第…

【Python爬虫实战】找工作太难?职场套路太深?来来来小编教你做人啊—这里的老板都跑到街上招人了,月薪1万够不够?

导语 哈喽大家好&#xff01;我是木子吖~ 上一期给大家已经介绍了爬虫的一些功能步骤等等&#xff0c;这一期想着还是给大家更新一些爬虫的案 例吧&#xff01;这里有我给大家准备的精心准备的爬虫案例代码&#xff0c;当然如果基础有点儿差的小伙伴儿也 不用担心哈&#x…

【Spring】——15、使用@PropertySource加载配置文件

&#x1f4eb;作者简介&#xff1a;zhz小白 公众号&#xff1a;小白的Java进阶之路 专业技能&#xff1a; 1、Java基础&#xff0c;并精通多线程的开发&#xff0c;熟悉JVM原理 2、熟悉Java基础&#xff0c;并精通多线程的开发&#xff0c;熟悉JVM原理&#xff0c;具备⼀定的线…

【Java】Java异常Exception和Error有什么区别?

异常处理Exception 和 Error 的区别try-catch代码块总结写程序就需要考虑程序中是否有异常&#xff0c;如果存在异常应该如何处理比较友好。Java 语言在设计之初就提供了相对完善的异常处理机制&#xff0c;这也是 Java 得以大行其道的原因之一&#xff0c;因为这种机制大大降低…

三方线上美食城|基于Springboot的三方线上美食商城系统

作者主页&#xff1a;编程指南针 作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、掘金特邀作者、多年架构师设计经验、腾讯课堂常驻讲师 主要内容&#xff1a;Java项目、毕业设计、简历模板、学习资料、面试题库、技术互助 收藏点赞不迷路 关注作者有好处 文末获取源…

Neo4j 实战(一)-- Mac neo4j 安装与配置

前言 Neo4j是一个高性能的&#xff0c;Nosql图形数据库。Nosql &#xff1d;no sql&#xff0c;即与传统的将数据结构化并存储在表中的数据库不一样。Neo4j将数据存储在网络上&#xff0c;我们也可以把Neo4j视为一个图引擎。我们打交道的是一个面对对象的、灵活的网络结构而不是…

【消息中间件】RocketMQ底层如何实现生产者发送消息

目录 一、前言 二、实现生产者发送消息 1、启动生产者 1.1、RocketMQTemplate消息发送模板 1.2、afterPropertiesSet()逻辑 1.3、DefaultMQProducer#start()逻辑 2、DefaultMQProducer#start()启动逻辑 2.1、更新路由信息到本地 2.2、从本地获取主题Topic信息 2.3、数…

flink on yarn

文章目录flink sql client on yarnsession 模式Per-Job Cluster 模式flink run安装完hadoop 3.3.4之后&#xff0c;启动hadoop、yarn 将flink 1.14.6上传到各个服务器节点&#xff0c;解压 flink sql client on yarn https://nightlies.apache.org/flink/flink-docs-release…