java并发之并发实践

news2024/11/23 7:28:09

一、死锁

线程死锁

死锁是指两个或者两个以上的线程在执行的过程中,因争夺资源产生的一种互相等待现象

假设线程 A 持有资源 1,线程 B 持有资源 2,它们同时都想申请对方的资源,那么这两个线程就会互相等待而进入死锁状态。

使用 Java 代码模拟上述死锁场景:

public class Resources {
    public static final Object resource1 = new Object(); // 资源 1
    public static final Object resource2 = new Object(); // 资源 2
}
public class ThreadA extends Thread{

    @Override
    public void run() {
        synchronized (Resources.resource1) {
            System.out.println(Thread.currentThread().getName() + " get Resource-1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " waiting get Resource-2");
            synchronized (Resources.resource2) {
                System.out.println(Thread.currentThread().getName() + " get Resource-2");
            }
        }
    }
}
public class ThreadB extends Thread{

    @Override
    public void run() {
        synchronized (Resources.resource2) {
            System.out.println(Thread.currentThread().getName() + " get Resource-2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " waiting get Resource-1");
            synchronized (Resources.resource1) {
                System.out.println(Thread.currentThread().getName() + " get Resource-1");
            }
        }
    }
}
public class DeadLockDemo {
    public static void main(String[] args) {
        Thread threadA=new ThreadA();
        threadA.setName("Thread A");
        threadA.start();

        Thread threadB=new ThreadB();
        threadB.setName("Thread B");
        threadB.start();
    }
}

输出结果:

Thread A get Resource-1
Thread B get Resource-2
Thread B waiting get Resource-1
Thread A waiting get Resource-2

线程 A 通过 synchronized (resource1) 获得 resource1 的监视器锁,然后线程 A 休眠 1s,执行线程 B 获取到 resource2 的监视器锁。线程 A 和线程 B 休眠结束了都开始企图请求获取对方的资源,然后这两个线程就会陷入互相等待的状态,这也就产生了死锁。

上面的例子符合产生死锁的四个必要条件:

  • 互斥:每个资源要么已经分配给了一个进程,要么就是可用的。
  • 占有和等待:已经得到了某个资源的进程可以再请求新的资源。
  • 不可抢占:已经分配给一个进程的资源不能强制性地被抢占,它只能被占有它的进程显式地释放。
  • 环路等待:有两个或者两个以上的进程组成一条环路,该环路中的每个进程都在等待下一个进程所占有的资源。

死锁预防

在程序运行之前预防发生死锁。

  • 破坏占有和等待条件:一种实现方式是规定所有进程在开始执行前请求所需要的全部资源。
  • 破坏不可抢占条件:占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源。
  • 破坏环路等待条件:给资源统一编号,进程只能按编号顺序来请求资源。

我们对线程 B 的代码进行如下改造,就不会出现死锁了。

public class ThreadB extends Thread{

    @Override
    public void run() {
        synchronized (Resources.resource1) {
            System.out.println(Thread.currentThread().getName() + " get Resource-1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " waiting get Resource-2");
            synchronized (Resources.resource1) {
                System.out.println(Thread.currentThread().getName() + " get Resource-2");
            }
        }
    }
}

输出结果:

Thread A get Resource-1
Thread A waiting get Resource-2
Thread A get Resource-2
Thread B get Resource-1
Thread B waiting get Resource-2
Thread B get Resource-2

线程 A 首先获得到 resource1 的监视器锁,这时候线程 2 就获取不到了。然后线程 1 再去获取 resource2 的监视器锁,可以获取到。线程 1 释放了对 resource1、resource2 的监视器锁的占用,线程 2 获取到就可以执行了。这样就破坏了环路等待条件,因此避免了死锁。

二、线程不安全示例

如果多个线程对同一个共享数据进行访问而不采取同步操作的话,那么操作的结果是不一致的。

以下代码演示了 1000 个线程同时对 cnt 执行自增操作,操作结束之后它的值有可能小于 1000。

public class ThreadUnsafeExample {

    private int cnt = 0;

    public void add() {
        cnt++;
    }

    public int get() {
        return cnt;
    }
}
public static void main(String[] args) throws InterruptedException {
    final int threadSize = 1000;
    ThreadUnsafeExample example = new ThreadUnsafeExample();
    final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < threadSize; i++) {
        executorService.execute(() -> {
            example.add();
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    executorService.shutdown();
    System.out.println(example.get());
}

输出结果:

998 //如果线程是安全的最终结果就是 1000

解决线程安全问题:

  • 方案一:使用 AtomicInteger(实际上非阻塞同步)

    public class ThreadSafeExample {
        // cnt 的初始值为 0。
        private AtomicInteger cnt = new AtomicInteger(0);

        public void add() {
            cnt.incrementAndGet();
        }

        public int get() {
            return cnt.get();
        }

        public static void main(String[] args) throws InterruptedException {
            final int threadSize = 1000;
            ThreadSafeExample example = new ThreadSafeExample();
            final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < threadSize; i++) {
                executorService.execute(() -> {
                    example.add();
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            System.out.println(example.get());
        }
    }

    输出结果:

    1000
  • 方案二:使用 synchronized(实际上是阻塞同步/互斥同步)

    public class ThreadSafeExample2 {
        // cnt 的初始值为 0。
        private int cnt = 0;

        public synchronized void add() {
            cnt++;
        }

        public synchronized int get() {
            return cnt;
        }

        public static void main(String[] args) throws InterruptedException {
            final int threadSize = 1000;
            ThreadSafeExample2 example = new ThreadSafeExample2();
            final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < threadSize; i++) {
                executorService.execute(() -> {
                    example.add();
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            System.out.println(example.get());
        }
    }

    输出结果:

    1000
  • 方案三:使用 Reentrant(实际上是阻塞同步/互斥同步)

    public class ThreadSafeExample3 {
        // cnt 的初始值为 0。
        private int cnt = 0;

        private ReentrantLock lock = new ReentrantLock();

        public void add() {
            try{
                lock.lock();
                cnt++;
            }finally {
                lock.unlock();
            }
        }

        public int get() {
            try{
                lock.lock();
                return cnt;
            }finally {
                lock.unlock();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            final int threadSize = 1000;
            ThreadSafeExample3 example = new ThreadSafeExample3();
            final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < threadSize; i++) {
                executorService.execute(() -> {
                    example.add();
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            System.out.println(example.get());
        }
    }

    输出结果:

    1000

三、实现线程安全方式

所谓线程安全就是说,多个线程不管以何种方式访问某个类,并且在主调代码中不需要进行同步,都能表现正确的行为。

线程安全有以下几种实现方式:不可变、互斥同步、非阻塞同步、无同步方案(栈封闭和线程本地存储)

不可变

不可变(Immutable)的对象一定是线程安全的,不需要再采取任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽量使对象成为不可变,来满足线程安全。

不可变的类型:

  • final 关键字修饰的基本数据类型
  • String
  • 枚举类型
  • Number 部分子类,如 Long 和 Double 等数值包装类型,BigInteger 和 BigDecimal 等大数据类型。但同为 Number 的原子类 AtomicInteger 和 AtomicLong 则是可变的。

对于集合类型,可以使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合。

public class ImmutableExample {
    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
        unmodifiableMap.put("a"1);
    }
}
Exception in thread "main" java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
    at ImmutableExample.main(ImmutableExample.java:9)

Collections.unmodifiableXXX() 先对原始的集合进行拷贝,需要对集合进行修改的方法都直接抛出异常。

public V put(K key, V value) {
    throw new UnsupportedOperationException();
}

互斥同步

互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。

互斥同步属于一种悲观的并发策略:总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁。

Java 中 synchronizedReentrantLock 等独占锁就是悲观锁思想的实现。

非阻塞同步

乐观锁基于冲突检测的乐观并发策略:先进行操作,如果没有其他线程争用共享数据,操作成功;如果数据存在竞争,就采用补偿措施(常见的有不断重试,直到成功)。这种乐观的并发策略的许多实现是不需要将线程挂起的,因此这种同步操作称为非阻塞同步

1. CAS

乐观锁需要操作冲突检测这两个步骤具备原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。、

硬件支持的原子性操作最典型的是:CAS(Compare-and-Swap)。

当多个线程尝试使用 CAS 同时更新一个共享变量时,只有其中一个线程能够更新共享变量中的值,其他线程都失败,失败的线程不会被挂起,而是被告知在这次竞争中失败,并且可以再次尝试。

CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。

2. 原子操作类

J.U.C 包里面的原子操作类的方法调用了 Unsafe 类的 CAS 操作。

无同步方案

要保证线程安全,并不是一定就要进行同步。如果一个方法本来就不涉及共享数据,那它自然就无须任何同步措施去保证正确性。

1. 栈封闭

多个线程访问同一个方法的局部变量时,不会出现线程安全问题,因为局部变量存储在虚拟机栈中,属于线程私有的。

public class StackClosedExample {
    public void add100() {
        int cnt = 0;
        for (int i = 0; i < 100; i++) {
            cnt++;
        }
        System.out.println(cnt);
    }
}
public static void main(String[] args) {
    StackClosedExample example = new StackClosedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> example.add100());
    executorService.execute(() -> example.add100());
    executorService.shutdown();
}
100
100

2. 线程本地存储(Thread Local Storage)

如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,我们就可以把共享数据的可见范围限制在同一个线程之内,这样,无须同步也能保证线程之间不出现数据争用的问题。

符合这种特点的应用并不少见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽量在一个线程中消费完。其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的广泛应用使得很多 Web 服务端应用都可以使用线程本地存储来解决线程安全问题。

可以使用 java.lang.ThreadLocal 类来实现线程本地存储功能。如果创建一个 ThreadLocal 变量,那么访问这个变量的每个线程都会有这个变量的一个副本,在实际多线程操作的时候,操作的是自己本地内存中的变量,从而规避了线程安全问题。

ThreadLocal 示例
public class ThreadLocalExample {
    public static void main(String[] args) {
        ThreadLocal threadLocal = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal.set(1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(threadLocal.get());
            threadLocal.remove();
        });
        Thread thread2 = new Thread(() -> {
            threadLocal.set(2);
            threadLocal.remove();
        });
        thread1.start();
        thread2.start();
    }
}
1

thread1 中设置 threadLocal 为 1,而 thread2 设置 threadLocal 为 2。过了一段时间之后,thread1 读取 threadLocal 依然是 1,不受 thread2 的影响。

ThreadLocal 原理

每个 Thread 都有一个 ThreadLocal.ThreadLocalMap 对象。

public class Thread implements Runnable {
    
    // 与此线程有关的 ThreadLocal 值,由 ThreadLocal 类维护
    ThreadLocal.ThreadLocalMap threadLocals = null;
}

默认情况下 threadLocals 变量值为 null,只有当前线程调用 ThreadLocal 类的 set() 或 get() 方法时才创建,实际上调用这两个方法的时候,我们调用的是ThreadLocalMap 类对应的 get()、set()方法。

当调用一个 ThreadLocal 的 set(T value) 方法时,先得到当前线程的 ThreadLocalMap 对象,然后将 <ThreadLocal 对象,value> 键值对插入到该 Map 中。变量是存放在当前线程的 ThreadLocalMap 中,并不是存在 ThreadLocal 中

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t); // 获取当前线程的 ThreadLocalMap 对象
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

get() 方法类似。

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t); // 获取当前线程的 ThreadLocalMap 对象
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

所以对于以下代码:

public class ThreadLocalExample1 {
    public static void main(String[] args) {
        ThreadLocal threadLocal1 = new ThreadLocal();
        ThreadLocal threadLocal2 = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal1.set(1);
            threadLocal2.set(1);
        });
        Thread thread2 = new Thread(() -> {
            threadLocal1.set(2);
            threadLocal2.set(2);
        });
        thread1.start();
        thread2.start();
    }
}

其对应的底层结构图为:

由上图可以看出,ThreadLocal 从理论上讲并不是用来解决多线程并发问题的,因为根本不存在多线程竞争。

ThreadLocal 内存泄露问题

ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,而 value 是强引用

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。ThreadLocalMap 中就会出现 key 为 null 的 Entry,如果我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。

ThreadLocalMap 实现中已经考虑了这种情况,在调用 set()、get()、remove() 方法的时候,会清理掉 key 为 null 的记录。应该尽可能在每次使用 ThreadLocal 后手动调用 remove(),以避免出现 ThreadLocal 经典的内存泄漏甚至是造成自身业务混乱的风险。

四、线程间通信

当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。

join()

在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。

对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join() 方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。

public class JoinExample {

    private class A extends Thread {
        @Override
        public void run() {
            System.out.println("A");
        }
    }

    private class B extends Thread {

        private A a;

        B(A a) {
            this.a = a;
        }

        @Override
        public void run() {
            try {
                a.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B");
        }
    }

    public void test() {
        A a = new A();
        B b = new B(a);
        b.start();
        a.start();
    }
}

public static void main(String[] args) {
    JoinExample example = new JoinExample();
    example.test();
}
A
B

wait() & notify()/notifyAll()

调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。

它们都属于 Object 的一部分,而不属于 Thread。

只能用在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateException。

使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。

public class WaitNotifyExample {

    public synchronized void before() {
        System.out.println("before");
        notifyAll();
    }

    public synchronized void after() {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after");
    }
}

public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    WaitNotifyExample example = new WaitNotifyExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

await() & signal()/signalAll()

java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。

相比于 wait() 这种等待方式,await() 可以指定等待的条件,因此更加灵活。

使用 Lock 来获取一个 Condition 对象。

public class AwaitSignalExample {

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void before() {
        lock.lock();
        try {
            System.out.println("before");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void after() {
        lock.lock();
        try {
            condition.await();
            System.out.println("after");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    AwaitSignalExample example = new AwaitSignalExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

五、实现生产者和消费者

wait() & notifyAll()

public class ProducerConsumer {
    private static class Producer implements Runnable{
        private List<Integer> list;
        private int capacity;

        public Producer(List list, int capacity) {
            this.list = list;
            this.capacity = capacity;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        String producer = Thread.currentThread().getName();
                        while (list.size() == capacity) {
                            System.out.println("生产者 " + producer +
                                    "  list 已达到最大容量,进行 wait");
                            list.wait();
                            System.out.println("生产者 " + producer +
                                    "  退出 wait");
                        }
                        Random random = new Random();
                        int i = random.nextInt();
                        System.out.println("生产者 " + producer +
                                " 生产数据" + i);
                        list.add(i);
                        list.notifyAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    private static class Consumer implements Runnable{
        private List<Integer> list;

        public Consumer(List list) {
            this.list = list;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        String consumer = Thread.currentThread().getName();
                        while (list.isEmpty()) {
                            System.out.println("消费者 " + consumer +
                                    "  list 为空,进行 wait");
                            list.wait();
                            System.out.println("消费者 " + consumer +
                                    "  退出wait");
                        }
                        Integer element = list.remove(0);
                        System.out.println("消费者 " + consumer +
                                "  消费数据:" + element);
                        list.notifyAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        final LinkedList linkedList = new LinkedList();
        final int capacity = 5;
        Thread producer = new Thread(new Producer(linkedList,capacity),"producer");
        Thread consumer = new Thread(new Consumer(linkedList),"consumer");

        consumer.start();
        producer.start();
    }
}

输出结果:

生产者 producer 生产数据-1652445373
生产者 producer 生产数据1234295578
生产者 producer 生产数据-1885445180
生产者 producer 生产数据864400496
生产者 producer 生产数据621858426
生产者 producer  list 已达到最大容量,进行 wait
消费者 consumer  消费数据:-1652445373
消费者 consumer  消费数据:1234295578
消费者 consumer  消费数据:-1885445180
消费者 consumer  消费数据:864400496
消费者 consumer  消费数据:621858426
消费者 consumer  list 为空,进行 wait
生产者 producer  退出 wait

await() & sigalAll()

public class ProducerConsumer {
    private static ReentrantLock lock = new ReentrantLock();

    private static Condition full = lock.newCondition();
    private static Condition empty = lock.newCondition();

    private static class Producer implements Runnable{
        private List<Integer> list;
        private int capacity;

        public Producer(List list, int capacity) {
            this.list = list;
            this.capacity = capacity;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    lock.lock();
                    String producer = Thread.currentThread().getName();
                    while (list.size() == capacity) {
                        System.out.println("生产者 " + producer +
                                           "  list 已达到最大容量,进行 wait");
                        full.await();
                        System.out.println("生产者 " + producer +
                                           "  退出 wait");
                    }
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生产者 " + producer +
                                       " 生产数据" + i);
                    list.add(i);
                    empty.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    private static class Consumer implements Runnable{
        private List<Integer> list;

        public Consumer(List list) {
            this.list = list;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    lock.lock();
                    String consumer = Thread.currentThread().getName();
                    while (list.isEmpty()) {
                        System.out.println("消费者 " + consumer +
                                           "  list 为空,进行 wait");
                        empty.await();
                        System.out.println("消费者 " + consumer +
                                           "  退出wait");
                    }
                    Integer element = list.remove(0);
                    System.out.println("消费者 " + consumer +
                                       "  消费数据:" + element);
                    full.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        final LinkedList linkedList = new LinkedList();
        final int capacity = 5;
        Thread producer = new Thread(new Producer(linkedList,capacity),"producer");
        Thread consumer = new Thread(new Consumer(linkedList),"consumer");

        consumer.start();
        producer.start();
    }
}

输出结果:

生产者 producer 生产数据-1748993481
生产者 producer 生产数据-131075825
生产者 producer 生产数据-683676621
生产者 producer 生产数据1543722525
生产者 producer 生产数据804266076
生产者 producer  list 已达到最大容量,进行 wait
消费者 consumer  消费数据:-1748993481
消费者 consumer  消费数据:-131075825
消费者 consumer  消费数据:-683676621
消费者 consumer  消费数据:1543722525
消费者 consumer  消费数据:804266076
消费者 consumer  list 为空,进行 wait
生产者 producer  退出 wait

阻塞队列

public class ProducerConsumer {

    private static class Producer implements Runnable{
        private BlockingQueue<Integer> queue;

        public Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String producer = Thread.currentThread().getName();
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生产者 " + producer +
                            " 生产数据" + i);
                    queue.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static class Consumer implements Runnable{
        private BlockingQueue<Integer> queue;

        public Consumer(BlockingQueue<Integer> queue) {
            this.queue =queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String consumer = Thread.currentThread().getName();
                    Integer element = queue.take();
                    System.out.println("消费者 " + consumer +
                            "  消费数据:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(new Producer(queue),"producer");
        Thread consumer = new Thread(new Consumer(queue),"consumer");

        consumer.start();
        producer.start();
    }
}

输出结果:

生产者producer生产数据-222876564
消费者consumer正在消费数据-906876105
生产者producer生产数据-9385856
消费者consumer正在消费数据1302744938
生产者producer生产数据-177925219
生产者producer生产数据-881052378
生产者producer生产数据-841780757
生产者producer生产数据-1256703008
消费者consumer正在消费数据1900668223
消费者consumer正在消费数据2070540191
消费者consumer正在消费数据1093187
消费者consumer正在消费数据6614703
消费者consumer正在消费数据-1171326759

六、多线程编程实战

LeetCode-按序打印

1114. 按序打印

参考代码:

import java.util.concurrent.CountDownLatch;

class Foo {

    private CountDownLatch countDownLatchA; //等待 A 线程执行完
    private CountDownLatch countDownLatchB; //等待 B 线程执行完

    public Foo() {
        countDownLatchA = new CountDownLatch(1); //只等待一个线程,即 A 线程
        countDownLatchB = new CountDownLatch(1); //只等待一个线程,即 B 线程
    }

    public void first(Runnable printFirst) throws InterruptedException {

        // printFirst.run() outputs "first". Do not change or remove this line.
        printFirst.run();
        countDownLatchA.countDown();
        //减去 1 后,cnt =0 那些因为调用 await() 方法而在等待的线程就会被唤醒。
    }

    public void second(Runnable printSecond) throws InterruptedException {
        countDownLatchA.await();
        // printSecond.run() outputs "second". Do not change or remove this line.
        printSecond.run();
        countDownLatchB.countDown();
        //减去 1 后,cnt =0 那些因为调用 await() 方法而在等待的线程就会被唤醒。
    }

    public void third(Runnable printThird) throws InterruptedException {
        countDownLatchB.await();
        // printThird.run() outputs "third". Do not change or remove this line.
        printThird.run();
    }
}

多线程良好开发习惯

  • 给线程起个有意义的名字,这样可以方便找 Bug。
  • 缩小同步范围,从而减少锁争用。例如对于 synchronized,应该尽量使用同步块而不是同步方法。
  • 多用同步工具少用 wait() 和 notify()。首先,CountDownLatch, CyclicBarrier, Semaphore 和 Exchanger 这些同步类简化了编码操作,而用 wait() 和 notify() 很难实现复杂控制流;其次,这些同步类是由最好的企业编写和维护,在后续的 JDK 中还会不断优化和完善。
  • 使用 BlockingQueue 实现生产者消费者问题。
  • 多用并发集合少用同步集合,例如应该使用 ConcurrentHashMap 而不是 Hashtable。
  • 使用本地变量和不可变类来保证线程安全。
  • 使用线程池而不是直接创建线程,这是因为创建线程代价很高,线程池可以有效地利用有限的线程来启动任务。

获取更多干货内容,记得关注我哦。

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2194093.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多功能声学综合馆:流动会场的新标杆—轻空间

随着现代会议、展览、演出和活动的多元化需求&#xff0c;场地的灵活性与适应性变得尤为重要。传统的固定场馆难以满足各类活动的复杂需求&#xff0c;而多功能声学综合馆凭借其灵活、便捷、专业的声学性能&#xff0c;成为了市场上一颗闪耀的新星。其“流动会场”的特性&#…

计算机取证

文章目录 思维导图计算机取证数据固定FTK ImageDumpIt 数据分析——磁盘镜像仿真软件自动仿真手动仿真仿真后的取证分析 基本信息及用户痕迹1.名称、版本、build号、系统目录、位数、产品秘钥等2.安装时间3.最后一次关机时间4.USB使用记录5.WIFI信息6.近期访问过的文档、程序7.…

动销方案:剑指市场份额扩张

在竞争激烈的市场中&#xff0c;企业如何扩大市场份额&#xff1f;动销&#xff0c;即拉动销售&#xff0c;乃是关键手段。 首先进行市场分析。行业现状方面&#xff0c;以快速消费品行业为例&#xff0c;市场规模大且持续增长&#xff0c;但竞争激烈&#xff0c;各大品牌不断推…

深化理解:RAG应用搭建进阶指南

大型语言模型&#xff08;LLM&#xff09;的文本推理能力&#xff0c;宛如一位博学的公民&#xff0c;其智慧之源来自于互联网上公开的文献宝库。想象一下&#xff0c;这位名为LLM的公民&#xff0c;如同一位勤奋的学者&#xff0c;借阅了图书馆中所有的书籍&#xff0c;并将这…

杀疯了深度解析chatGPT和NLP底层技术——复旦大学新版《自然语言处理导论》

在今年的2月28号&#xff0c;复旦张琦教授放出了自己的大招&#xff0c;发布了自己历时近三年之久&#xff0c;即自身对自然语言处理20年研究的著作 全文共 600页&#xff0c; 涉及了 787 篇参考文献&#xff0c; 全面且深度的解析了与NLP的底层知识。 内容介绍&#xff1a; …

【C++ Primer Plus】4

2 字符串 字符串是存储在内存的连续字节中的一系列字符&#xff1b;C处理字符串的方式有两种&#xff0c; c-风格字符串&#xff08;C-Style string&#xff09;string 类 2.1 c-风格字符串&#xff08;C-Style string&#xff09; 2.1.1 char数组存储字符串&#xff08;c-…

网 络 安 全

网络安全是指保护网络系统及其所存储或传输的数据免遭未经授权访问、使用、揭露、破坏、修改或破坏的实践和技术措施。网络安全涉及多个方面&#xff0c;包括但不限于以下几个方面&#xff1a; 1. 数据保护&#xff1a;确保数据在传输和存储过程中的完整性和保密性&#xff0c;…

微服务es+Kibana解析部署使用全流程

1、介绍 ElasticSearch是Java开发的一款开源的&#xff0c;分布式的搜索引擎。 它的搜索采用内存中检索的方式&#xff0c;大大提高了检索的效率&#xff0c;es是基于REST API的方式对数据操作的&#xff0c;可以让存储、检索、索引效率更高。 1、es可以做什么 网站检索数据…

python爬虫 - 深入requests模块

&#x1f308;个人主页&#xff1a;https://blog.csdn.net/2401_86688088?typeblog &#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/2401_86688088/category_12797772.html 目录 ​编辑 前言 一、下载网络文件 &#xff08;一&#xff09;基本步骤 &#xff0…

【AIGC】如何选择AI绘画工具?Midjourney VS Stable Diffusion

前言 文章目录 &#x1f4af;如何选择合适的AI绘画工具 个人需求选择比较工具特点社区和资源 &#x1f4af; Midjourney VS Stable Diffusion&#xff1a;深度对比与剖析 使用费用对比使用便捷性与系统兼容性对比开源与闭源对比图片质量对比上手难易对比学习资源对比作品版权问…

Vue入门-指令学习-v-else和v-else-if

v-else和v-else-if 作用&#xff1a;辅助v-if进行判断渲染 语法&#xff1a;v-else v-else-if"表达式" 注意&#xff1a;需要紧挨着v-if一起使用 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><m…

九大排序之插入排序

1.前言 插入排序是把待排序的记录按其关键码值的大小逐个插入到一个已经排好序的有序序列中&#xff0c;直到所有的记录插入完为止&#xff0c;得到一个新的有序序列 。实际中我们玩扑克牌时&#xff0c;就用了插入排序的思想。 本章重点&#xff1a;主要着重的介绍两种插入排序…

【JavaEE】【多线程】进程与线程的概念

目录 进程系统管理进程系统操作进程进程控制块PCB关键属性cpu对进程的操作进程调度 线程线程与进程线程资源分配线程调度 线程与进程区别线程简单操作代码创建线程查看线程 进程 进程是操作系统对一个正在运行的程序的一种抽象&#xff0c;可以把进程看做程序的一次运行过程&a…

开发自定义starter

环境&#xff1a;Spring Cloud Gateway 需求&#xff1a;防止用户绕过网关直接访问服务器&#xff0c;用户只需引入依赖即可。 1、创建项目 首先创建一个spring boot项目 2、配置pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xm…

国外电商系统开发-运维系统文件上传

文件上传&#xff0c;是指您把您当前的PC电脑上的文件批量的上传到远程服务器上&#xff0c;在这里&#xff0c;您可以很轻松的通过拖动方式上传&#xff0c;只需要动动鼠标就搞定。 第一步&#xff0c;您应该选择要上传的服务器&#xff1a; 选择好了以后&#xff0c;点击【确…

SpringBoot框架下的教育系统开发全解析

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理微服务在线教育系统的相关信息成为必然。开…

毕业设计项目——基于transformer的中文医疗领域命名实体识别(论文/代码)

完整的论文代码见文章末尾 以下为核心内容 摘要 近年来&#xff0c;随着深度学习技术的发展&#xff0c;基于Transformer和BERT的模型在自然语言处理领域取得了显著进展。在中文医疗领域&#xff0c;命名实体识别(Named Entity Recognition, NER)是一项重要任务&#xff0c;旨…

ArkUI中的状态管理

一、MVVM ArkUI提供了一系列装饰器实现ViewModel的能力,如@Prop、@Link、@Provide、LocalStorage等。当自定义组件内变量被装饰器装饰时变为状态变量,状态变量的改变会引起UI的渲染刷新。 在ArkUI的开发过程中,如果没有选择合适的装饰器或合理的控制状态更新范围,可能会导…

《大规模语言模型从理论到实践》第一轮学习笔记

第一章 绪论 本章主要介绍大规模语言模型基本概念、发展历程和构建流程。 大规模语言模型&#xff08;Large Language Models&#xff0c;LLM&#xff09;&#xff0c;也称大语言模型 或大型语言模型。 1.1 大规模语言模型基本概念 1.语言模型&#xff08;Language Model&a…

重学SpringBoot3-集成Redis(五)之布隆过滤器

更多SpringBoot3内容请关注我的专栏&#xff1a;《SpringBoot3》 期待您的点赞&#x1f44d;收藏⭐评论✍ 重学SpringBoot3-集成Redis&#xff08;五&#xff09;之布隆过滤器 1. 什么是布隆过滤器&#xff1f;基本概念适用场景 2. 使用 Redis 实现布隆过滤器项目依赖Redis 配置…