Shared Resources
1. An unsafe scenario
package com.nike.erick.d05;

import lombok.Getter;

import java.util.concurrent.TimeUnit;

public class Demo01 {
    public static void main(String[] args) throws InterruptedException {
        BankService bankService = new BankService();
        // 15 threads withdraw from a balance of 10
        for (int i = 0; i < 15; i++) {
            new Thread(() -> bankService.drawMoney()).start();
        }
        TimeUnit.SECONDS.sleep(3);
        // May print a negative number instead of 0
        System.out.println(bankService.getTotalMoney());
    }
}

class BankService {
    @Getter
    private int totalMoney = 10;

    public void drawMoney() {
        // Check-then-act race: several threads can pass this check before any of them decrements
        if (totalMoney > 0) {
            try {
                /* Simulate business processing time */
                TimeUnit.MILLISECONDS.sleep(100);
                totalMoney--;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("Insufficient balance");
            return;
        }
    }
}
2. Solutions
2.1 Pessimistic lock: synchronized
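This heading has no example of its own, so here is a minimal sketch of the pessimistic approach, assuming the same withdraw-one-unit logic as the BankService above. The names SynchronizedBankDemo, SafeBankService and run are made up for illustration, and the simulated sleep is left out so the demo finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;

class SynchronizedBankDemo {
    /** Hypothetical thread-safe variant of BankService (illustration only). */
    static class SafeBankService {
        private int totalMoney = 10;

        // synchronized turns the balance check and the decrement into one atomic step
        public synchronized void drawMoney() {
            if (totalMoney > 0) {
                totalMoney--;
            }
        }

        public synchronized int getTotalMoney() {
            return totalMoney;
        }
    }

    /** Starts threadCount withdrawing threads, waits for all of them, returns the final balance. */
    static int run(int threadCount) {
        SafeBankService service = new SafeBankService();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(service::drawMoney);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return service.getTotalMoney();
    }

    public static void main(String[] args) {
        System.out.println(run(15)); // 0: the balance never goes negative
    }
}
```

Joining the threads instead of sleeping for a fixed time is a choice made for this sketch; it keeps the result independent of timing.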
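2.2 Optimistic lock: CAS
Protects a shared resource without taking a lock. CAS stands for Compare And Set (also known as Compare And Swap). The JDK provides atomic classes built on CAS, so no explicit lock is needed.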
AtomicInteger

private volatile int value;

# 1. Read the latest value
public final int get();

# 2. Compare, then swap
public final boolean compareAndSet(int expectedValue, int newValue)
package com.nike.erick.d05;

import lombok.Getter;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo02 {
    public static void main(String[] args) throws InterruptedException {
        Bank bank = new Bank();
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> bank.withDrawMoney()).start();
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(bank.getTotalAmount().get());
    }
}

class Bank {
    /* CAS-based class provided by the JDK */
    @Getter
    private AtomicInteger totalAmount = new AtomicInteger(100);

    public void withDrawMoney() {
        while (true) {
            /* Latest value */
            int before = totalAmount.get();
            if (before <= 0) {
                break;
            }
            /* Value we want to write */
            int next = before - 1;
            // Succeeds only if no other thread changed the value since get(); otherwise retry
            boolean isChanged = totalAmount.compareAndSet(before, next);
            if (isChanged) {
                break;
            }
        }
    }
}
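3. How CAS works
- CAS must be combined with volatile.
- get() returns the class's value field, which is declared volatile. A write by one thread is therefore visible to other threads right away, so their CAS operations compare against the latest value.
- Internally, compareAndSet is implemented with the lock cmpxchg instruction (on x86). The lock prefix is itself a form of locking, but at the hardware level rather than a blocking lock.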
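To make the compare step concrete, here is a small single-threaded sketch. The set(9) call stands in for another thread modifying the value between get() and compareAndSet; the class name CasStaleValueDemo and the run helper are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasStaleValueDemo {
    /** Returns {stale CAS succeeded (1/0), retried CAS succeeded (1/0), final value}. */
    static int[] run() {
        AtomicInteger value = new AtomicInteger(10);

        int before = value.get(); // snapshot: 10
        value.set(9);             // stands in for another thread changing the value

        // Expected 10, but the current value is 9, so nothing is written
        boolean stale = value.compareAndSet(before, before - 1);

        // Retry with a fresh snapshot, as the while (true) loop in Demo02 would
        before = value.get();     // 9
        boolean retried = value.compareAndSet(before, before - 1);

        return new int[] {stale ? 1 : 0, retried ? 1 : 0, value.get()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("stale CAS succeeded: " + (r[0] == 1));   // false
        System.out.println("retried CAS succeeded: " + (r[1] == 1)); // true
        System.out.println("final value: " + r[2]);                  // 8
    }
}
```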
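4. CAS efficiency
Without a lock, a thread keeps running at full speed even when a CAS attempt fails and has to be retried. With synchronized, a thread that fails to get the lock goes through a context switch and enters the blocked state, which hurts performance.
A lock-free thread is not immune to context switches: if there are not enough CPU cores and its time slice runs out, it does not block, but it is still switched out and goes back to the runnable state.
Lock-free code therefore works best when the number of threads is smaller than the number of CPU cores.
CAS:
1. Lock-free, non-blocking concurrency
2. Under heavy contention, retries are triggered frequently and performance suffers
3. Based on the idea of optimistic locking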
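JDK Atomic Classes
1. Atomic integers
# Similar functionality
java.util.concurrent.atomic.AtomicInteger
java.util.concurrent.atomic.AtomicLong
java.util.concurrent.atomic.AtomicBoolean
1.1 Common methods
The AtomicInteger methods below are all atomic. They are built on CAS and simplify the code.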
package com.erick.multithread.d4;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;

public class Demo06 {
}

class Account {
    // The no-arg constructor starts at 0
    private AtomicInteger account = new AtomicInteger(3);

    public void method01() {
        /* Increment, then return the new value */
        int result = account.incrementAndGet();
        System.out.println(result);
    }

    public void method02() {
        /* Return the current value, then increment */
        int result = account.getAndIncrement();
        System.out.println(result);
    }

    public void method03() {
        /* Decrement, then return the new value */
        int result = account.decrementAndGet();
        System.out.println(result);
    }

    public void method04() {
        /* Return the current value, then decrement */
        int result = account.getAndDecrement();
        System.out.println(result);
    }

    public void method05() {
        /* Apply the function first, then return the result. Functional interface: IntUnaryOperator */
        int result = account.updateAndGet(new IntUnaryOperator() {
            @Override
            public int applyAsInt(int operand) {
                return operand * 10;
            }
        });
        System.out.println(result);
    }

    public void method06() {
        /* Return the current value, then apply the function */
        int result = account.getAndUpdate(new IntUnaryOperator() {
            @Override
            public int applyAsInt(int operand) {
                return operand * 10;
            }
        });
        System.out.println(result);
    }

    public void method07() {
        /* To subtract, pass a negative number */
        int result = account.addAndGet(5);
        System.out.println(result);
    }

    public void method08() {
        int result = account.getAndAdd(5);
        System.out.println(result);
    }
}
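The difference between the xxxAndGet and getAndXxx variants is easiest to see from their return values. The sketch below runs the eight calls in order on a counter that starts at 3; the class name AtomicIntegerReturnValues and the run helper are made up for illustration, and lambdas replace the anonymous IntUnaryOperator classes for brevity.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicIntegerReturnValues {
    /** Applies the eight operations in order and returns what each call returned. */
    static int[] run() {
        AtomicInteger account = new AtomicInteger(3);
        return new int[] {
            account.incrementAndGet(),         // 3 -> 4, returns the new value 4
            account.getAndIncrement(),         // 4 -> 5, returns the old value 4
            account.decrementAndGet(),         // 5 -> 4, returns the new value 4
            account.getAndDecrement(),         // 4 -> 3, returns the old value 4
            account.updateAndGet(v -> v * 10), // 3 -> 30, returns the new value 30
            account.getAndUpdate(v -> v * 10), // 30 -> 300, returns the old value 30
            account.addAndGet(5),              // 300 -> 305, returns the new value 305
            account.getAndAdd(5)               // 305 -> 310, returns the old value 305
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [4, 4, 4, 4, 30, 30, 305, 305]
    }
}
```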
1.2 Simplifying the CAS code
package com.erick.multithread.d4;

import lombok.Getter;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo07 {
    private static AccountService accountService = new AccountService();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> accountService.withDraw()).start();
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(accountService.getAccount().get());
    }
}

class AccountService {
    @Getter
    private AtomicInteger account = new AtomicInteger(100);

    public void withDraw() {
        // A get() check followed by decrementAndGet() is not atomic as a pair,
        // so the balance check and the update go into one updateAndGet call
        account.updateAndGet(balance -> balance > 0 ? balance - 1 : balance);
    }

    public void withDraw(int num) {
        // Subtract num only when the balance covers it
        account.updateAndGet(balance -> balance >= num ? balance - num : balance);
    }
}
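1.3 Simulating CAS
import lombok.Getter;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;

class BankService {
    @Getter
    private AtomicInteger leftMoney = new AtomicInteger(100);

    public void drawMoney() {
        sleepMills(1);
        detailWays(leftMoney, new IntUnaryOperator() {
            @Override
            public int applyAsInt(int operand) {
                return operand - 10;
            }
        });
    }

    /* Hand-written equivalent of updateAndGet: retry until the CAS succeeds */
    private void detailWays(AtomicInteger value, IntUnaryOperator operator) {
        while (true) {
            int before = value.get();
            /*
             * 1. IntUnaryOperator: int applyAsInt(int operand);
             * 2. A functional interface, so the caller supplies the implementation
             */
            int after = operator.applyAsInt(before);
            // CAS on the parameter that was read, not on the leftMoney field directly
            if (value.compareAndSet(before, after)) {
                break;
            }
        }
    }

    private void sleepMills(int mills) {
        try {
            TimeUnit.MILLISECONDS.sleep(mills);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}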
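2. Atomic references
Other kinds of calculation, for example with large numbers held in a BigDecimal, need an atomic reference. Usage is similar to the atomic integers above.
2.1 AtomicReference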
package com.erick.multithread.d4;

import lombok.Getter;

import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class Demo08 {
    public static void main(String[] args) throws InterruptedException {
        AccountService01 accountService = new AccountService01();
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> accountService.withDraw(new BigDecimal("10"))).start();
        }
        TimeUnit.SECONDS.sleep(3);
        System.out.println(accountService.getBalance().get());
    }
}

class AccountService01 {
    @Getter
    private AtomicReference<BigDecimal> balance = new AtomicReference<>(new BigDecimal("1000"));

    public void withDraw(BigDecimal count) {
        while (true) {
            // Read once so the balance check and the CAS use the same snapshot
            BigDecimal previous = balance.get();
            if (previous.compareTo(BigDecimal.ZERO) <= 0) {
                break;
            }
            BigDecimal next = previous.subtract(count);
            boolean isChanged = balance.compareAndSet(previous, next);
            if (isChanged) {
                break;
            }
        }
    }
}
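2.2 AtomicStampedReference
The ABA scenario
While thread 1 is in the middle of a CAS, thread 2 changes the value from A to B and then back to A.
Thread 1's CAS still succeeds, even though the value it compared against has been modified in the meantime.
In most cases ABA does not affect the business logic.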
package com.erick.multithread.d4;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class Demo09 {
    private static AtomicReference<String> result = new AtomicReference<>("A");

    public static void main(String[] args) throws InterruptedException {
        String previous = result.get();
        String next = "C";
        // Other threads change the value A -> B and then B -> A
        method();
        TimeUnit.SECONDS.sleep(4);
        // Succeeds: the value is "A" again, so the intermediate changes go unnoticed
        boolean isChanged = result.compareAndSet(previous, next);
        System.out.println("isChanged: " + isChanged + " result: " + result.get());
    }

    private static void method() throws InterruptedException {
        new Thread(() -> {
            while (true) {
                String previous = result.get();
                String next = "B";
                if (result.compareAndSet(previous, next)) {
                    System.out.println("A->B swap succeeded");
                    break;
                }
            }
        }).start();

        TimeUnit.SECONDS.sleep(1);

        new Thread(() -> {
            while (true) {
                String previous = result.get();
                String next = "A";
                if (result.compareAndSet(previous, next)) {
                    System.out.println("B->A swap succeeded");
                    break;
                }
            }
        }).start();
    }
}
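Solving ABA
Store a version number (stamp) alongside the value.
If any other thread has touched the shared variable, as judged by both the value and the version number, the CAS fails.
The version number also shows how many times the value has been changed along the way.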
package com.erick.multithread.d5;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;

public class Demo01 {
    private static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

    public static void main(String[] args) throws InterruptedException {
        String previous = ref.getReference();
        int stamp = ref.getStamp();
        method();
        TimeUnit.SECONDS.sleep(4);
        String next = "C";
        // Fails: the value is "A" again, but the stamp has moved on
        boolean isChanged = ref.compareAndSet(previous, next, stamp, stamp + 1);
        System.out.println("isChanged: " + isChanged);
    }

    private static void method() throws InterruptedException {
        new Thread(() -> {
            String previous = ref.getReference();
            int stamp = ref.getStamp();
            String next = "B";
            boolean isChanged = ref.compareAndSet(previous, next, stamp, stamp + 1);
            System.out.println("A->B: " + isChanged);
        }).start();

        TimeUnit.SECONDS.sleep(1);

        new Thread(() -> {
            String previous = ref.getReference();
            int stamp = ref.getStamp();
            String next = "A";
            boolean isChanged = ref.compareAndSet(previous, next, stamp, stamp + 1);
            System.out.println("B->A: " + isChanged);
        }).start();
    }
}
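The same idea can be checked without threads or sleeps. In this sketch the two compareAndSet calls in the middle stand in for the other threads; StampedAbaDemo and run are names made up for illustration. It also reads the value and stamp together with get(int[]), which avoids reading them in two separate calls.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class StampedAbaDemo {
    /** Returns {stale CAS succeeded (1/0), fresh CAS succeeded (1/0), final stamp}. */
    static int[] run() {
        AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

        String previous = ref.getReference();
        int stamp = ref.getStamp(); // 0

        // Stand-in for other threads: A -> B -> A, bumping the stamp each time
        ref.compareAndSet("A", "B", ref.getStamp(), ref.getStamp() + 1);
        ref.compareAndSet("B", "A", ref.getStamp(), ref.getStamp() + 1);

        // The value is "A" again, but the stamp is now 2, so the stale CAS fails
        boolean stale = ref.compareAndSet(previous, "C", stamp, stamp + 1);

        // Re-read value and stamp in one call, then retry
        int[] stampHolder = new int[1];
        String current = ref.get(stampHolder);
        boolean fresh = ref.compareAndSet(current, "C", stampHolder[0], stampHolder[0] + 1);

        return new int[] {stale ? 1 : 0, fresh ? 1 : 0, ref.getStamp()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("stale CAS succeeded: " + (r[0] == 1)); // false
        System.out.println("fresh CAS succeeded: " + (r[1] == 1)); // true
        System.out.println("final stamp: " + r[2]);                // 3
    }
}
```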
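2.3 AtomicMarkableReference
While thread a performs a CAS, other threads may modify the data repeatedly, but thread a only cares whether the end result has changed, not how many times it was changed.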
package com.erick.multithread.d5;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicMarkableReference;

public class Demo02 {
    private static AtomicMarkableReference<String> result = new AtomicMarkableReference<>("A", true);

    public static void main(String[] args) throws InterruptedException {
        String previous = result.getReference();
        String next = "B";
        boolean isMarked = result.isMarked();
        method();
        TimeUnit.SECONDS.sleep(3);
        boolean isChanged = result.compareAndSet(previous, next, isMarked, !isMarked);
        System.out.println("isChanged: " + isChanged);
    }

    private static void method() throws InterruptedException {
        new Thread(() -> {
            String previous = result.getReference();
            String next = "B";
            boolean isChanged = result.compareAndSet(previous, next, result.isMarked(), !result.isMarked());
            System.out.println("A->B: " + isChanged);
        }).start();

        TimeUnit.SECONDS.sleep(1);

        new Thread(() -> {
            String previous = result.getReference();
            String next = "A";
            boolean isChanged = result.compareAndSet(previous, next, result.isMarked(), !result.isMarked());
            System.out.println("B->A: " + isChanged);
        }).start();
    }
}
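Because the mark is a single boolean rather than a counter, it can only tell "same as when I looked" from "different". A single-threaded sketch of that behaviour follows, with compareAndSet calls standing in for the other threads; MarkableDemo and run are names made up for illustration.

```java
import java.util.concurrent.atomic.AtomicMarkableReference;

class MarkableDemo {
    /** Returns {CAS result after the mark was flipped once, CAS result after it was flipped twice}. */
    static boolean[] run() {
        // Case 1: another thread flips the mark once (true -> false), value stays "A"
        AtomicMarkableReference<String> once = new AtomicMarkableReference<>("A", true);
        once.compareAndSet("A", "A", true, false);
        // We still expect mark == true, so this CAS fails
        boolean afterOneFlip = once.compareAndSet("A", "C", true, false);

        // Case 2: A -> B -> A, the mark is flipped twice and ends up true again
        AtomicMarkableReference<String> twice = new AtomicMarkableReference<>("A", true);
        twice.compareAndSet("A", "B", true, false);
        twice.compareAndSet("B", "A", false, true);
        // Value and mark both match the original snapshot, so this CAS succeeds
        boolean afterTwoFlips = twice.compareAndSet("A", "C", true, false);

        return new boolean[] {afterOneFlip, afterTwoFlips};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("after one flip: " + r[0]);  // false
        System.out.println("after two flips: " + r[1]); // true
    }
}
```

If every intermediate change has to be detected, the version number of AtomicStampedReference is the better fit.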
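3. Atomic arrays
The atomic integers and atomic references above each guard a single value. Atomic arrays hold an array of such values and update each element atomically.
3.1 Unsafe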
package com.erick.multithread.d5;

import java.util.concurrent.TimeUnit;

public class Demo03 {
    private static int[] arr = new int[1];

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    // Not atomic: read, add, write back
                    arr[0]++;
                }
            }).start();
        }
        TimeUnit.SECONDS.sleep(5);
        // Can print less than 1000000 because updates get lost
        System.out.println(arr[0]);
    }
}
3.2 Thread-safe
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
package com.erick.multithread.d5;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class Demo04 {
    /* An array of length 1 */
    private static AtomicIntegerArray array = new AtomicIntegerArray(1);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    /* First argument: index. Second argument: amount to add */
                    array.addAndGet(0, 1);
                }
            }).start();
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(array.get(0));
    }
}
4. Atomic field updaters
Used to atomically update a field of an object. The field must be declared volatile.
- AtomicReferenceFieldUpdater # reference-type fields
- AtomicLongFieldUpdater # long fields
- AtomicIntegerFieldUpdater # int fields
package com.erick.multithread.d5;

import lombok.Data;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class Demo05 {
    public static void main(String[] args) {
        Student student = new Student();
        AtomicReferenceFieldUpdater<Student, String> address =
                AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "address");
        // Expect null (the initial value) and replace it with "lucy"
        boolean isChanged = address.compareAndSet(student, null, "lucy");
        System.out.println("isChanged: " + isChanged);
        System.out.println(student.address);
    }
}

@Data
class Student {
    public volatile String address;
}
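The int variant works the same way. Below is a minimal sketch with AtomicIntegerFieldUpdater and no Lombok; the Counter class, its hits field, and the run helper are made up for illustration. The field is a volatile int that the calling class can access, which is what newUpdater requires.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class FieldUpdaterDemo {
    /** Hypothetical class for illustration; the updated field must be a volatile int. */
    static class Counter {
        volatile int hits;
    }

    // One updater can be shared by all Counter instances
    private static final AtomicIntegerFieldUpdater<Counter> HITS =
            AtomicIntegerFieldUpdater.newUpdater(Counter.class, "hits");

    /** Increments one Counter from several threads and returns the final count. */
    static int run(int threadCount, int perThread) {
        Counter counter = new Counter();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    HITS.incrementAndGet(counter);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.hits;
    }

    public static void main(String[] args) {
        System.out.println(run(8, 1000)); // 8000
    }
}
```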
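5. Atomic adders
Since JDK 8 there are dedicated classes for accumulation, designed to improve performance.
# How it works: under contention, several accumulation cells are set up; Thread-0 adds to Cell[0], Thread-1 adds to Cell[1]
# When the accumulation is done, the cells are summed. Because the threads operate on different Cell variables, fewer CAS attempts fail and need retrying, which improves performance
- LongAdder, as an alternative to AtomicLong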
package com.erick.multithread.d5;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class Demo06 {
    private static AtomicLong first = new AtomicLong(0);
    private static LongAdder second = new LongAdder();

    public static void main(String[] args) throws InterruptedException {
        method01();
        method02();
    }

    private static void method01() throws InterruptedException {
        /* The p loop repeats the run, mainly so the JIT compiler gets a chance to optimize */
        for (int p = 0; p < 4; p++) {
            long start = System.currentTimeMillis();
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                Thread t = new Thread(() -> {
                    for (int j = 0; j < 100000; j++) {
                        first.incrementAndGet();
                    }
                });
                threads.add(t);
                t.start();
            }
            // Wait for the workers instead of sleeping a fixed 2 seconds,
            // so the elapsed time reflects the actual work
            for (Thread t : threads) {
                t.join();
            }
            System.out.println("AtomicLong:" + (System.currentTimeMillis() - start));
        }
    }

    private static void method02() throws InterruptedException {
        for (int p = 0; p < 4; p++) {
            long start = System.currentTimeMillis();
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                Thread t = new Thread(() -> {
                    for (int j = 0; j < 100000; j++) {
                        second.increment();
                    }
                });
                threads.add(t);
                t.start();
            }
            for (Thread t : threads) {
                t.join();
            }
            System.out.println("LongAdder:" + (System.currentTimeMillis() - start));
        }
    }
}
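Timing results depend on the machine, but the summing step described above can be checked deterministically. The sketch below only verifies that sum() adds up the contributions of all threads once they have finished; LongAdderSumDemo and run are names made up for illustration.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderSumDemo {
    /** Each thread calls increment() perThread times; returns the combined total. */
    static long run(int threadCount, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    adder.increment(); // may land in different internal cells under contention
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // sum() adds up the base value and all cells; call it after the writers are done
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(run(8, 10000)); // 80000
    }
}
```

Note that sum() is not an atomic snapshot while updates are still in flight, which is why the sketch joins all threads first.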
6. The Unsafe class
6.1 Source code and obtaining an instance
package sun.misc;

public final class Unsafe {
    static {
        Reflection.registerMethodsToFilter(Unsafe.class, Set.of("getUnsafe"));
    }

    private Unsafe() {}

    private static final Unsafe theUnsafe = new Unsafe();
    private static final jdk.internal.misc.Unsafe theInternalUnsafe = jdk.internal.misc.Unsafe.getUnsafe();
package com.erick.multithread.d5;

import sun.misc.Unsafe;

import java.lang.reflect.Field;

public class Demo07 {
    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
        // theUnsafe is a private static field, so it is read through reflection
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Unsafe unsafe = (Unsafe) theUnsafe.get(null);
        System.out.println(unsafe);
    }
}
6.2 Modifying fields
package com.erick.multithread.d5;

import lombok.Data;
import sun.misc.Unsafe;

import java.lang.reflect.Field;

public class Demo07 {
    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Unsafe unsafe = (Unsafe) theUnsafe.get(null);

        Teacher teacher = new Teacher();

        /* Get the offset of each field */
        long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
        long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));

        /* Perform the CAS operations */
        boolean isIdChanged = unsafe.compareAndSwapInt(teacher, idOffset, 0, 1);
        boolean isNameChanged = unsafe.compareAndSwapObject(teacher, nameOffset, null, "erick");

        /* Verify */
        System.out.println(teacher);
        System.out.println(isIdChanged);
        System.out.println(isNameChanged);
    }
}

@Data
class Teacher {
    volatile int id;
    volatile String name;
}
Immutable Classes
1. Date classes
1.1 SimpleDateFormat
Not thread-safe
package com.dreamer.multithread.day07;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Demo05 {
    public static void main(String[] args) {
        // One SimpleDateFormat instance shared by all threads
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    Date parse = sdf.parse("2021-09-17");
                    System.out.println(parse); // The parsed results can differ between threads
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
Fixing it with a lock
package com.dreamer.multithread.day07;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Demo05 {
    public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                // Only one thread at a time may use the shared formatter
                synchronized (sdf) {
                    try {
                        Date parse = sdf.parse("2021-09-17");
                        System.out.println(parse);
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}
1.2 DateTimeFormatter
package com.dreamer.multithread.day07;

import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

public class Demo05 {
    public static void main(String[] args) {
        // DateTimeFormatter is immutable, so one instance can be shared without a lock
        DateTimeFormatter dfm = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                TemporalAccessor parse = dfm.parse("2021-09-17");
                System.out.println(parse);
            }).start();
        }
    }
}
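2. Immutable classes
Immutable classes are thread-safe.
All member variables are declared final, so they cannot be reassigned: they can be read but not written.
The class itself is declared final, so a subclass cannot override methods by mistake and make it mutable.
# String: an immutable class
- The field that holds its content is final and is never modified after construction: private final byte[] value; (the cached hash field is not final, but it is derived from the immutable content)
- The class is declared final, so String cannot be subclassed
# Defensive copy of the array
- The array field is also final, but final only fixes the reference, not the array's contents. When an array is passed in through the constructor, String creates a new array for the new String instead of keeping the caller's array [defensive copy]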
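The rules above can be tried out in a small sketch. The first helper checks that a String built from a char array is unaffected by later changes to that array; the second applies the same defensive-copy idea to a hand-written immutable class. DefensiveCopyDemo, Scores and the helper names are made up for illustration.

```java
import java.util.Arrays;

class DefensiveCopyDemo {
    /** Hypothetical immutable class: final class, final field, copies on the way in and out. */
    static final class Scores {
        private final int[] values;

        Scores(int[] values) {
            // Copy in: the caller keeps its own array, we keep ours
            this.values = Arrays.copyOf(values, values.length);
        }

        int[] getValues() {
            // Copy out: callers cannot reach the internal array
            return Arrays.copyOf(values, values.length);
        }

        int get(int index) {
            return values[index];
        }
    }

    /** String copies the char array, so changing the array afterwards has no effect. */
    static String stringFromArray() {
        char[] chars = {'a', 'b'};
        String s = new String(chars);
        chars[0] = 'x';
        return s;
    }

    /** Neither the constructor argument nor the returned array can be used to change Scores. */
    static int scoresSum() {
        int[] raw = {1, 2, 3};
        Scores scores = new Scores(raw);
        raw[0] = 99;                 // modifies the caller's array only
        scores.getValues()[1] = 99;  // modifies a throwaway copy only
        return scores.get(0) + scores.get(1) + scores.get(2);
    }

    public static void main(String[] args) {
        System.out.println(stringFromArray()); // ab
        System.out.println(scoresSum());       // 6
    }
}
```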