Dart 是单线程,那么怎么异步呢?或者耗时为什么不卡线程呢?
Dart 代码运行在单个执行线程中,Flutter 引擎并不是单线程
Flutter 引擎并不会创建线程,embedder提供给4个 task runner 引用给Flutter 引擎:
- Platform Task Runner
- UI Task Runner
- GPU Task Runner
- IO Task Runner
所有的 Dart 代码均运行在一个 isolate 的上下文环境中,该 isolate
中拥有对应 Dart 代码片段运行所需的所有内存。那么在开发中,我们经常会遇到一些耗时的操作,比如网络请求、文件读取等等,那么线程势必会阻塞,无法响应其他时间,UI 卡死,那么怎么在单线程处理耗时操作呢?
通常我们会使用一个 Future
对象用于表示异步操作的结果,这些正在处理的操作或 I/O 将会在稍后完成。那么 Future
是怎么实现单线程异步呢?
Event loop
事件触发,如点击、重绘,事件循环取到事件,处理,丢弃。就像快递送到收件人手里,被拆开、拿走快递、丢掉快递袋子。而 Future
修饰的函数,类似一个冷冻箱,放在快递人手里,并不拆开,而是别人解冻处理后,然后告诉收件人可以拆了,收件人再拆开。这期间,CPU 则去调度执行其他IO,等异步处理完成,这个结果会被放入事件循环,事件循环处理这个结果。
上面说到异步处理,许多文章都一笔带过,我们不免头大,并没有解惑,为什么单线程可以异步处理?下面从 Future
的建立说起:
Future
会创建 Timer
,并将 timer_impl.dart
中 _Timer
对象的静态方法 _handleMessage()
放入到 isolate_patch.dart
中 _RawReceivePortImpld
对象的静态成员变量 _handlerMap
;并创建 ReceivePort
和 SendPort
,这里就和 Android
线程间通信的 Hander
一样, Future
会把任务交给操作系统去执行,然后自己继续执行别的任务。比如说,网络请求,Socket 本身提供了 select 模型可以异步查询;而文件 IO,操作系统也提供了基于事件的回调机制。等事件处理完,再把结果发回 ReceivePort
,事件循环去处理这个结果。
那么别的负载大的耗时操作呢?比如通用的耗时计算任务,例如求解阶乘,os 并没有异步接口给 Dart 调用,所以异步编程帮助不大,这时候就需要多线程去处理了,而 Dart 的多线程就是 isolate
,但是 isolate
并不是内存共享的,它更像是一个进程。
isolate 运用
最简单的 compute
通常网络返回 json ,我们需要解析成 实体 bean ,如果 json 十分庞大,耗时较多,就卡顿了。所以需要放在 isolate
里处理。
import 'dart:convert'; main(List<String> args) { String jsonString = '''{ "id":"123", "name":"张三", "score" : 95}'''; Student student = parseJson(jsonString); print(student.name); } Student parseJson(String json) { Map<String, dynamic> map = jsonDecode(json); return Student.fromJson(map); } class Student { String id; String name; int score; Student({this.id, this.name, this.score}); factory Student.fromJson(Map parsedJson) { return Student(id: parsedJson['id'], name: parsedJson['name'], score: parsedJson['score']); } } 复制代码
我们把上面代码放入 isolate
中执行:
Future<Student> loadStudent(String json) { return compute(parseJson, json); } Student parseJson(String json) { Map<String, dynamic> map = jsonDecode(json); return Student.fromJson(map); } 复制代码
compute
是 Flutter 的 api ,帮我们封装了 isolate
,使用十分简单,但是也有局限性, 它没有办法多次返回结果,也没有办法持续性的传值计算,每次调用,相当于新建一个隔离,如果同时调用过多的话反而会多次开辟内存。在某些业务下,我们可以使用compute,但是在另外一些业务下,我们只能使用dart提供的 isolate
了。
单向通信 isolate
我们把上面的代码利用 isolate
实现一遍:
import 'dart:convert'; import 'dart:isolate'; main(List<String> args) async { await start(); } Isolate isolate; start() async { //创建接收端口,用来接收子线程消息 ReceivePort receivePort = ReceivePort(); //创建并发Isolate,并传入主线程发送端口 isolate = await Isolate.spawn(entryPoint, receivePort.sendPort); //监听子线程消息 receivePort.listen((data) { print('Data:$data'); }); } //并发Isolate entryPoint(SendPort sendPort) { String jsonString = '''{ "id":"123", "name":"张三", "score" : 95}'''; Student student = parseJson(jsonString); sendPort.send(student); } Student parseJson(String json) { Map<String, dynamic> map = jsonDecode(json); return Student.fromJson(map); } class Student { String id; String name; int score; Student({this.id, this.name, this.score}); factory Student.fromJson(Map parsedJson) { return Student(id: parsedJson['id'], name: parsedJson['name'], score: parsedJson['score']); } } 复制代码
有时候,我们需要传参给子线程,或者像线程池一样可以管理这个 isolate
,那么我们就需要实现双向通信:
import 'dart:isolate'; main(List<String> args) async { await start(); await Future.delayed(Duration(seconds: 1), () { threadPort.send('我来自主线程'); print('1'); }); await Future.delayed(Duration(seconds: 1), () { threadPort.send('我也来自主线程'); print('2'); }); await Future.delayed(Duration(seconds: 1), () { threadPort.send('end'); print('3'); }); } Isolate isolate; //子线程发送端口 SendPort threadPort; start() async { //创建主线程接收端口,用来接收子线程消息 ReceivePort receivePort = ReceivePort(); //创建并发Isolate,并传入主线程发送端口 isolate = await Isolate.spawn(entryPoint, receivePort.sendPort); //监听子线程消息 receivePort.listen((data) { print('主线程收到来自子线程的消息$data'); if (data is SendPort) { threadPort = data; } }); } //并发Isolate entryPoint(dynamic message) { //创建子线程接收端口,用来接收主线程消息 ReceivePort receivePort = ReceivePort(); SendPort sendPort; print('==entryPoint==$message'); if (message is SendPort) { sendPort = message; print('子线程开启'); sendPort.send(receivePort.sendPort); //监听子线程消息 receivePort.listen((data) { print('子线程收到来自主线程的消息$data'); assert(data is String); if (data == 'end') { isolate?.kill(); isolate = null; print('子线程结束'); return; } }); return; } } 复制代码
==entryPoint==SendPort 子线程开启 主线程收到来自子线程的消息SendPort 1 子线程收到来自主线程的消息我来自主线程 2 子线程收到来自主线程的消息我也来自主线程 3 子线程收到来自主线程的消息end 子线程结束 复制代码
双向通信比较复杂,所以我们需要封装下,通过 api 让外部调用:
import 'dart:async'; import 'dart:isolate'; main(List<String> args) async { var worker = Worker(); worker.reuqest('发送消息1').then((data) { print('子线程处理后的消息:$data'); }); Future.delayed(Duration(seconds: 2), () { worker.reuqest('发送消息2').then((data) { print('子线程处理后的消息:$data'); }); }); } class Worker { SendPort _sendPort; Isolate _isolate; final _isolateReady = Completer<void>(); final Map<Capability, Completer> _completers = {}; Worker() { init(); } void dispose() { _isolate.kill(); } Future reuqest(dynamic message) async { await _isolateReady.future; final completer = new Completer(); final requestId = new Capability(); _completers[requestId] = completer; _sendPort.send(new _Request(requestId, message)); return completer.future; } Future<void> init() async { final receivePort = ReceivePort(); final errorPort = ReceivePort(); errorPort.listen(print); receivePort.listen(_handleMessage); _isolate = await Isolate.spawn( _isolateEntry, receivePort.sendPort, onError: errorPort.sendPort, ); } void _handleMessage(message) { if (message is SendPort) { _sendPort = message; _isolateReady.complete(); return; } if (message is _Response) { final completer = _completers[message.requestId]; if (completer == null) { print("Invalid request ID received."); } else if (message.success) { completer.complete(message.message); } else { completer.completeError(message.message); } return; } throw UnimplementedError("Undefined behavior for message: $message"); } static void _isolateEntry(dynamic message) { SendPort sendPort; final receivePort = ReceivePort(); receivePort.listen((dynamic message) async { if (message is _Request) { print('子线程收到:${message.message}'); sendPort.send(_Response.ok(message.requestId, '处理后的消息')); return; } }); if (message is SendPort) { sendPort = message; sendPort.send(receivePort.sendPort); return; } } } class _Request { /// The ID of the request so the response may be associated to the request's future completer. final Capability requestId; /// The actual message of the request. final dynamic message; const _Request(this.requestId, this.message); } class _Response { /// The ID of the request this response is meant to. final Capability requestId; /// Indicates if the request succeeded. final bool success; /// If [success] is true, holds the response message. /// Otherwise, holds the error that occured. final dynamic message; const _Response.ok(this.requestId, this.message) : success = true; const _Response.error(this.requestId, this.message) : success = false; } 复制代码
CoLaBug.com遵循[CC BY-SA 4.0]分享并保持客观立场,本站不承担此类作品侵权行为的直接责任及连带责任。您有版权、意见、投诉等问题,请通过[eMail]联系我们处理,如需商业授权请联系原作者/原网站。