Kotlin线程同步的几种实现方法

目录
  • 1. Thread.join()
  • 2. Synchronized
  • 3. ReentrantLock
  • 4. BlockingQueue
  • 5. CountDownLatch
  • 6. CyclicBarrier
  • 7. CAS
  • 8. Future
  • 9. CompletableFuture
  • 10. RxJava
  • 11. Coroutine
  • 12. Flow
  • 总结

面试的时候经常会被问及多线程同步的问题,例如:
“ 现有 Task1、Task2 等多个并行任务,如何等待全部执行完成后,执行 Task3。”
在 Kotlin 中我们有多种实现方式,本文将所有这些方式做了整理,建议收藏。
1. Thread.join
2. Synchronized
3. ReentrantLock
4. BlockingQueue
5. CountDownLatch
6. CyclicBarrier
7. CAS
8. Future
9. CompletableFuture
10. Rxjava
11. Coroutine
12. Flow

我们先定义三个Task,模拟上述场景, Task3 基于 Task1、Task2 返回的结果拼接字符串,每个 Task 通过 sleep 模拟耗时:

val task1: () -> String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it") }
}

val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it") }
}

val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it") }
}

1. Thread.join()

Kotlin 兼容 Java,Java 的所有线程工具默认都可以使用。其中最简单的线程同步方式就是使用 Thread 的 join() :

@Test
fun test_join() {
    lateinit var s1: String
    lateinit var s2: String

    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()

    t1.join()
    t2.join()

    task3(s1, s2)

}

2. Synchronized

使用 synchronized 锁进行同步

 @Test
    fun test_synchrnoized() {
        lateinit var s1: String
        lateinit var s2: String

        Thread {
            synchronized(Unit) {
                s1 = task1()
            }
        }.start()
        s2 = task2()

        synchronized(Unit) {
            task3(s1, s2)
        }
    }

但是如果超过三个任务,使用 synchrnoized 这种写法就比较别扭了,为了同步多个并行任务的结果需要声明n个锁,并嵌套n个 synchronized。

3. ReentrantLock

ReentrantLock 是 JUC 提供的线程锁,可以替换 synchronized 的使用

 @Test
    fun test_ReentrantLock() {

        lateinit var s1: String
        lateinit var s2: String

        val lock = ReentrantLock()
        Thread {
            lock.lock()
            s1 = task1()
            lock.unlock()
        }.start()
        s2 = task2()

        lock.lock()
        task3(s1, s2)
        lock.unlock()
    }

ReentrantLock 的好处是,当有多个并行任务时是不会出现嵌套 synchrnoized 的问题,但仍然需要创建多个 lock 管理不同的任务,

4. BlockingQueue

阻塞队列内部也是通过 Lock 实现的,所以也可以达到同步锁的效果

 @Test
    fun test_blockingQueue() {

        lateinit var s1: String
        lateinit var s2: String

        val queue = SynchronousQueue<Unit>()

        Thread {
            s1 = task1()
            queue.put(Unit)
        }.start()

        s2 = task2()

        queue.take()
        task3(s1, s2)
    }

当然,阻塞队列更多是使用在生产/消费场景中的同步。

5. CountDownLatch

JUC 中的锁大都基于 AQS 实现的,可以分为独享锁和共享锁。ReentrantLock 就是一种独享锁。相比之下,共享锁更适合本场景。 例如 CountDownLatch,它可以让一个线程一直处于阻塞状态,直到其他线程的执行全部完成:

 @Test
    fun test_countdownlatch() {

        lateinit var s1: String
        lateinit var s2: String
        val cd = CountDownLatch(2)
        Thread() {
            s1 = task1()
            cd.countDown()
        }.start()

        Thread() {
            s2 = task2()
            cd.countDown()
        }.start()

        cd.await()
        task3(s1, s2)
    }

共享锁的好处是不必为了每个任务都创建单独的锁,即使再多并行任务写起来也很轻松

6. CyclicBarrier

CyclicBarrier 是 JUC 提供的另一种共享锁机制,它可以让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他已到达的线程均会被阻塞。
与 CountDownLatch 的区别在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重复使用,这也正是 Cyclic 的命名由来,可以循环使用

 @Test
    fun test_CyclicBarrier() {

        lateinit var s1: String
        lateinit var s2: String
        val cb = CyclicBarrier(3)

        Thread {
            s1 = task1()
            cb.await()
        }.start()

        Thread() {
            s2 = task1()
            cb.await()
        }.start()

        cb.await()
        task3(s1, s2)
    }

7. CAS

AQS 内部通过自旋锁实现同步,自旋锁的本质是利用 CompareAndSwap 避免线程阻塞的开销。
因此,我们可以使用基于 CAS 的原子类计数,达到实现无锁操作的目的。

  @Test
    fun test_cas() {

        lateinit var s1: String
        lateinit var s2: String

        val cas = AtomicInteger(2)

        Thread {
            s1 = task1()
            cas.getAndDecrement()
        }.start()

        Thread {
            s2 = task2()
            cas.getAndDecrement()
        }.start()

        while (cas.get() != 0) {}

        task3(s1, s2)
    }

while 循环空转看起来有些浪费资源,但是自旋锁的本质就是这样,所以 CAS 仅仅适用于一些cpu密集型的短任务同步。

volatile

看到 CAS 的无锁实现,也许很多人会想到 volatile, 是否也能实现无锁的线程安全?

  @Test
    fun test_Volatile() {
        lateinit var s1: String
        lateinit var s2: String

        Thread {
            s1 = task1()
            cnt--
        }.start()

        Thread {
            s2 = task2()
            cnt--
        }.start()

        while (cnt != 0) {
        }

        task3(s1, s2)
    }

注意,这种写法是错误的
volatile 能保证可见性,但是不能保证原子性,cnt-- 并非线程安全,需要加锁操作

8. Future

上面无论有锁操作还是无锁操作,都需要定义两个变量s1、s2记录结果非常不方便。
Java 1.5 开始,提供了 Callable 和 Future ,可以在任务执行结束时返回结果。

@Test
fun test_future() {

    val future1 = FutureTask(Callable(task1))
    val future2 = FutureTask(Callable(task2))

    Executors.newCachedThreadPool().execute(future1)
    Executors.newCachedThreadPool().execute(future2)

    task3(future1.get(), future2.get())

}

通过 future.get(),可以同步等待结果返回,写起来非常方便

9. CompletableFuture

future.get() 虽然方便,但是会阻塞线程。 Java 8 中引入了 CompletableFuture  ,他实现了 Future 接口的同时实现了 CompletionStage 接口。 CompletableFuture 可以针对多个 CompletionStage 进行逻辑组合、实现复杂的异步编程。 这些逻辑组合的方法以回调的形式避免了线程阻塞:

@Test
fun test_CompletableFuture() {
    CompletableFuture.supplyAsync(task1)
        .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
             task3(p1, p2)
        }.join()
}

10. RxJava

RxJava 提供的各种操作符以及线程切换能力同样可以帮助我们实现需求:
zip 操作符可以组合两个 Observable 的结果;subscribeOn 用来启动异步任务

@Test
fun test_Rxjava() {

    Observable.zip(
        Observable.fromCallable(Callable(task1))
            .subscribeOn(Schedulers.newThread()),
        Observable.fromCallable(Callable(task2))
            .subscribeOn(Schedulers.newThread()),
        BiFunction(task3)
    ).test().awaitTerminalEvent()

}

11. Coroutine

前面讲了那么多,其实都是 Java 的工具。 Coroutine 终于算得上是 Kotlin 特有的工具了:

@Test
fun test_coroutine() {

    runBlocking {
        val c1 = async(Dispatchers.IO) {
            task1()
        }

        val c2 = async(Dispatchers.IO) {
            task2()
        }

        task3(c1.await(), c2.await())
    }
}

写起来特别舒服,可以说是集前面各类工具的优点于一身。

12. Flow

Flow 就是 Coroutine 版的 RxJava,具备很多 RxJava 的操作符,例如 zip:

@Test
fun test_flow() {

    val flow1 = flow<String> { emit(task1()) }
    val flow2 = flow<String> { emit(task2()) }

    runBlocking {
         flow1.zip(flow2) { t1, t2 ->
             task3(t1, t2)
        }.flowOn(Dispatchers.IO)
        .collect()
    }
}

flowOn 使得 Task 在异步计算并发射结果。

总结

上面这么多方式,就像茴香豆的“茴”字的四种写法,没必要都掌握。作为结论,在 Kotlin 上最好用的线程同步方案首推协程!

到此这篇关于Kotlin线程同步的几种实现方法的文章就介绍到这了,更多相关Kotlin线程同步内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

时间: 2021-07-07

深入理解 Java、Kotlin、Go 的线程和协程

前言 Go 语言比 Java 语言性能优越的一个原因,就是轻量级线程Goroutines(协程Coroutine).本篇文章深入分析下 Java 的线程和 Go 的协程. 协程是什么 协程并不是 Go 提出来的新概念,其他的一些编程语言,例如:Go.Python 等都可以在语言层面上实现协程,甚至是 Java,也可以通过使用扩展库来间接地支持协程. 当在网上搜索协程时,我们会看到: Kotlin 官方文档说「本质上,协程是轻量级的线程」. 很多博客提到「不需要从用户态切换到内核态」.「是协作式的

详细介绍 进程、线程和协程的区别

详解 进程.线程和协程的区别 首先,给出"进程.线程和协程"的特点: 进程:拥有自己独立的堆和栈,既不共享堆,也不共享栈,进程由操作系统调度: 线程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度: 协程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,协程由程序员在协程的代码里显示调度. 接下来,以一个形象的例子,进一步讲述"进程.线程和协程"三者之间的区别: 假设有一个单核的操作系统,系统上没有其它的程序需要运行,现有两个线程 A 和 B,A

python简单线程和协程学习心得(分享)

python中对线程的支持的确不够,不过据说python有足够完备的异步网络框架模块,希望日后能学习到,这里就简单的对python中的线程做个总结 threading库可用来在单独的线程中执行任意的python可调用对象.尽管此模块对线程相关操作的支持不够,但是我们还是能够用简单的线程来处理I/O操作,以减低程序响应时间. from threading import Thread import time def countdown(n): while n > 0: print('T-minus:

简述Python中的进程、线程、协程

进程.线程和协程之间的关系和区别也困扰我一阵子了,最近有一些心得,写一下. 进程拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系统调度. 线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,线程亦由操作系统调度(标准线程是的). 协程和线程一样共享堆,不共享栈,协程由程序员在协程的代码里显示调度. 进程和其他两个的区别还是很明显的. 协程和线程的区别是:协程避免了无意义的调度,由此可以提高性能,但也因此,程序员必须自己承担调度的责任,同时,协程也失去了标准线程使用多CPU的能力. Pyt

关于PHP中协程和阻塞的一些理解与思考

前言 本文主要给大家介绍了关于PHP中协程和阻塞的理解与思考,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍: 进程.线程.协程 关于进程.线程.协程,有非常详细和丰富的博客或者学习资源,我不在此做赘述,我大致在此介绍一下这几个东西. 进程拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系统调度. 线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,线程亦由操作系统调度(标准线程是的). 协程和线程一样共享堆,不共享栈,协程由程序员在协程的代码里显示调度. PHP中的协程实现

老生常谈进程线程协程那些事儿

一.进程与线程 1.进程 我们电脑的应用程序,都是进程,假设我们用的电脑是单核的,cpu同时只能执行一个进程.当程序出于I/O阻塞的时候,CPU如果和程序一起等待,那就太浪费了,cpu会去执行其他的程序,此时就涉及到切换,切换前要保存上一个程序运行的状态,才能恢复,所以就需要有个东西来记录这个东西,就可以引出进程的概念了. 进程就是一个程序在一个数据集上的一次动态执行过程.进程由程序,数据集,进程控制块三部分组成.程序用来描述进程哪些功能以及如何完成:数据集是程序执行过程中所使用的资源:进程控制

使用kotlin协程提高app性能(译)

协程是一种并发设计模式,您可以在Android上使用它来简化异步执行的代码.Kotlin1.3版本添加了 Coroutines,并基于其他语言的既定概念. 在Android上,协程有助于解决两个主要问题: 管理长时间运行的任务,否则可能会阻止主线程并导致应用冻结. 提供主安全性,或从主线程安全地调用网络或磁盘操作. 本主题描述了如何使用Kotlin协程解决这些问题,使您能够编写更清晰,更简洁的应用程序代码. 管理长时间运行的任务 在Android上,每个应用程序都有一个主线程来处理用户界面并管理

深入理解JAVA多线程之线程间的通信方式

一,介绍 本总结我对于JAVA多线程中线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码. 二,线程间的通信方式 ①同步 这里讲的同步是指多个线程通过synchronized关键字这种方式来实现线程间的通信. 参考示例: public class MyObject { synchronized public void methodA() { //do something.... } synchronized public void methodB()

深入理解Java编程线程池的实现原理

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,

深入理解Java 线程池

线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的.在jdk1.5之后这一情况有了很大的改观.Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用.为我们在开发中处理线程的问题提供了非常大的帮助. 线程池的作用: 线程池作用就是限制系统中执行线程的数量.      根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果:少了浪费了系统资源,多了造成系统拥挤效率不高.用

深入理解Java 线程通信

当线程在系统内运行时,线程的调度具有一定的透明性,程序通常无法准确控制线程的轮换执行,但 Java 也提供了一些机制来保证线程协调运行. 传统的线程通信 假设现在系统中有两个线程,这两个线程分别代表存款者和取钱者--现在假设系统有一种特殊的要求,系统要求存款者和取钱者不断地重复存款.取钱的动作,而且要求每当存款者将钱存入指定账户后,取钱者就立即取出该笔钱.不允许存款者连续两次存钱,也不允许取钱者连续两次取钱. 为了实现这种功能,可以借助于 Object 类提供的 wait(). notify()