dart - 使用 Providers 在流中流式传输

标签 dart stream flutter observer-pattern rxdart

因此,我创建了一个带有 Stream 的 BLOC 结构,如下所示。 Fetcher 将收到对聊天室 ID 列表的更改。然后使用转换器,它将流中的数据添加到缓存映射并将其通过管道传输到输出。

现在要注意的是每个聊天室 ID 都将用于创建流实例,因此订阅聊天室数据中的任何更改。因此,Cache 映射基本上将 Chatroom ID 映射到其对应的 Stream。 ChatRoomProvider 将 bloc 与应用程序绑定(bind)。

   class ChatRoomBloc {
    // this is similar to the Streambuilder and Itemsbuilder we have in the Stories bloc
      final _chatroomsFetcher = PublishSubject<String>();
      final _chatroomsOutput =
          BehaviorSubject<Map<String, Observable<ChatroomModel>>>();

// Getter to Stream
  Observable<Map<String, Observable<ChatroomModel>>> get chatroomStream =>
      _chatroomsOutput.stream;

  ChatRoomBloc() {
    chatRoomPath.listen((chatrooms) => chatrooms.documents
        .forEach((f) => _chatroomsFetcher.sink.add(f.documentID)));
    _chatroomsFetcher.stream
        .transform(_chatroomsTransformer())
        .pipe(_chatroomsOutput);
  }

  ScanStreamTransformer<String, Map<String, Observable<ChatroomModel>>>
      _chatroomsTransformer() {
    return ScanStreamTransformer(
        (Map<String, Observable<ChatroomModel>> cache, String id, index) {
      // adding the iteam to cache map
      cache[id] = chatRoomInfo(id);
      print('cache ${cache.toString()}');
      return cache;
    }, <String, Observable<ChatroomModel>>{});
  }

  dispose() {
    _chatroomsFetcher.close();
    _chatroomsOutput.close();
  }
}

Observable<ChatroomModel> chatRoomInfo(String _chatrooms) {
  final _chatroomInfo = PublishSubject<ChatroomModel>();

  Firestore.instance
      .collection('chatRooms')
      .document(_chatrooms)
      .snapshots()
      .listen((chatroomInfo) =>
          _chatroomInfo.sink.add(ChatroomModel.fromJson(chatroomInfo.data)));

  dispose() {
    _chatroomInfo.close();
  }

  return _chatroomInfo.stream;
}

然后我创建了一个带有 ListView 的 Streambuilder,以列出 ID 和来自其相应流的任何数据,如下所示。

class FeedList extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    final chatroomBloc = ChatRoomProvider.of(context);
    return Scaffold(
      appBar: AppBar(
        title: Text('Chat Room'),
      ),
      body: buildList(chatroomBloc),
    );
  }
  Widget buildList(ChatRoomBloc chatroomBloc) {
    return StreamBuilder(
        // Stream only top ids to display
        stream: chatroomBloc.chatroomStream,
        builder: (context,
            AsyncSnapshot<Map<String, Observable<ChatroomModel>>> snapshot) {
          if (!snapshot.hasData) { // no data yet
            return Center(child: CircularProgressIndicator());
          }
          return ListView.builder(
            itemCount: snapshot.data.length,
            itemBuilder: (context, int index) {
              print('index $index and ${snapshot.data}');
              return buildTile(snapshot.data[index]);
            },
          );
        });
  }

  Widget buildTile(Observable<ChatroomModel> chatroomInfoStream) {
    return StreamBuilder(
        stream: chatroomInfoStream,
        builder: (context, AsyncSnapshot<ChatroomModel> chatroomSnapshot) {
          if (!chatroomSnapshot.hasData) {
            return Center(
              child: CircularProgressIndicator(),
            );
          }
          print('${chatroomSnapshot.data.name}');
          print('${chatroomSnapshot.data.members.toString()}');
          return Column(children: [
            ListTile(
              title: Text('${chatroomSnapshot.data.name}'),
              trailing: Column(
                children: <Widget>[
                  Icon(Icons.comment),
                ],
              ),
            ),
            Divider(
              height: 8.0,
            ),
          ]);
        });
  }
}

我得到的输出如下所示。 Streambuilder 停留在 buildTile 方法中的 CircularProgressIndicator 处。我认为这意味着实例正在创建并添加到缓存映射中,但它们经常监听正确的实例,或者我连接流的方式有问题。你能帮忙吗?

I/flutter (12856): cache {H8j0EHhu2QpicgFDGXYZ: Instance of 'PublishSubject<ChatroomModel>'} 
I/flutter (12856): cache {H8j0EHhu2QpicgFDGXYZ: Instance of 'PublishSubject<ChatroomModel>', QAhKYk1cfoq8N8O6WY2N: Instance of 'PublishSubject<ChatroomModel>'} 
I/flutter (12856): index 0 and {H8j0EHhu2QpicgFDGXYZ: Instance of 'PublishSubject<ChatroomModel>', QAhKYk1cfoq8N8O6WY2N: Instance of 'PublishSubject<ChatroomModel>'} 
I/flutter (12856): index 1 and {H8j0EHhu2QpicgFDGXYZ: Instance of 'PublishSubject<ChatroomModel>', QAhKYk1cfoq8N8O6WY2N: Instance of 'PublishSubject<ChatroomModel>'}

最佳答案

作为快速修复,也许可以尝试:

final _chatroomInfo = BehaviorSubject<ChatroomModel>();

第二个注意事项:

当前状态下的代码难以阅读和理解,难以维护且效率低下。我不确定您实际上想做什么。

嵌套 StreamBuilder 是个坏主意。它会将聊天列表的显示延迟至少 2 帧,因为每个 StreamBuilder 至少渲染一个空帧 (data = null)。

监听流并将结果提供给 Subject 也会增加延迟。

如果可能,请尝试删除所有主题。相反,请使用 rx 运算符。

BLoC 应提供单个输出流,以提供呈现聊天列表所需的所有数据。

关于dart - 使用 Providers 在流中流式传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53915254/

相关文章:

flutter - 如何在 Flutter 中启用 Null-Safety?

machine-learning - 演进环境中在线集群的流规范化

firebase - flutter 问题 : type 'Timestamp' is not a subtype of type 'DateTime'

android - 在 Android Studio 中 Dart 。没有运行应用程序的设备

dart - 如何在BLoC中使用BottomNavigationBar?

dart - Future.then()执行得太早

flutter - 使用相机包授予权限会在出现异常时暂停调试器

Dart pub publish 在身份验证后给出错误

flutter - 当在Stream上使用Distinct()时似乎无法过滤出相同的结果

javascript - 如何将文件流式传输到服务器并同步流式传输给用户? (普通文件下载仿)