我正在构建一个 Android 应用程序,它对低功耗蓝牙有特定要求。
我需要写入一个只写特征并在一个单独的通知特征上接收响应,我需要在很多很多 Activity 中这样做。有没有一种 Rx 方法可以在第一个特征上发送请求,等待第二个特征的回答,然后继续另一个请求?
此外,为了分享我的 RxAndroidBle 实例,我考虑做某种 BleManager Singleton,在其中公开 Observables,这样我就可以轻松地在我的 Presenter 中订阅它们。我只是想避免必须为每个 Activity 复制连接逻辑并拥有(理想情况下)持久连接。这样我只能公开 connectionObservable 并订阅它,所以我可以轻松地发送写入请求和获取通知,但我确信有更好的方法来做到这一点。
这是我目前拥有的:
@Singleton
public class BleManager {
private PublishSubject<Void> disconnectTriggerSubject = PublishSubject.create();
private Observable<RxBleConnection> connectionObservable;
private boolean isConnected;
private final UUID CTRL_FROM_BRIDGE_UUID = UUID.fromString("someUUID");
private final UUID BLE_WRITE_CHARACTERISTIC_UUID = UUID.fromString("someOtherUUID");
private final RxBleClient bleClient;
private String mMacAddress;
private final Context context;
private RxBleDevice bleDevice;
@Inject
public BleManager(Context context, RxBleClient client) {
Timber.d("Constructing BleManager and injecting members");
this.context = context;
this.bleClient = client;
}
public void setMacAddress(String mMacAddress) {
this.mMacAddress = mMacAddress;
// Set the associated device on MacAddress change
bleDevice = bleClient.getBleDevice(this.mMacAddress);
}
public String getMacAddress() {
return mMacAddress;
}
public RxBleDevice getBleDevice() {
Preconditions.checkNotNull(mMacAddress);
return bleClient.getBleDevice(mMacAddress);
}
public Observable<RxBleScanResult> getScanSubscription() {
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(bleClient);
return bleClient.scanBleDevices().distinct();
}
public Observable<RxBleConnection> getConnectionSubscription() {
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(bleDevice);
if (connectionObservable == null) {
connectionObservable = bleDevice.establishConnection(context, false)
.takeUntil(disconnectTriggerSubject)
.observeOn(AndroidSchedulers.mainThread())
.doOnUnsubscribe(this::clearSubscription)
.compose(new ConnectionSharingAdapter());
}
return connectionObservable;
}
public Observable<byte[]> setupListeners() {
return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.setupNotification(CTRL_FROM_BRIDGE_UUID))
.doOnNext(notificationObservable -> Timber.d("Notification Setup"))
.flatMap(notificationObservable -> notificationObservable)
.observeOn(AndroidSchedulers.mainThread());
}
private void triggerDisconnect() {
disconnectTriggerSubject.onNext(null);
}
public Observable<byte[]> writeBytes(byte[] bytes) {
return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(
BLE_WRITE_CHARACTERISTIC_UUID,
bytes)).observeOn(AndroidSchedulers.mainThread());
}
private boolean isConnected() {
return bleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.CONNECTED;
}
/**
* Will update the UI with the current state of the Ble Connection
*/
private void registerConnectionStateChange() {
bleDevice.observeConnectionStateChanges().observeOn(AndroidSchedulers.mainThread()).subscribe(connectionState -> {
isConnected = connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED);
});
}
private void clearSubscription() {
connectionObservable = null;
}
}
最佳答案
我已经对您的用例进行了一些思考。通过共享相同的连接,您正在向您的应用程序引入状态,这需要一些状态处理,因此不可能(或者至少我不知道如何)完全 react 。
我专注于建立连接并向序列化的 BLE 设备执行写入通知传输。
private PublishSubject<Pair<byte[], Integer>> inputSubject = PublishSubject.create();
private PublishSubject<Pair<byte[], Integer>> outputSubject = PublishSubject.create();
private Subscription connectionSubscription;
private volatile int uniqueId = 0; // used to identify the transmission we're interested in in case more than one will be started at the same time
public void connect() {
Observable<RxBleConnection> connectionObservable = // your establishing of the connection wether it will be through scan or RxBleDevice.establishConnection()
final UUID notificationUuid = // your notification characteristic UUID
final UUID writeUuid = // your write-only characteristic UUID
connectionSubscription = connectionObservable
.flatMap(
rxBleConnection -> rxBleConnection.setupNotification(notificationUuid), // subscribing for notifications
(rxBleConnection, notificationObservable) -> // connection is established and notification prepared
inputSubject // waiting for the data-packet to transmit
.onBackpressureBuffer()
.flatMap(bytesAndFilter -> {
return Observable.combineLatest( // subscribe at the same time to
notificationObservable.take(1), // getting the next notification bytes
rxBleConnection.writeCharacteristic(writeUuid, bytesAndFilter.first), // transmitting the data bytes to the BLE device
(responseBytes, writtenBytes) -> responseBytes // interested only in the response bytes
)
.doOnNext(responseBytes -> outputSubject.onNext(new Pair<>(responseBytes, bytesAndFilter.second))); // pass the bytes to the receiver with the identifier
},
1 // serializing communication as only one Observable will be processed at the same time
)
)
.flatMap(observable -> observable)
.subscribe(
response -> { /* ignored here - used only as side effect with outputSubject */ },
throwable -> outputSubject.onError(throwable)
);
}
public void disconnect() {
if (connectionSubscription != null && !connectionSubscription.isUnsubscribed()) {
connectionSubscription.unsubscribe();
connectionSubscription = null;
}
}
public Observable<byte[]> writeData(byte[] data) {
return Observable.defer(() -> {
final int uniqueId = this.uniqueId++; // creating new uniqueId for identifying the response
inputSubject.onNext(new Pair<>(data, uniqueId)); // passing the data with the id to be processed by the connection flow in connect()
return outputSubject
.filter(responseIdPair -> responseIdPair.second == uniqueId)
.first()
.map(responseIdPair -> responseIdPair.first);
}
);
}
我认为这是一种很好的方法,因为整个流程都在一个地方描述,因此更容易理解。有状态的通信部分(写入请求和等待响应)是序列化的,它有可能保持连接直到 disconnect()
调用。
缺点是传输依赖于不同流的副作用,并且在建立连接之前调用 writeData()
并且通知设置永远不会完成返回的 Observable,尽管它不应该是通过状态检查为这种情况添加处理的问题。
最好的问候
关于android - RxAndroidBle 保持持久连接 + 写/通知处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38902913/