java - 在未读取所有元素时清理 Iterable

标签 java rx-java reactive-programming java-6

开始接触 RxJava。我有一个实现 Iterable 的类,我想将其转换为 Observable。使用 Observable.from() 似乎很容易。但是,我需要设置和拆除为我提供各个条目的代码(迭代器中的 next()

当我完成整个序列时,这很容易。我添加了对 hasNext() 函数的调用,当没有 next 时,我运行拆解。然而,我想使用的非常有前途的运算符之一是 take(someNumber)。如果在迭代器用完项目之前停止获取,则清理代码永远不会运行。

我该怎么做才能运行清理?如果使用 from(Iterable) 以外的东西,我可以接受。我现在停留在 Java6 上。为了说明我的困境,我创建了一个最小示例:

更新:根据不要将 Iterator 和 Iterable 混合在一起的反馈,我更新了下面的代码。要了解原始答案,original code is in that gist .

更新后的测试代码(仍然很糟糕):

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;

/**
* @author stw
*
*/
public class RXTest {

/**
 * @param args
 */
public static void main(String[] args) {
  ComplicatedObject co = new ComplicatedObject();
  Observable<FancyObject> fancy = Observable.from(co);
  // if the take is less than the elements cleanup never
  // runs. If you take the take out, cleanup runs
  fancy.take(3).subscribe(
      new Action1<FancyObject>() {

        public void call(FancyObject item) {
            System.out.println(item.getName());
        }
    },
    new Action1<Throwable>() {

        public void call(Throwable error) {
            System.out.println("Error encountered: " + error.getMessage());
        }
    },
    new Action0() {

        public void call() {
            System.out.println("Sequence complete");
        }
    }

      );

}

}

花哨的对象:

import java.util.Date;
import java.util.UUID;

/**
* @author stw
*
*/
public class FancyObject  {
private String name = UUID.randomUUID().toString();
private Date created = new Date();
public String getName() {
  return this.name;
}
public void setName(String name) {
  this.name = name;
}
public Date getCreated() {
  return this.created;
}
public void setCreated(Date created) {
  this.created = created;
}
}

迭代器:

import java.util.Iterator;

/**
 * @author stw
 *
 */
public class FancyIterator implements Iterator<FancyObject> {

  private final ComplicatedObject theObject;
  private int fancyCount = 0;


  public FancyIterator(ComplicatedObject co) {
    this.theObject = co;
  }

  public boolean hasNext() {
    return this.theObject.hasObject(this.fancyCount);
   }


   public FancyObject next() {
     FancyObject result = this.theObject.getOne(this.fancyCount);
     this.fancyCount++;  
     return result;
   }

}

可迭代对象:

import java.util.Iterator;
import java.util.Vector;

/**
 * @author stw
 *
 */
public class ComplicatedObject implements Iterable<FancyObject> {

  private boolean isInitialized = false;

  Vector<FancyObject> allOfThem = new Vector<FancyObject>();


  public Iterator<FancyObject> iterator() {
   return new FancyIterator(this);
  }

  public boolean hasObject(int whichone) {
    if (!this.isInitialized) {
      this.setupAccesstoFancyObject(); 
    }
    return (whichone < this.allOfThem.size());
  }
  public FancyObject getOne(int whichone) {
      if (!this.isInitialized) {
        this.setupAccesstoFancyObject();
      }
      if (whichone < this.allOfThem.size()) {
        return this.allOfThem.get(whichone);
      }
      // If we ask bejond...
      this.isInitialized = false;
      this.teardownAccessToFancyObjects();
      return null;
  }

  private void setupAccesstoFancyObject() {
    System.out.println("Initializing fancy objects");
    for (int i = 0; i < 20; i++) {
      this.allOfThem.addElement(new FancyObject());
    }
    this.isInitialized = true;
  }

  private void teardownAccessToFancyObjects() {
    System.out.println("I'm doing proper cleanup here");

  }

}

真正的问题(谢谢@Andreas)似乎是:

当底层代码需要设置/拆卸时,我可以使用什么构造来创建 Observable,尤其是当人们期望并非所有元素都被拉出时。 Iterable 只是我的第一个想法

更新 2:基于 Dave 的回答 I created a gist用我的工作解决方案。迭代器并不完美,但它是一个开始。

最佳答案

Observable.using 用于终止(完成或错误)或取消订阅。要使用它,您需要使拆卸代码可访问,以便您的源可观察对象看起来像这样:

source = Observable.using(
    resourceFactory, 
    observableFactory, 
    resourceDisposer);

您的代码可能如下所示:

source = Observable.using(
    () -> new ComplicatedObject(),
    co -> Observable.from(co), 
    co -> co.tearDown());

关于java - 在未读取所有元素时清理 Iterable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39217768/

相关文章:

android - RxJava : Executing an AsyncTask from Subscribe() fails

swift - ReactiveCocoa - 具有一般错误处理功能的信号生成器序列

java - 在受限 (Citrix) 环境中从 Java 调用 .bat 文件

java - Jpa 存储库 findBy 方法签名具有多个 or 运算符?

java - 当条件被 Action 监听器改变时中断 while 循环

rx-java - 错误后重试同一项目

android - 抛出异常。添加 onError 处理

c++ - 如何在自定义 rxcpp 运算符上调用 on_error

javascript - RXJS 中的分页数据游标以及对 subject.onCompleted 和错误的混淆

java - 多线程编程以及OS、CPU的支持?