StreamProvider 和 TCP 套接字

问题描述

我编写了一个单页桌面应用程序来与 TCP 服务器通信。

在我的代码中,我使用 Socket.listen() 方法接收数据,没问题。

  • 我使用的是单一订阅,对我来说已经足够了。

我尝试将其转换为 StreamProvider[Riverpod] 但失败了。

  • 我使用了 StreamController() 然后我的状态变差了。
  • 我使用了 StreamController.broadcast() 并且无法从套接获取数据

你能给我建议正确的方法吗?
附带说明:我不是经验丰富的 Flutter 开发人员,只是尝试学习 :)

我在下面添加代码块和完整代码

完整代码https://gist.github.com/sphinxlikee/3cbfa47817a5187c7b67905028674041

界面:

enter image description here

工作代码

  Future<void> createConnection() async {
    try {
      _socket = await Socket.connect(serverAddress,serverPort);
      _changeConnectionState();
    } catch (e) {
      print('connection has an error and socket is null.');
      print(e);
      return;
    }

    listenSocket();
  }

  void listenSocket() {
    _socket.listen(
      (event) {
        _getData(String.fromCharCodes(event));
        print('received: $receivedData');

        if (!_dataReceived) {
          _changeDataReceivedState();
        }
      },)
      ..onDone(
        () {
          _changeConnectionState();
          _streamDone();
          print('socket is closed');
        },)
      ..onError(
        (error,stackTrace) {
          print('$error');
        },);
  }

工作代码 - UI 端

class ReceivedData extends ConsumerWidget {
  @override
  Widget build(BuildContext context,ScopedReader watch) {
    final receivedData = watch(tcpClientProvider).receivedData;
    return Text('Received data: $receivedData');
  }
}

对于我尝试过的 StreamProvider,

 Future<void> createConnection() async {
    try {
      _socket = await Socket.connect(serverAddress,serverPort);
      streamController.sink.add(_socket.listen((event) => String.fromCharCodes(event)));
      _changeConnectionState();
    } catch (e) {
      print('connection has an error and socket is null.');
      print(e);
      return;
    }

  }

StreamProvider - UI 端

final streamProvider = StreamProvider.autodispose(
  (ref) async* {
    await for (final value in ref.watch(tcpClientProvider).streamController.stream) {
      yield value;
    }
  },);

class ReceivedDataWithProvider extends ConsumerWidget {
  @override
  Widget build(BuildContext context,ScopedReader watch) {
    AsyncValue receivedData = watch(streamProvider);

    return receivedData.when(
      data: (data) => Text('Received data: $data'),loading: () => const CircularProgressIndicator(),error: (err,stack) => Text('error'),);
  }
}

解决方法

Socket 实现了 Stream,所以你可以这样写:

final streamProvider = StreamProvider.autoDispose<Uint8List>((ref) {
  return ref.watch(tcpClientProvider)._socket;
});

如果你仍然想添加一个监听器,如果你需要的话,拥有一个也没有坏处:

final streamProvider = StreamProvider.autoDispose<Uint8List>((ref) {
  final client = ref.watch(tcpClientProvider);

  return client._socket
    ..listen(
      (event) {},).onDone(
      () {
        client
          .._changeConnectionState()
          .._streamDone();
        print('socket is closed');
      },);
});