import 'dart:isolate'; import 'dart:async'; import 'dart:typed_data'; import 'dart:ffi'; import 'package:flowy_log/flowy_log.dart'; import 'protobuf/flowy-dart-notify/subject.pb.dart'; typedef ObserverCallback = void Function(ObservableSubject observable); class RustStreamReceiver { static RustStreamReceiver shared = RustStreamReceiver._internal(); late RawReceivePort _ffiPort; late StreamController _streamController; late StreamController _observableController; late StreamSubscription _ffiSubscription; int get port => _ffiPort.sendPort.nativePort; StreamController get observable => _observableController; RustStreamReceiver._internal() { _ffiPort = RawReceivePort(); _streamController = StreamController(); _observableController = StreamController.broadcast(); _ffiPort.handler = _streamController.add; _ffiSubscription = _streamController.stream.listen(streamCallback); } factory RustStreamReceiver() { return shared; } static listen(void Function(ObservableSubject subject) callback) { RustStreamReceiver.shared.observable.stream.listen(callback); } void streamCallback(Uint8List bytes) { try { final observable = ObservableSubject.fromBuffer(bytes); _observableController.add(observable); } catch (e, s) { Log.error( 'RustStreamReceiver ObservableSubject deserialize error: ${e.runtimeType}'); Log.error('Stack trace \n $s'); rethrow; } } Future dispose() async { await _ffiSubscription.cancel(); await _streamController.close(); await _observableController.close(); _ffiPort.close(); } }