rust - 如何通过多个 Java 线程使用只读借用的 Rust 数据?

标签 rust java-native-interface borrowing

我有一个结构 FooFooRef,它们引用了来自 Foo 的数据:

struct Foo { /* ... */ }

struct FooRef<'foo> { /* ... */ }

impl Foo {
    pub fn create_ref<'a>(&'a self) -> FooRef<'a> { /* ... */ }
}

现在 Foo 直接不能在逻辑中使用;我需要 FooRef。创建 FooRef 需要大量计算,所以我在创建 Foo 实例后就做了一次。 FooRef 是不可变的;它仅用于读取数据。

多个线程需要访问这个FooRef 实例。我该如何实现?调用线程是 Java 线程,这将与 JNI 一起使用。例如,这可以防止使用范围限定的线程池。

另一个复杂的问题是,当我必须刷新 Foo 实例以将新数据加载到其中时。然后我还需要重新创建 FooRef 实例。

如何实现线程安全和内存安全?我试着弄乱指针和 RwLock 但这导致了内存泄漏(内存使用量在每次重新加载时不断增加)。我是一名 Java 开发人员,是指针的新手。

Foo 中的数据主要是文本,大约 250Mb。 FooRef 主要是 str 和从 Foo 借来的 str 的结构。

我的Java使用说明

我在 Java 类中使用两个 long 变量来存储指向 FooFooRef 的指针。我使用静态 ReentrantReadWriteLock 来保护这些指针。

如果Foo中的数据需要更新,我获取写锁,删除FooRef,更新Foo,创建一个新的FooRef 并更新 Java 中的 ref 指针。

如果我需要读取数据(即当我不更新 Foo 时),我会获取读取锁并使用 FooRef

内存泄漏仅在多个 Java 线程调用此代码时可见。

使用rust :

use jni::objects::{JClass, JString};
use jni::sys::{jlong, jstring};
use jni::JNIEnv;

use std::collections::HashMap;

macro_rules! foo_mut_ptr {
    ($env: expr, $class: expr) => {
        $env.get_field(*$class, "ptr", "J")
            .ok()
            .and_then(|j| j.j().ok())
            .and_then(|ptr| {
                if ptr == 0 {
                    None
                } else {
                    Some(ptr as *mut Foo)
                }
            })
    };
}

macro_rules! foo_ref_mut_ptr {
    ($env: expr, $class: expr) => {
        $env.get_field(*$class, "ptrRef", "J")
            .ok()
            .and_then(|j| j.j().ok())
            .and_then(|ptr| {
                if ptr == 0 {
                    None
                } else {
                    Some(ptr as *mut FooRef)
                }
            })
    };
}

macro_rules! foo_mut {
    ($env: expr, $class: expr) => {
        foo_mut_ptr!($env, $class).map(|ptr| &mut *ptr)
    };
}

macro_rules! foo_ref {
    ($env: expr, $class: expr) => {
        foo_ref_mut_ptr!($env, $class).map(|ptr| &*ptr)
    };
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_create(_env: JNIEnv, _class: JClass) -> jlong {
    Box::into_raw(Box::new(Foo::default())) as jlong
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_createRef(env: JNIEnv, class: JClass) -> jlong {
    let foo = foo_mut!(env, class).expect("createRef was called on uninitialized Data");
    let foo_ref = foo.create_ref();
    Box::into_raw(Box::new(foo_ref)) as jlong
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_reload(env: JNIEnv, class: JClass) {
    let foo = foo_mut!(env, class).expect("foo must be initialized");
    *foo = Foo {
        data: vec!["hello".to_owned(); 1024 * 1024],
    };
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroy(env: JNIEnv, class: JClass) {
    drop_ptr(foo_ref_mut_ptr!(env, class));
    drop_ptr(foo_mut_ptr!(env, class));
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroyRef(env: JNIEnv, class: JClass) {
    drop_ptr(foo_ref_mut_ptr!(env, class));
}

unsafe fn drop_ptr<T>(ptr: Option<*mut T>) {
    if let Some(ptr) = ptr {
        let _foo = Box::from_raw(ptr);
        // foo drops here
    }
}

#[derive(Default)]
struct Foo {
    data: Vec<String>,
}

#[derive(Default)]
struct FooRef<'a> {
    data: HashMap<&'a str, Vec<&'a str>>,
}

impl Foo {
    fn create_ref(&self) -> FooRef {
        let mut data = HashMap::new();
        for s in &self.data {
            let s = &s[..];
            data.insert(s, vec![s]);
        }
        FooRef { data }
    }
}

Java:

package test;

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

public class App implements AutoCloseable {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReadLock readLock = lock.readLock();
    private final WriteLock writeLock = lock.writeLock();

    private volatile long ptr;
    private volatile long ptrRef;
    private volatile boolean reload;

    static {
        System.loadLibrary("foo");
    }

    public static void main(String[] args) throws InterruptedException {
        try (App app = new App()) {
            for (int i = 0; i < 20; i++) {
                new Thread(() -> {
                    while (true) {
                        app.tryReload();
                    }
                }).start();
            }

            while (true) {
                app.setReload();
            }
        }
    }

    public App() {
        this.ptr = this.create();
    }

    public void setReload() {
        writeLock.lock();
        try {
            reload = true;
        } finally {
            writeLock.unlock();
        }
    }

    public void tryReload() {
        readLock.lock();
        debug("Got read lock");

        if (reload) {
            debug("Cache is expired");

            readLock.unlock();
            debug("Released read lock coz expired");

            writeLock.lock();
            debug("Got write lock");

            try {
                if (reload) {
                    fullReload();
                }

                readLock.lock();
                debug("Got read lock inside write");
            } finally {
                writeLock.unlock();
                debug("Released write lock");
            }
        }

        readLock.unlock();
        debug("Released read lock");
    }

    private void fullReload() {
        destroyRef();
        debug("Dropped ref");

        debug("Reloading");
        reload();
        debug("Reloading completed");

        updateRef();
        debug("Created ref");
        reload = false;
    }

    private void updateRef() {
        this.ptrRef = this.createRef();
    }

    private native void reload();

    private native long create();

    private native long createRef();

    private native void destroy();

    private native void destroyRef();

    @Override
    public void close() {
        writeLock.lock();
        try {
            this.destroy();
            this.ptrRef = 0;
            this.ptr = 0;
        } finally {
            writeLock.unlock();
        }
    }

    private static void debug(String s) {
        System.out.printf("%10s : %s%n", Thread.currentThread().getName(), s);
    }

}

最佳答案

我认为是内存泄漏的问题实际上并不是内存泄漏。问题是分配器使用的是线程局部区域。因此,无论什么线程正在重新加载 250MB 的数据,都将分配的空间保持原样,而不是将其返回给系统。这个问题不是 JNI 特有的,也发生在纯安全的 Rust 代码中。参见 Why multiple threads using too much memory when holding Mutex

在我的例子中,创建的默认竞技场数量默认为 8 * cpu count = 64。可以通过设置 MALLOC_ARENA_MAX 环境变量来覆盖此设置。

所以我通过将 MALLOC_ARENA_MAX env 变量设置为 1 解决了这个问题。所以,我采取的方法很好。这只是平台特定的问题。

此问题仅在 WSL 中的 Ubuntu 中发生。我还在 Windows 10 上尝试了相同的代码,但没有进行任何调整,它可以完美运行,没有任何问题。

关于rust - 如何通过多个 Java 线程使用只读借用的 Rust 数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58101664/

相关文章:

java - Eclipse JNI共享库“架构字宽不匹配”错误的ELF类UnsatisfiedLinkError

rust - 借用检查器不允许从树遍历函数返回可变引用

rust - 在删除 Rust Future 时 panic 运行异步代码

unit-testing - 为编译器阶段编写可维护的测试

rust - 如何使用 openssl 交叉编译一个 rust 项目?

java - 包装 C++ 对象的最佳 JNI 模式?

rust - Rust 中的只读取消引用

android - 为什么没有从我的 APK 中复制 libgnuSTL_shared.so?

rust - 进行多次可变和不可变借贷的最佳实践是什么? [复制]

rust - 当输入非常清楚时,为什么借用检查器需要输出生命周期标签?