java - 在 Rx Observable 事务中使用 Realm 调用时,如何防止 Realm 线程问题?

标签 java multithreading realm rx-java observable

这是调用 Rx 事务的 ViewModel

RealmHelperRepo是HelperRepo接口(interface)的实现

@PerActivity
public class RoleSelectionViewModel extends BaseViewModel<RoleSelectionMvvm.View> implements RoleSelectionMvvm.ViewModel {

     private Disposable roleGroupSubscription;

     @Inject
        public RoleSelectionViewModel(@AppContext Context context, HelperRepo helperRepo, ApiOAuth2 ApiOAuth2) {

        this.mContext = context;
        this.mUserRepo = userRepo;
        this.mHelperRepo = helperRepo;
        ApiOAuth2.initialize();
        this.mApiOAuth2 = ApiOAuth2;

        this.mCurrentUser = mUserRepo.getByField("isLoggedIn", true, true);
        if (mCurrentUser != null) {
            this.mCurrentUserId = mCurrentUser.getId();
            this.mHelper = mHelperRepo.getByField("user.id", mCurrentUserId, true);
    }

  Observable<Response<ResponseHelper>> postHelperObservable = mApiOAuth2.postHelperRX(new Helper());
  Observable<Response<ResponseHelper>> getHelperObservable = mApiOAuth2.getHelperRX(mCurrentUserId);

 roleGroupSubscription = postRoleGroupsObservable
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .flatMap((response) -> {
                        if (response.isSuccessful()) {
                            ResponseHelper responseHelper = response.body();
                            mHelper = responseHelper.getHelper();
                            return Observable.just(mHelper);
                        } else if (response.code() == 409) {
                       // handle POST conflict (i.e. helper already exists)
                            return getHelperObservable;
                        }

                   })
                   .subscribe((data) -> {
                        if (data instanceof Response<?>) {
                            // data came from getHelperObservable
                            Response response = (Response) data;
                            if (!response.isSuccessful()) {
                               ResponseHelper responseHelper = (ResponseHelper) response.body();
                            mHelper = responseHelper.getHelper();
                       else {

                            // data came from Observable.just(helper)
                            mApiOAuth2.getHelperRX(mCurrentUserId).subscribe(
                                    responseHelperResponse -> {

                                        if (responseHelperResponse.isSuccessful()) {

                                            String helperID = responseHelperResponse.body().getHelper().getId();
                                            Log.d("RealmCount", "save: " + Realm.getLocalInstanceCount(realmProvider.get().getConfiguration()));
                                            mHelper.setId(helperID);
                                            mHelper.setUser(mCurrentUser);
--------> // when mHelperRepo.save(mHelper) is called, it goes to RealmHelperRepo to save and 
--------> // thus triggering mRealm.executeTransaction causing Realm threading
                                            mHelperRepo.save(mHelper);
                                        }
                                        saveAndBegin();
                                    },
                                    Throwable::printStackTrace);
                                    });

这是进行 Realm 调用的 RealmRepo 类。

@PerApplication
public class RealmHelperRepo implements HelperRepo {

   private final Provider<Realm> mRealmProvider;
   private Realm mRealm;

   @Inject
    public RealmHelperRepo(Provider<Realm> realmProvider) {
        this.mRealmProvider = realmProvider;
        this.mRealm = mRealmProvider.get();
}


  @Override
    public void save(Helper helper) {
        if (mRealm != null) {
---------> // code runs into threading issue here when a realmThread executeTransaction is called
        mRealm.executeTransaction(r -> r.copyToRealmOrUpdate(helper)); 
        }
    }

我在这里缺少什么吗?我应该使用其他 Rx 函数来代替平面图?是否有其他方法可以保存我的可观察数据而不会遇到线程问题?帮忙!

最佳答案

Is there something I'm missing here?

Realm 实例代表引用计数线程本地实例。它不是一个全局的东西,它是一个“本地实例”,由 getInstance() 打开,然后由 close() 关闭。

因此,您不能仅将 Realm 实例初始化为单例,因为无法从后台线程访问它。


例如,您可以提供一个能够打开线程本地 Realm 实例的单例 Realm 管理器类。

/**
 * The RealmManager allows creating a singleton Realm manager which can open thread-local instances.
 *
 * It also allows obtaining the open thread-local instance without incrementing the reference count.
 */
@PerApplication
public class RealmManager {
    private final ThreadLocal<Realm> localRealms = new ThreadLocal<>();

    @Inject
    RealmManager() {
    }

    /**
     * Opens a reference-counted local Realm instance.
     *
     * @return the open Realm instance
     */
    public Realm openLocalInstance() {
        checkDefaultConfiguration();
        Realm realm = Realm.getDefaultInstance(); // <-- maybe configuration should be constructor parameter
        if(localRealms.get() == null) {
            localRealms.set(realm);
        }
        return realm;
    }

    /**
     * Returns the local Realm instance without adding to the reference count.
     *
     * @return the local Realm instance
     * @throws IllegalStateException when no Realm is open
     */
    public Realm getLocalInstance() {
        Realm realm = localRealms.get();
        if(realm == null) {
            throw new IllegalStateException(
                    "No open Realms were found on this thread.");
        }
        return realm;
    }

    /**
     * Closes local Realm instance, decrementing the reference count.
     *
     * @throws IllegalStateException if there is no open Realm.
     */
    public void closeLocalInstance() {
        checkDefaultConfiguration();
        Realm realm = localRealms.get();
        if(realm == null) {
            throw new IllegalStateException(
                    "Cannot close a Realm that is not open.");
        }
        realm.close();
        // noinspection ConstantConditions
        if(Realm.getLocalInstanceCount(Realm.getDefaultConfiguration()) <= 0) {
            localRealms.set(null);
        }
    }

    private void checkDefaultConfiguration() {
        if(Realm.getDefaultConfiguration() == null) {
            throw new IllegalStateException("No default configuration is set.");
        }
    }
}

你可以像这样使用它

@PerApplication
public class RealmHelperRepo implements HelperRepo {
   private final RealmManager realmManager;

    @Inject
    public RealmHelperRepo(RealmManager realmManager) {
        this.realmManager = realmManager;
    }


    @Override
    public void save(Helper helper) {
        try(Realm realm = realmManager.openLocalInstance()) {
            realm.executeTransaction(r -> r.copyToRealmOrUpdate(helper)); 
        }
    }

从技术上讲,它只是隐藏了 Realm.getDefaultInstance() 调用,并允许您获取线程本地实例,即使不增加内部 RealmCache 引用计数,因此没有太多真正的魔力。

只需为线程打开一个 Realm 实例,当不再需要时不要忘记将其关闭。

关于java - 在 Rx Observable 事务中使用 Realm 调用时,如何防止 Realm 线程问题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46588960/

相关文章:

java - 对话框启动时 onKeydown 不允许返回

Java查找循环结构的方法

c++ - 如何按照它们最初在 C++ 中生成的顺序从有界缓冲区中检索项目?

c# - Java原子整数和C#Interlocked.Increment方法的区别

java - 在java中用两个线程打印HashMap的键?

java - Android Realm 不正确的线程

java - 如何在命令提示符下运行 java 程序,由 intellij 创建

java - 无法实例化应用程序 : java. lang.ClassNotFoundException:

swift - 尝试在 Realm 对象上使用 setValue forKey 的问题

ios - 尝试在写入事务之外修改对象