node.js - nestjs 中的 Rxjs - 可观察到的订阅错误

标签 node.js rxjs observable nestjs behaviorsubject

我是试用 rxjs 和 nestjs 的新手。我目前正在尝试完成的用例是出于教育目的。所以我想使用“fs”模块读取一个 json 文件(在文件为空或无法读取的情况下抛出一个可观察到的错误)。现在我通过异步读取文件创建一个可观察对象,在主题中设置观察者,然后在 Controller 中订阅主题。这是我在服务中的代码

@Injectable()
export class NewProviderService {
    private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
    // this is the variable that should be exposed. make the subject as private
    // this allows the service to be the sole propertier to modify the stream and
    // not the controller or components
    serviceSubject$:  Observable<HttpResponseModel[]>;

    private serviceErrorSubject: BehaviorSubject<any>;
    serviceErrorSubject$: Observable<any>;
    filePath: string;
    httpResponseObjectArray: HttpResponseModel[];
    constructor() {
        this.serviceSubject = new BehaviorSubject<HttpResponseModel[]>([]);
        this.serviceSubject$ = this.serviceSubject.asObservable();

        this.serviceErrorSubject = new BehaviorSubject<any>(null);
        this.serviceErrorSubject$ = this.serviceErrorSubject.asObservable();

        this.filePath = path.resolve(__dirname, './../../shared/assets/httpTest.json');
    }
     readFileFromJson() {
          return new Promise((resolve, reject) => {
            fs.exists(this.filePath.toString(), exists => {
                if (exists) {
                    fs.readFile(this.filePath.toString(), 'utf-8' , (err, data) => {
                        if (err) {
                            logger.info('error in reading file', err);
                            return reject('Error in reading the file' + err.message);
                        }

                        logger.info('file read without parsing fg', data.length);
                        if ((data.length !== 0) && !isNullOrUndefined(data) && data !== null) {
                            // this.httpResponseObjectArray = JSON.parse(data).HttpTestResponse;
                            // logger.info('array obj is:', this.httpResponseObjectArray);
                            logger.info('file read after parsing new', JSON.parse(data));
                            return resolve(JSON.parse(data).HttpTestResponse);
                        } else {
                            return reject(new FileExceptionHandler('no data in file'));
                        }
                    });
                } else {
                    return reject(new FileExceptionHandler('file cannot be read at the moment'));
                }
            });
          });
    }


        getData() {
                 from(this.readFileFromJson()).pipe(map(data => {
                        logger.info('data in obs', data);
                        this.httpResponseObjectArray = data as HttpResponseModel[];
                        return this.httpResponseObjectArray;
                 }), catchError(error => {
                     return Observable.throw(error);
                 }))
                 .subscribe(actualData => {
                    this.serviceSubject.next(actualData);
                }, err => {
                    logger.info('err in sub', typeof err, err);
                    this.serviceErrorSubject.next(err);
                });
            }

现在这是 Controller 类

@Get('/getJsonData')
public async getJsonData(@Req() requestAnimationFrame,@Req() req, @Res() res) {

  await this.newService.getData();
  this.newService.serviceSubject$.subscribe(data => {
    logger.info('data subscribed', data, _.isEmpty(data));
    if (!isNullOrUndefined(data) && !_.isEmpty(data)) {
      logger.info('coming in');
      res.status(HttpStatus.OK).send(data);
      res.end();
    }
  });
}

我面临的问题是我第一次可以获得文件详细信息并且订阅被调用一次 > 它工作正常。关于后续的请求

Error [ERR_HTTP_HEADERS_SENT]: Cannot set headers after they are sent to the client
    at ServerResponse.setHeader (_http_outgoing.js:470:11)
    at ServerResponse.header (C:\personal\Node\test-nest.js\prj-sample\node_modules\express\lib\response.js:767:10)
    at Ser

端点/getJsonData 导致错误。有人可以帮帮我吗?我相信在第一次通话后订阅没有正常进行,但不确定如何结束以及如何解决

最佳答案

问题是您在 Controller 中订阅了 serviceSubject。每次发出新值时,它都会尝试发送响应。这第一次有效,但第二次它会告诉你它不能再次发送相同的响应;该请求已被处理。

您可以使用可管道化的first() 运算符在第一个值之后完成 Observable:

@Get('/getJsonData')
public async getJsonData() {
  await this.newService.getData();
  return this.newService.serviceSubject$.pipe(first())
}

您希望共享(热)您的Observable,以便每个订阅者始终获得相同的最新值。这正是 BehaviourSubject 所做的。因此,当您公开公开它时,您不应将 Subject 转换为 Observable,因为您将失去这种期望的行为。相反,您可以将 Subject 转换为 Observable,这样在内部它仍然是一个主题,但不会公开 next() 方法公开发布新值:

private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
get serviceSubject$(): Observable<HttpResponseModel[]> {
   return this.serviceSubject;
}

关于node.js - nestjs 中的 Rxjs - 可观察到的订阅错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54828220/

相关文章:

javascript - 可观察事件限制/在内存中只保留一个事件/背压

javascript - 在每个新连接上使用 rxjs webSocket 发送消息

Angular 2 Observable.forkJoin this._subscribe 不是 [null] 中的函数

javascript - JWT 无法与 Express 路由器一起使用

javascript - 如何在路由器文件中获取应用程序(在 Node/Express 应用程序中)

node.js - jxcore包装的用法?

javascript - 使用来自多个异步调用的值,然后执行计算以推送值

SwiftUI 可观察对象未观察到

Angular2 Observable 绑定(bind)不正确

arrays - 在登录到控制台之前等待循环完成