2017-09-30 9 views
5

में प्रवाह प्रवाह फिर से मैं जावा 9 प्रवाह के साथ आरएक्सजेवा की तुलना कर रहा हूं। मैं देखता हूं कि डिफ़ॉल्ट रूप से प्रवाह असीमित रूप से है, और मैं सोच रहा था कि इसे सिंक्रनाइज़ करने के लिए कोई तरीका है या नहीं।मुख्य धागे

कभी-कभी हम इसे नियो के लिए नहीं बल्कि चीनी वाक्यविन्यास के लिए उपयोग करना चाहते हैं, और एक और समरूप कोड है।

डिफ़ॉल्ट रूप से आरएक्सजेवा में यह सिंक्रनाइज़ रूप से है, और आप इसे अपनी पाइपलाइन में observerOn और subscribeOn का उपयोग करके असीमित रूप से चला सकते हैं।

फ़्लो में कोई भी ऑपरेटर मुख्य धागे में चलाने के लिए है?

सम्मान।

उत्तर

5

आप एक सिंक्रोनस निष्पादन का उपयोग करने के लिए Flow में दस्तावेज़ के रूप में अपने कस्टम Publisher को परिभाषित कर सकते हैं।

एक बहुत ही सरल प्रकाशक जो केवल एक ही ग्राहक को एक TRUE आइटम को जारी करता है (जब अनुरोध किया जाता है)। चूंकि ग्राहक को केवल एक ही आइटम प्राप्त होता है, इसलिए यह वर्ग बफरिंग और ऑर्डरिंग नियंत्रण का उपयोग नहीं करता है।

class OneShotPublisher implements Publisher<Boolean> { 
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based 
    private boolean subscribed; // true after first subscribe 
    public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { 
    if (subscribed) 
     subscriber.onError(new IllegalStateException()); // only one allowed 
    else { 
     subscribed = true; 
     subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); 
    } 
    } 
    static class OneShotSubscription implements Subscription { 
    private final Subscriber<? super Boolean> subscriber; 
    private final ExecutorService executor; 
    private Future<?> future; // to allow cancellation 
    private boolean completed; 
    OneShotSubscription(Subscriber<? super Boolean> subscriber, 
         ExecutorService executor) { 
     this.subscriber = subscriber; 
     this.executor = executor; 
    } 
    public synchronized void request(long n) { 
     if (n != 0 && !completed) { 
     completed = true; 
     if (n < 0) { 
      IllegalArgumentException ex = new IllegalArgumentException(); 
      executor.execute(() -> subscriber.onError(ex)); 
     } else { 
      future = executor.submit(() -> { 
      subscriber.onNext(Boolean.TRUE); 
      subscriber.onComplete(); 
      }); 
     } 
     } 
    } 
    public synchronized void cancel() { 
     completed = true; 
     if (future != null) future.cancel(false); 
    } 
    } 
} 
+2

विस्तृत उदाहरण कोड के लिए धन्यवाद। – paul

3

ऐसा करने का कोई ऑपरेटर नहीं है, लेकिन एपीआई आप जिस तरह से आइटम प्रकाशित कर रहे हैं नियंत्रित करने के लिए अनुमति देता है। इसलिए आप सीधे मौजूदा थ्रेड से ग्राहक विधियों को कॉल कर सकते हैं।

class SynchronousPublisher implements Publisher<Data> { 
     public synchronized void subscribe(Subscriber<? super Data> subscriber) { 
      subscriber.onSubscribe(new SynchronousSubscription(subscriber)); 
     } 
} 
static class SynchronousSubscription implements Subscription { 
     private final Subscriber<? super Data> subscriber; 

     SynchronousSubscription(Subscriber<? super Data> subscriber) { 
      this.subscriber = subscriber; 
     } 
     public synchronized void request(long n) { 
      ... // prepare item    
      subscriber.onNext(someItem);  
     } 

     ... 
    } 
} 
+0

धन्यवाद, यह भी एक अच्छा कामकाज है, अभी भी काफी verbose, लेकिन हे! यह जावा है :) – paul

+1

@paul ध्यान रखें कि जावा इसे आरएस विनिर्देश के रूप में प्रदान करता है (कोई कार्यान्वयन नहीं)। यह आरएक्सजेवा का विकल्प नहीं है। एक विशिष्ट के रूप में कार्य करने के लिए केवल न्यूनतम आवश्यक है। – manouti

2

यह मुख्य धागे पर चलकर आपके मतलब पर निर्भर करता है।

यदि आप एक विशिष्ट थ्रेड पर निष्पादित करने के लिए मनमाने ढंग से प्रवाह को मजबूर करना चाहते हैं, तो ऐसा करने के लिए कोई मानक तरीका नहीं है जब तक कि लाइब्रेरी में फ्लो लागू नहीं किया जाता है, जिससे आप एसिंक्रोनि-प्रदान करने वाले हिस्सों को ओवरराइड करते हैं। आरएक्सजेवा शब्दों में, Scheduler एस Schedulers उपयोगिता वर्ग द्वारा प्रदान की गई हैं।

यदि आप मुख्य धागे पर प्रवाह का निरीक्षण करना चाहते हैं, तो आपको Flow.Subscriber के शीर्ष पर अवरुद्ध कतार उपभोक्ता लिखना होगा जो कतार को तब तक ब्लॉक करता है जब तक कतार में आइटम नहीं होते। यह जटिल हो सकता है इसलिए मैं में blockingSubscribe कार्यान्वयन के लिए आपको संदर्भित करूंगा।

यदि आप जावा मुख्य थ्रेड को Executor/Scheduler के रूप में उपयोग करना चाहते हैं, तो यह और भी जटिल है और समान ब्लॉकिंग तंत्र के साथ-साथ थ्रेडपूल निष्पादक के कुछ विचारों की आवश्यकता है। Reactive4JavaFlow ऐसा शेड्यूलर होता है, जिसे आप एक्जिक्यूटर के रूप में उपयोग कर सकते हैं: new SubmissionPublisher<>(128, blockingScheduler::schedule)

संबंधित मुद्दे