跳至主要內容

⑤JAVA 原子类和并发工具类

holic-x...大约 17 分钟JAVA基础

⑤JAVA 原子类和并发工具类

学习核心

  • 如何理解CAS机制(概念、问题和缺陷)
  • JDK中哪些地方用到了CAS?举例说明
  • 并发工具类

学习资料

CAS和原子操作类

1.CAS

👻什么是CAS?

​ CAS(Compare-And-Swap:对比交换)是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值

​ 经过调查发现,其实现方式是基于硬件平台的汇编指令,就是说CAS是靠硬件实现的,JVM只是封装了汇编调用,那些Atomiclnteger类便是使用了这些封装后的接口。

​ 简单解释:CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较下在旧值有没有发生变化,如果没有发生变化,才交换成新值,发生了变化则不交换。

CAS应用案例

​ JAVA并发理论-并发不安全案例:并发线程累加操作(对同一个变量累加500次,得到的结果小于500),其背后原因就是并发状态下线程不安全导致

​ 针对这个问题,将共享的操作变量int替换为AtomicInteger,也可以解决并发问题。而AtomicInteger的实现则是基于CAS

class ThreadUnsafeOperator{

    // 定义一个共享数据
    // private static int count = 0; // 线程不安全的定义
    private AtomicInteger count = new AtomicInteger(0);

    // 对外提供数据操作方法
    public void add(){
        // count ++;
        count.getAndIncrement();
    }

    // 对外提供数据访问方法
    public int get(){
        // return count;
        return count.get();
    }

}

// 线程不安全示例:引入CAS解决
public class ThreadUnsafeDemo {

    // 借助并发工具类CountDownLatch(保证线程池完成500次累加)
    public static void modOpByCountDownLatch() throws InterruptedException {
        // 定义操作对象
        ThreadUnsafeOperator op = new ThreadUnsafeOperator();
        final int threadSize = 500;
        final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 模拟500个线程执行自增操作
        for(int i =0;i<threadSize;i++){
            executorService.execute(()->{
                // 调用方法执行自增操作
                op.add();
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        // 获取最终count结果
        System.out.println("count:"+op.get());
    }

    public static void main(String[] args) throws InterruptedException {
        modOpByCountDownLatch();
    }
}

👻CAS问题

​ CAS方式为乐观锁、synchronized为悲观锁,一般场景下使用CAS解决并发问题性能更优,但是CAS存在三大问题:ABA问题、循环开销大、只能保证一个共享变量的原子操作

ABA问题

问题分析:因为CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A(经过了A =》B =》A的转化),那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA很多业务场景是可以容忍的,但是有些业务场景就会带来问题。

​ 结合银行转账的例子分析:小牛取款,银行帐户有100块,准备取50。由于机器不太好使,多点了几次取款操作。触发了后台threadA和threadB的工作,此时threadA操作成功(100->50),threadB阻塞。正好牛妈打款50元给小牛(50->100),threadC执行成功。之后threadB运行了,又改为(100->50)。最终,小牛账户直接少了50块钱,等于被错误的扣了两次款

解决思路:使用版本号,在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A。从 Java 1.5开始,JDK的Atomic包里提供了一个类AtomicStampedReference 来解决ABA问题。这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值

循环开销大

​ 自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销

只能保证一个共享变量的原子操作

​ 当对一个共享变量执行操作时,可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁

2.原子操作类

为什么需要原子操作类?

​ 保证线程安全是 Java 并发编程必须要解决的重要问题。Java 从原子性、可见性、有序性这三大特性入手,确保多线程的数据一致性。

​ 确保线程安全最常见的做法是利用锁机制(Lock、 sychronized )来对共享数据做互斥同步,这样在同一个时刻只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的、线程安全的。互斥同步最主要的问题是线程阻塞和唤醒所带来的性能问题。

​ volatile 是轻量级的锁(自然比普通锁性能要好),它保证了共享变量在多线程中的可见性,但无法保证原子性。所以,它只能在一些特定场景下使用

​ 为了兼顾原子性以及锁带来的性能问题,Java引入了CAS(主要体现在 Unsafe 类)来实现非阻塞同步(也叫乐观锁)。并基于 CAS,提供了一套原子工具类

原子变量类 比锁的粒度更细,更轻量级,并且对于在多处理器系统上实现高性能的并发代码来说是非常关键的。原子变量将发生竞争的范围缩小到单个变量上。

原子变量类相当于一种泛化的 volatile 变量,能够支持原子的、有条件的读/改/写操作。

原子类在内部使用 CAS 指令(基于硬件的支持)来实现同步。这些指令通常比锁更快。

原子变量类可以分为 4 组:

  • 基本类型
    • AtomicBoolean - 布尔类型原子类
    • AtomicInteger - 整型原子类
    • AtomicLong - 长整型原子类
  • 引用类型
    • AtomicReference - 引用类型原子类
    • AtomicMarkableReference - 带有标记位的引用类型原子类
    • AtomicStampedReference - 带有版本号的引用类型原子类
  • 数组类型
    • AtomicIntegerArray - 整形数组原子类
    • AtomicLongArray - 长整型数组原子类
    • AtomicReferenceArray - 引用类型数组原子类
  • 属性更新器类型
    • AtomicIntegerFieldUpdater - 整型字段的原子更新器。
    • AtomicLongFieldUpdater - 长整型字段的原子更新器。
    • AtomicReferenceFieldUpdater - 原子更新引用类型里的字段

👻原子更新基本类型

使用原子的方式更新基本类型

  • AtomicBoolean:原子更新布尔类型

  • Atomiclnteger:原子更新整型

  • AtomicLong:原子更新长整型

​ 3个类提供的方法几乎一模一样,以Atomiclnteger为例进行原理分析,Atomicinteger的常用方法如下

方法说明
public final int get()获取当前的
public final int getAndSet(int newValue)获取当前的值,并设置新的值
public final int getAndlncrement()获取当前的值,并自增1
public final int getAndDecrement()获取当前的值,并自减1
public final int getAndAdd(int delta)获取当前的值,并加上预期的值
void lazySet(int newValue)最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值

​ 查看Atomiclnteger分析源码:

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
	try {
		valueOffset = unsafe.objectFieldOffset
			(AtomicInteger.class.getDeclaredField("value"));
	} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;
  • value - value 属性使用 volatile 修饰,使得对 value 的修改在并发环境下对所有线程可见。
  • valueOffset - value 属性的偏移量,通过这个偏移量可以快速定位到 value 字段,这个是实现 AtomicInteger 的关键。
  • unsafe - Unsafe 类型的属性,它为 AtomicInteger 提供了 CAS 操作

​ 查看Atomiclnteger#getAndlncrement方法分析源码:可以看到getAndIncrement的自增操作,最终调用的是unsafe类提供的compareAndSwapInt方法

count.getAndIncrement();

// getAndIncrement
public final int getAndIncrement() {
  return unsafe.getAndAddInt(this, valueOffset, 1);
}


public final int getAndAddInt(Object o, long offset, int delta) {
  int v;
  do {
    v = getIntVolatile(o, offset);
  } while (!compareAndSwapInt(o, offset, v, v + delta));
  return v;
}

👻原子更新数组类型

通过原子的方式更新数组里的某个元素,Atomic包提供了以下的3个类:

  • AtomiclntegerArray:原子更新整型数组里的元素
  • AtomicLongArray: 原子更新长整型数组里的元素
  • AtomicReferenceArray:原子更新引用类型数组里的元素

这三个类的最常用的方法是如下两个方法:

  • get(intindex):获取索引为index的元素值
  • compareAndSet(inti,E expect,E update):如果当前值等于预期值,则以原子方式将数组位置i的元素设置为update值
// AtomicIntegerArray
public class AtomicIntegerArrayDemo {
    public static void main(String[] args) {
        AtomicIntegerArray array = new AtomicIntegerArray(new int[]{0,1,2,3,4});
        System.out.println(array);
        System.out.println(array.get(1));
        System.out.println(array.compareAndSet(1,1,5));
        System.out.println(array);
        // 预期不一致
        System.out.println(array.compareAndSet(1,3,10));
        System.out.println(array);
    }
}

// output
[0, 1, 2, 3, 4]
1
true
[0, 5, 2, 3, 4]
false
[0, 5, 2, 3, 4]

​ compareAndSet:当指定的值和预期值不一样的时候,该操作返回失败,不对容器做任何修改操作

👻原子更新 引用类型

Atomic包提供了以下三个类:

  • AtomicReference: 原子更新引用类型
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段
  • AtomicMarkableReferce:原子更新带有标记位的引用类型

​ 这三个类提供的方法都差不多,首先构造一个引用对象,然后把引用对象set进Atomic类,然后调用compareAndSet等一些方法去进行原子操作,原理都是基于Unsafe实现,但AtomicReferenceFieldUpdater 略有不同,更新的字段必须用volatile修饰。

AtomicReference使用举例:

// 原子更新引用类型
public class AtomicReferenceDemo {

    public static AtomicReference<Person> atomicUserRef = new AtomicReference<Person>();

    public static void main(String[] args) {
        // 定义原子更新引用对象
        Person p = null;
        Person p1 = new Person("P1",12);
        atomicUserRef.set(p1);
        Person p2 = new Person("P2",18);
        // 比较当前引用对象和预期对象是否一致,如果是同一个对象则更新
        atomicUserRef.compareAndSet(p, p2);
        // 打印结果
        Person resPerson = atomicUserRef.get();
        System.out.println(resPerson.getName() + "-" + resPerson.getAge());
    }
}

// output
P1-12
  
// 此处初始设定的是p1对象,compareAndSet校验的是p对象,与预期不一致,则不会执行更新操作
  • 代码思路:先构建一个Person对象,将其设置到AtomicReference中,最后调用compareAndSet方法进行原子更新操作
  • 此处compareAndSet比较的是两个对象(确认两个对象的地址是否相等)

👻原子更新字段类型

​ 如果需原子地更新某个类里的某个字段时,就需要使用原子更新字段类,Atomic包提供了以下3个类进行原子字段更新:

  • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器
  • AtomicStampedReference:原子更新带有版本号的引用类型

​ 该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的 ABA问题。

​ 要想原子地更新字段类需要两步:

​ (1)因为原子更新字段类都是抽象类,每次使用的时候必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性

​ (2)更新类的字段(属性)必须使用pubilic volatile修饰符

​ 以上3个类提供的方法几乎一样,此处以AtomicIntegerFieldUpdater为例进行分析

AtomicIntegerFieldUpdater案例分析

class User{
    private String name;
    public volatile int age;
    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}



// 原子更新字段类型 demo
public class AtomicIntegerFieldUpdaterDemo {

    private static AtomicIntegerFieldUpdater<User> aifu = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

    public static void main(String[] args) {
        User user = new User("noob",18);
        System.out.println(aifu.getAndIncrement(user));
        System.out.println(aifu.get(user));
    }
}

// output
18
19

原子化的累加器

DoubleAccumulatorDoubleAdderLongAccumulatorLongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果仅仅需要累加操作,使用原子化的累加器性能会更好,代价就是会消耗更多的内存空间。

LongAdder 内部由一个 base 变量和一个 cell[] 数组组成。

  • 当只有一个写线程,没有竞争的情况下,LongAdder 会直接使用 base 变量作为原子操作变量,通过 CAS 操作修改变量;
  • 当有多个写线程竞争的情况下,除了占用 base 变量的一个写线程之外,其它各个线程会将修改的变量写入到自己的槽 cell[] 数组中。

LongAdder 在操作后的返回值只是一个近似准确的数值,但是 LongAdder 最终返回的是一个准确的数值, 所以在一些对实时性要求比较高的场景下,LongAdder 并不能取代 AtomicIntegerAtomicLong

并发工具类

1.CountDownLatch(等多线程完成)

​ CountDownLatch允许一个或多个多线程等待其他线程完成操作

案例:假设主线程需要等待5个子线程分别完成任务之后,再执行总任务,可以利用CountDownLatch实现

// 案例:主线程需要等待5个子线程分别完成任务之后,再执行总任务
public class CountDownLatchDemo {

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(5);
    for (int i = 0; i < 5; i++) {
      String taskName = "task_" + i;
      new Thread(
        new Runnable() {
          @Override
          public void run() {
            System.out.println("完成任务:" + Thread.currentThread().getName());
            countDownLatch.countDown();
          }
        }, taskName).start();
    }
    // 等待所有任务完成
    countDownLatch.await();
    System.out.println("所有任务完成");
  }
}

// output
完成任务:task_0
完成任务:task_2
完成任务:task_3
完成任务:task_4
完成任务:task_1
所有任务完成

​ CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果想等待N个点完成,这里就传入N。当调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法 会阻塞当前线程,直到N变成零。

​ 如果有某个任务处理得比较慢,不可能让主线程一直等待,所以可以使用另外一个带指定时间的await方法await(long time.TimeUnit unit),这个方法等待特定时间后,就会不再阻塞当前线程.

2.CyclicBarrier(同步屏障)

​ CyclicBarrier:可循环使用(Cyclic)的屏障(Barier)。

​ 其作用:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

​ 类比理解:坐过那种滚动发车的汽车吗,例如汽车作为10个,坐满就走,那前9个坐上车的人就只有等待,第10个人来了之后就发车,一起出发。

代码示例:11个空位,主线程和子线程看成一样的,不用区别理解。前10个线程到达之后只能等待,等到第11个线程到来之后,全部一起执行。

// 同步屏障 demo
public class CyclicBarrierDemo {

    public static void main(String[] args) throws Exception {
        CyclicBarrier cb = new CyclicBarrier(11);
        for (int i = 1; i <= 10; i++) {
            String taskName = "task_" + i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "正在等待...");
                try {
                    cb.await();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName() + "完成任务");
            },taskName).start();
        }
        // 模拟延迟完成
        Thread.sleep(200);
        System.out.println("任务11还有5s,其他任务先等待...");
        Thread.sleep(5000);
        cb.await();
        System.out.println("主线程作为第11个线程,一起执行");
    }
}

// output
task_4正在等待...
task_5正在等待...
task_9正在等待...
task_10正在等待...
task_3正在等待...
task_7正在等待...
task_1正在等待...
task_6正在等待...
task_2正在等待...
task_8正在等待...
任务11还有5s,其他任务先等待...
主线程作为第11个线程,一起执行
task_4完成任务
task_5完成任务
task_9完成任务
task_10完成任务
task_3完成任务
task_7完成任务
task_1完成任务
task_6完成任务
task_8完成任务
task_2完成任务

​ CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行次。CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier 阻塞的线程数量等。

3.Semaphore(信号量)

Semaphore(信号量)是用于控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理地使用公共资源

案例分析

public class SemaphoreDemo {

    public static void main(String[] args) throws InterruptedException {
        Semaphore s = new Semaphore(5);
        for (int i = 0; i < 20; i++) {
            String taskName = "task_" + i;
            new Thread(() -> {
                try {
                    s.acquire();
                    System.out.println("执行任务:" + Thread.currentThread().getName());
                    Thread.sleep(3000);
                    s.release();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, taskName).start();
        }
        Thread.sleep(20000);
    }
}

​ 虽然会有20个线程尝试执行,但是同一时间只会有5个线程可以运行。Semaphore的构造方法 Semaphore(intpermits)接受一个整型的数字,表示可用的许可证数量。Semaphore(5)表示允许5个线程获取许可证,也就是最大并发数是5。

​ Semaphore的用法也很简单,首先线程使用 Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以 用tryAcquire()方法尝试获取许可证。

Semaphore还提供一些其他方法,具体如下:

  • intavailablePermits():返回此信号量中当前可用的许可证数
  • intgetQueueLength():返回正在等待获取许可证的线程数
  • booleanhasQueuedThreads():是否有线程正在等待获取许可证
  • void reducePermits(int reduction):减少reduction个许可证,是个protected方法
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法

场景应用

  • Semaphore 可以用于实现资源池,如数据库连接池
  • Semaphore 可以用于将任何一种容器变成有界阻塞容器

4.Exchanger(线程间交换数据)

​ Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据

​ 如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方

​ exchange()方法:入参是本线程需要传递给对方的数据,返回值则是对方线程传过来的数据。

// 线程间数据交换Exchanger
public class ExchangerDemo {

    public static void main(String[] args) {
         final Exchanger<String> exchanger = new Exchanger();
         // 构建线程A
         new Thread(()->{
             try {
                 String send = "你好 我是AAA";
                 String receive = exchanger.exchange(send);
                 System.out.println(Thread.currentThread().getName() + "接收消息:" + receive);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
         },"A").start();

        // 构建线程B
        new Thread(()->{
            try {
                String send = "你好 我是BBB";
                String receive = exchanger.exchange(send);
                System.out.println(Thread.currentThread().getName() + "接收消息:" + receive);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        },"B").start();
    }
}

// output
B接收消息:你好 我是AAA
A接收消息:你好 我是BBB

​ 如果两个线程有一个没有执行exchange方法,则会一致等待。如果担心特殊情况发生,避免一直等待,可使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长

5.扩展

CountDownLatch VS CyclicBarrier VS Semaphore

  • CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
    • CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;
    • CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
    • 另外,CountDownLatch 是不可以重用的,而 CyclicBarrier 是可以重用的。
  • Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.1.3