2016-02-16 13 views
7

मैं RxJava नमूदार एपीआई का उपयोग कर निम्न कोड हो रहा है पर्यवेक्षक कोड के समानांतर निष्पादन:RxJava और

के बाद
Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath()); 
    observable 
     .buffer(10000) 
     .observeOn(Schedulers.computation()) 
     .subscribe(recordInfo -> { 
     _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId()); 
      for(Info info : recordInfo) { 
      // some I/O operation logic 
     } 
     }, 
     exception -> { 
     }, 
    () -> { 
     }); 

मेरे उम्मीद है कि गया था विधि सदस्यता के() के अंदर अवलोकन कोड यानी कोड समानांतर में क्रियान्वित की जाएगी मैं गणना अनुसूची निर्दिष्ट किया है। इसके बजाए कोड को अभी भी सिंगल थ्रेड पर क्रमशः निष्पादित किया जा रहा है। कोड RxJava एपीआई का उपयोग कर समानांतर में कोड कैसे चला सकता है।

उत्तर

26

आरएक्सजेवा अक्सर इसके बारे में गलत समझा जाता है जब यह असीमित/बहुप्रचारित पहलुओं की बात आती है। बहुप्रचारित संचालन का कोडिंग सरल है, लेकिन अमूर्तता को समझना एक और बात है।

आरएक्सजेवा के बारे में एक आम सवाल समानांतरता को प्राप्त करने, या एक अवलोकन से समसामयिक रूप से कई वस्तुओं को उत्सर्जित करना है। बेशक, यह परिभाषा अवलोकन अनुबंध को तोड़ती है जो बताती है कि ऑनएक्स्ट() को अनुक्रमिक रूप से कहा जाना चाहिए और एक समय में एक से अधिक धागे द्वारा समवर्ती रूप से कभी नहीं कहा जाना चाहिए।

समांतरता प्राप्त करने के लिए आपको एकाधिक पर्यवेक्षण की आवश्यकता है।

यह एकल थ्रेड में चलाता है:

Observable<Integer> vals = Observable.range(1,10); 

vals.subscribeOn(Schedulers.computation()) 
      .map(i -> intenseCalculation(i)) 
      .subscribe(val -> System.out.println("Subscriber received " 
        + val + " on " 
        + Thread.currentThread().getName())); 

यह एक से अधिक थ्रेड में चलता है:

Observable<Integer> vals = Observable.range(1,10); 

vals.flatMap(val -> Observable.just(val) 
      .subscribeOn(Schedulers.computation()) 
      .map(i -> intenseCalculation(i)) 
).subscribe(val -> System.out.println(val)); 

कोड और पाठ comes from this blog post.

+1

rxjava में बहु-थ्रेडिंग कोडिंग के लिए बढ़िया। –

2

आप उस उद्देश्य के लिए subscribeOn(Schedulers.computation()) बजाय observeOn(Schedulers.computation()) निर्दिष्ट करना । subscribeOn में आप घोषित करते हैं कि आप किस धागे को अपने मूल्यों को छोड़ने जा रहे हैं। observeOn में आप घोषित करते हैं कि आप किस धागे को संभालने जा रहे हैं और उनका निरीक्षण करेंगे।

0

यह अभी भी एक ही अनुक्रम पर आता है। यहां तक ​​कि नए सूत्र

पर

नमूदार ob3 = Observable.range (1, 5);

ob3.flatMap(new Func1<Integer, Observable<Integer>>() { 

     @Override 
     public Observable<Integer> call(Integer pArg0) { 

      return Observable.just(pArg0); 
     } 

    }).subscribeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() { 

     @Override 
     public Integer call(Integer pArg0) { 

      try { 
       Thread.sleep(1000 - (pArg0 * 100)); 
       System.out.println(pArg0 + " ccc " + Thread.currentThread().getName()); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 

      return pArg0; 
     } 

    }).subscribe(); 

आउटपुट 1 सीसीसी RxNewThreadScheduler -1 2 सीसीसी RxNewThreadScheduler -1 3 सीसीसी RxNewThreadScheduler -1 4 सीसीसी RxNewThreadScheduler -1 5 सीसीसी RxNewThreadScheduler-1

0

flatMap का उपयोग करना और निर्दिष्ट Schedulers.computation() पर सदस्यता के लिए समेकन प्राप्त करेंगे।

यहाँ, Callable का उपयोग कर उत्पादन से एक और अधिक व्यावहारिक उदाहरण है, हम इसे लगभग 2000 मिलीसेकेंड ले सभी कार्य समाप्त करने के लिए होगा देख सकते हैं।

static class MyCallable implements Callable<Integer> { 

    private static final Object CALLABLE_COUNT_LOCK = new Object(); 
    private static int callableCount; 

    @Override 
    public Integer call() throws Exception { 
     Thread.sleep(2000); 
     synchronized (CALLABLE_COUNT_LOCK) { 
      return callableCount++; 
     } 
    } 

    public static int getCallableCount() { 
     synchronized (CALLABLE_COUNT_LOCK) { 
      return callableCount; 
     } 
    } 
} 

private static void runMyCallableConcurrentlyWithRxJava() { 
    long startTimeMillis = System.currentTimeMillis(); 

    final Semaphore semaphore = new Semaphore(1); 
    try { 
     semaphore.acquire(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable()) 
      .flatMap(new Function<MyCallable, ObservableSource<?>>() { 
       @Override 
       public ObservableSource<?> apply(@NonNull MyCallable myCallable) throws Exception { 
        return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation()); 
       } 
      }) 
      .subscribeOn(Schedulers.computation()) 
      .subscribe(new Observer<Object>() { 
       @Override 
       public void onSubscribe(@NonNull Disposable d) { 

       } 

       @Override 
       public void onNext(@NonNull Object o) { 
        System.out.println("onNext " + o); 
       } 

       @Override 
       public void onError(@NonNull Throwable e) { 

       } 

       @Override 
       public void onComplete() { 
        if (MyCallable.getCallableCount() >= 4) { 
         semaphore.release(); 
        } 
       } 
      }); 


    try { 
     semaphore.acquire(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } finally { 
     semaphore.release(); 
    } 
    System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis)); 
}