在多线程编程里,放多线程会交叉访问共享的对象,如果我们不做些同步的工作,那些结果可能不是我们想要的。
var sum = 0@Testfun addition_isCorrect(){for(i in 0..100){Thread{accumulate(1)}.start()}Thread.sleep(3000)println(sum)}fun accumulate(i: Int){sum += i}
上面的例子是多个线程去操作sum这个共享变量,每个线程都是让这个sum变加1,那么期待的结果应该是101,但是上面的程序可能不会让你得到101,结果可能是100,99,98等这些错误的结果。
再比如下面这个协程的例子
@Testfun hello() = runBlocking {var coroutines = listOf()var shareSum = 0// 使用固定大小的线程池创建协程的执行上下文// @DelicateCoroutinesApi//public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {// require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }// val threadNo = AtomicInteger()// val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->// val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())// t.isDaemon = true// t// }// return executor.asCoroutineDispatcher()//}val scope = CoroutineScope(newFixedThreadPoolContext(8, "sizeFixedThreadPool"))// 在不阻塞当前线程的情况下启动一个新的协程,并将对该协程的引用作为Job返回。可以用job取消当前的协程val job = scope.launch {// 提醒 scope这个范围里有8条线程在执行coroutines = 1.rangeTo(100).map {// 创建100个协程launch {for(i in 1..100){shareSum += 1}}}// 我们等待所以协程执行完成coroutines.forEach {// join() 会挂起协程,直到该作业完成it.join()}}.join()println(" 10000, $shareSum") // 10000, 9952}
我创建一个有8个线程的协程执行上下文,然后在此执行上下文中创建一个100个协程,每个协程对shareSum进行加1操作。结果是有时正确,有时不正确。
上面两个例子为什么出现这些错误?这些线程的工作过程是这样的:
在多线程的世界,可能会出现以下的情况:
还有很多类似的情况都会导致多线程操作共享变量出问题。为了解决这些问题,我们只要同步这些线程的工作就可以了。目的就是保证它这些线程操作shareSum时,一定是拿到最新的值去加1。
首先,volatile关键字并不能够解决上面遇到的问题,但是也顺便分享给大家。在Java中是volatile,在kotlin中是用@Volatile
,这个东西是用在字段上。它的作用是提供内存可见性,保证这个正在被读取的字段的值一定是来自内存,而不是CPU的cache(就是CPU的高速缓存)。所以加了这个关键字的字段,CPU在读取时,它直接忽略在cache的值,直接重新从内存读取这个字段的值。这样就保证了CPU一定读取到这个字段最新的值。
我们上面的问题呢,有一程情况是多个线程都读取了相同的值造成了不正确的结果。volatile这个技术帮不了忙。它对单线程是有效的。这里就不展开了。
要解决上面的问题,就要保存它的操作是原子性,也就是每个线程获取了shareSum的值,将其保存到临时变量并加1,再保存回shareSum这些操作完成了,下一个线程才能开始操作。这样保证每个线程的操作都是原子性后,那么结果是正确的。
对一个例子的修复:
@Synchronizedfun accumulate(i: Int){sum += i}
对第二个例子的修复:
@Testfun hello() = runBlocking {var coroutines = listOf()// 使用固定大小的线程池创建协程的执行上下文// @DelicateCoroutinesApi//public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {// require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }// val threadNo = AtomicInteger()// val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->// val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())// t.isDaemon = true// t// }// return executor.asCoroutineDispatcher()//}val scope = CoroutineScope(newFixedThreadPoolContext(8, "sizeFixedThreadPool"))// 在不阻塞当前线程的情况下启动一个新的协程,并将对该协程的引用作为Job返回。可以用job取消当前的协程val job = scope.launch {// 提醒 scope这个范围里有8条线程在执行coroutines = 1.rangeTo(100).map {// 创建100个协程launch {for(i in 1..100){accumulateShareSum()}}}// 我们等待所以协程执行完成coroutines.forEach {// join() 会挂起协程,直到该作业完成it.join()}}.join()println(" 10000, $shareSum") // 10000, 9952}var shareSum = 0@Synchronizedfun accumulateShareSum(){shareSum += 1}
每次都结果都是正确的。感觉很棒!Synchronized保证每个线程完成了操作后,另一个线程才可以入场操作。这里提一个,我们的Synchronized是加在方法上的,因此整个方法的访问都被限制在一个线程。请看下面这种同步方法:
@Synchronized
fun accumulateShareSum(flag: Boolean){if(flag) {shareSum += 1}
}
上面这个方法,对任何调用者,不管它们是否需要加1操作都进行了同步,这其实不是很好,有一种同步语句比较适合,它会让真正需要操作共享变量的调用同步:
@Synchronizedfun accumulateShareSum(flag: Boolean){if(flag) {synchronized(this){shareSum += 1}}}
其实,大家可能已经很熟悉了,比如AtomicInteger
,AtomicReference
,AtomicBoolean
等都是常见的原子原语,它们提供了很多方法供开发者使用,都能达到原子性操作,保证线程安全。比如上面的例子:
var shareSum = AtomicInteger(0)fun accumulateShareSum(){shareSum.incrementAndGet()}
锁比Synchronized的同步方法和同步语要灵活。它可以出现在任何地方。我们现在用重入锁解决上面的问题:
val reentrantLock = ReentrantLock()var shareSum = 0fun accumulateShareSum(){reentrantLock.lock()try {shareSum += 1} finally {reentrantLock.unlock()}}
我们直接上代码吧:
val semaphore = Semaphore(1)var shareSum = 0fun accumulateShareSum(){try {semaphore.acquire()shareSum += 1} finally {semaphore.release()}}
另外Java还提供了很多并发的工具和集合(如HashTable,ConcurrentHashMap)等。大家有空可以去了解。CyclicBarrier和CountDownLatch是一些同步的工具,大家另外脑补吧。