7

मैं एक एंड्रॉइड एप्लिकेशन बना रहा हूं जिसमें ब्लूटूथ लो एनर्जी के संबंध में विशिष्ट आवश्यकताएं हैं।RxAndroidBle एक सतत कनेक्शन रखना + लिखें/अधिसूचना हैंडलिंग

मुझे केवल एक लिखने वाली विशेषता को लिखने और एक अलग अधिसूचना विशेषता पर प्रतिक्रिया प्राप्त करने की आवश्यकता है, और मुझे इसे कई गतिविधियों में करने की आवश्यकता है। क्या पहली विशेषता पर अनुरोध भेजने के लिए कोई आरएक्स तरीका है, दूसरे के उत्तर की प्रतीक्षा करें, फिर दूसरे अनुरोध पर जाएं?

इसके अलावा, RxAndroidBle के मेरे उदाहरण को साझा करने के लिए मैंने कुछ प्रकार के ब्लेमैनगर सिंगलटन के बारे में सोचा, जहां मैं पर्यवेक्षकों का पर्दाफाश करता हूं, इसलिए मैं आसानी से उनके प्रेजेंटर में उनकी सदस्यता ले सकता हूं। मैं बस प्रत्येक गतिविधि के लिए कनेक्शन तर्क की प्रतिलिपि बनाने से बचना चाहता हूं और (आदर्श) एक सतत कनेक्शन है। इस तरह से मैं केवल कनेक्शन का खुलासा कर सकता हूं और इसके लिए सदस्यता ले सकता हूं, इसलिए मैं आसानी से लिखने के अनुरोध भेज सकता हूं और अधिसूचनाएं प्राप्त कर सकता हूं, लेकिन मुझे यकीन है कि ऐसा करने का एक बेहतर तरीका है।

यह वही है मैं अब के लिए है:

@Singleton 
public class BleManager { 

    private PublishSubject<Void> disconnectTriggerSubject = PublishSubject.create(); 
    private Observable<RxBleConnection> connectionObservable; 
    private boolean isConnected; 

    private final UUID CTRL_FROM_BRIDGE_UUID = UUID.fromString("someUUID"); 
    private final UUID BLE_WRITE_CHARACTERISTIC_UUID = UUID.fromString("someOtherUUID"); 

    private final RxBleClient bleClient; 
    private String mMacAddress; 
    private final Context context; 
    private RxBleDevice bleDevice; 

    @Inject 
    public BleManager(Context context, RxBleClient client) { 
    Timber.d("Constructing BleManager and injecting members"); 
    this.context = context; 
    this.bleClient = client; 
    } 

    public void setMacAddress(String mMacAddress) { 
    this.mMacAddress = mMacAddress; 

    // Set the associated device on MacAddress change 
    bleDevice = bleClient.getBleDevice(this.mMacAddress); 
    } 

    public String getMacAddress() { 
    return mMacAddress; 
    } 

    public RxBleDevice getBleDevice() { 
    Preconditions.checkNotNull(mMacAddress); 
    return bleClient.getBleDevice(mMacAddress); 
    } 

    public Observable<RxBleScanResult> getScanSubscription() { 
    Preconditions.checkNotNull(context); 
    Preconditions.checkNotNull(bleClient); 

    return bleClient.scanBleDevices().distinct(); 
    } 

    public Observable<RxBleConnection> getConnectionSubscription() { 
    Preconditions.checkNotNull(context); 
    Preconditions.checkNotNull(bleDevice); 

    if (connectionObservable == null) { 
     connectionObservable = bleDevice.establishConnection(context, false) 
             .takeUntil(disconnectTriggerSubject) 
             .observeOn(AndroidSchedulers.mainThread()) 
             .doOnUnsubscribe(this::clearSubscription) 
             .compose(new ConnectionSharingAdapter()); 
    } 

    return connectionObservable; 
    } 

    public Observable<byte[]> setupListeners() { 
    return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.setupNotification(CTRL_FROM_BRIDGE_UUID)) 
           .doOnNext(notificationObservable -> Timber.d("Notification Setup")) 
           .flatMap(notificationObservable -> notificationObservable) 
           .observeOn(AndroidSchedulers.mainThread()); 
    } 

    private void triggerDisconnect() { 
    disconnectTriggerSubject.onNext(null); 
    } 


    public Observable<byte[]> writeBytes(byte[] bytes) { 
    return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(
     BLE_WRITE_CHARACTERISTIC_UUID, 
     bytes)).observeOn(AndroidSchedulers.mainThread()); 
    } 

    private boolean isConnected() { 
    return bleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.CONNECTED; 
    } 

    /** 
    * Will update the UI with the current state of the Ble Connection 
    */ 
    private void registerConnectionStateChange() { 
    bleDevice.observeConnectionStateChanges().observeOn(AndroidSchedulers.mainThread()).subscribe(connectionState -> { 
     isConnected = connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED); 
    }); 
    } 

    private void clearSubscription() { 
    connectionObservable = null; 
    } 

} 

उत्तर

4

मैं अपने यूज-केस के बारे में थोड़ा सोचा है। उसी कनेक्शन को साझा करने के साथ आप अपने आवेदन के लिए राज्यों को पेश कर रहे हैं, जिनके लिए कुछ राज्य हैंडलिंग की आवश्यकता है और इसलिए यह असंभव है (या कम से कम मुझे नहीं पता कि कैसे) पूरी तरह से प्रतिक्रियाशील होना चाहिए।

मैंने एक कनेक्शन स्थापित करने और धारावाहिक रूप से एक बीएलई डिवाइस को लिखने के लिए एक सूचना-वितरण ट्रांसमिशन करने पर ध्यान केंद्रित किया है।

private PublishSubject<Pair<byte[], Integer>> inputSubject = PublishSubject.create(); 

private PublishSubject<Pair<byte[], Integer>> outputSubject = PublishSubject.create(); 

private Subscription connectionSubscription; 

private volatile int uniqueId = 0; // used to identify the transmission we're interested in in case more than one will be started at the same time 

public void connect() { 
    Observable<RxBleConnection> connectionObservable = // your establishing of the connection wether it will be through scan or RxBleDevice.establishConnection() 
    final UUID notificationUuid = // your notification characteristic UUID 
    final UUID writeUuid = // your write-only characteristic UUID 

    connectionSubscription = connectionObservable 
      .flatMap(
        rxBleConnection -> rxBleConnection.setupNotification(notificationUuid), // subscribing for notifications 
        (rxBleConnection, notificationObservable) -> // connection is established and notification prepared 
          inputSubject // waiting for the data-packet to transmit 
            .onBackpressureBuffer() 
            .flatMap(bytesAndFilter -> { 
               return Observable.combineLatest(// subscribe at the same time to 
                 notificationObservable.take(1), // getting the next notification bytes 
                 rxBleConnection.writeCharacteristic(writeUuid, bytesAndFilter.first), // transmitting the data bytes to the BLE device 
                 (responseBytes, writtenBytes) -> responseBytes // interested only in the response bytes 
               ) 
                 .doOnNext(responseBytes -> outputSubject.onNext(new Pair<>(responseBytes, bytesAndFilter.second))); // pass the bytes to the receiver with the identifier 
              }, 
              1 // serializing communication as only one Observable will be processed at the same time 
            ) 
      ) 
      .flatMap(observable -> observable) 
      .subscribe(
        response -> { /* ignored here - used only as side effect with outputSubject */ }, 
        throwable -> outputSubject.onError(throwable) 
      ); 
} 

public void disconnect() { 
    if (connectionSubscription != null && !connectionSubscription.isUnsubscribed()) { 
     connectionSubscription.unsubscribe(); 
     connectionSubscription = null; 
    } 
} 

public Observable<byte[]> writeData(byte[] data) { 
    return Observable.defer(() -> { 
       final int uniqueId = this.uniqueId++; // creating new uniqueId for identifying the response 
       inputSubject.onNext(new Pair<>(data, uniqueId)); // passing the data with the id to be processed by the connection flow in connect() 
       return outputSubject 
         .filter(responseIdPair -> responseIdPair.second == uniqueId) 
         .first() 
         .map(responseIdPair -> responseIdPair.first); 
      } 
    ); 
} 

यह एक ऐसा दृष्टिकोण है जो मुझे लगता है कि पूरे प्रवाह को एक स्थान पर वर्णित किया गया है और इसलिए समझना आसान है। संचार का हिस्सा जो राज्यपूर्ण है (लेखन अनुरोध और प्रतिक्रिया के लिए प्रतीक्षा) धारावाहिक है और इसमें disconnect() कॉल तक कनेक्शन जारी रखने की संभावना है।

नकारात्मकता यह है कि संचरण विभिन्न प्रवाह के साइड इफेक्ट्स पर निर्भर करता है और कनेक्शन स्थापित होने से पहले writeData() पर कॉल करता है और अधिसूचना सेटअप कभी भी लौटाई नहीं जा सकती है हालांकि यह परिदृश्य के लिए हैंडलिंग जोड़ने में कोई समस्या नहीं होनी चाहिए एक राज्य की जांच के साथ।

सर्वश्रेष्ठ सादर

+0

बहुत बढ़िया, मैं आपके समाधान का प्रयास करूंगा और आपको बताऊंगा कि यह कैसा चल रहा है। यह वास्तव में सराहना की है क्योंकि मैं आरएक्स दुनिया में नया हूं और अभी भी सीख रहा हूं! –

+3

क्या आपने प्रस्तावित समाधान की कोशिश की है? क्या यह इरादा के रूप में काम किया? –

संबंधित मुद्दे