2013-12-18 6 views
9

मैं प्रतिक्रियाशील प्रोग्रामिंग और आरएक्सजेवा की खोज कर रहा हूं। यह मजेदार है, लेकिन मैं एक समस्या पर फंस गया हूं जिसके लिए मुझे कोई जवाब नहीं मिल रहा है। मेरा मूल प्रश्न: अन्यथा अनगिनत चलने वाले पर्यवेक्षण को समाप्त करने के लिए एक प्रतिक्रियाशील-उपयुक्त तरीका क्या है? मैं आलोचकों का स्वागत करता हूं और मेरे कोड के संबंध में प्रतिक्रियाशील सर्वोत्तम प्रथाओं का भी स्वागत करता हूं।आरएक्सजेवा - अनंत स्ट्रीम को समाप्त करना

एक अभ्यास के रूप में, मैं एक लॉग फ़ाइल पूंछ उपयोगिता लिख ​​रहा हूं। लॉग फ़ाइल में लाइनों की धारा Observable<String> द्वारा दर्शायी जाती है। फ़ाइल में जोड़े गए पाठ को पढ़ने के लिए BufferedReader प्राप्त करने के लिए, मैं सामान्य reader.readLine() == null समाप्ति जांच को अनदेखा करता हूं और इसके बजाय इसका अर्थ यह समझने के लिए करता हूं कि मेरा धागा सोना चाहिए और अधिक लॉगर टेक्स्ट की प्रतीक्षा करनी चाहिए।

लेकिन जब मैं takeUntil का उपयोग कर पर्यवेक्षक को समाप्त कर सकता हूं, तो मुझे अन्यथा असीमित चलने वाली फ़ाइल वॉचर को समाप्त करने का एक साफ तरीका खोजने की आवश्यकता है। मैं अपना खुद का terminateWatcher विधि/फ़ील्ड लिख सकता हूं, लेकिन यह अवलोकन/पर्यवेक्षक encapsulation तोड़ता है - और मैं यथासंभव प्रतिक्रियाशील प्रतिमान के लिए सख्त रहना चाहता हूं।

public static void main(String... args) { 
    . . . 
    Observable<String> lines = Observable.create(createWatcher(file)); 
    lines = lines.takeWhile(new Func1<String, Boolean>() { 
     @Override 
     public Boolean call(String line) { 
      // Predicate for which to continue processing 
      return !line.contains("shutdown"); 
     } 
    }).subscribeOn(Schedulers.threadPoolForIO()) 
      .observeOn(Schedulers.currentThread()); 
    // Seems like I should use subscribeOn() and observeOn(), but they 
    // make my tailer terminate without reading any text. 

    Subscription subscription = lines.subscribe(new Action1<String>() { 
     @Override 
     public void call(String line) { 
      System.out.printf("%20s\t%s\n", file, line); 
     } 
    }); 
} 

मेरे दो प्रश्न हैं::

public class FileWatcher implements OnSubscribeFunc<String> { 
    private Path path = . . .; 

    @Override 
    // The <? super String> generic is pointless but required by the compiler 
    public Subscription onSubscribe(Observer<? super String> observer) { 
     try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) { 
      String newLine = ""; 
      while (!Thread.interrupted()) { // How do I terminate this reactively? 
       if ((newLine = reader.readLine()) != null) 
        observer.onNext(newLine); 
       else 
        try { 
         // Wait for more text 
         Thread.sleep(250); 
        } catch (InterruptedException e) { 
         Thread.currentThread().interrupt(); 
        } 
      } 
      observer.onCompleted(); 
     } catch (Exception e) { 
      observer.onError(e); 
     } 

     return null; // Not sure what Subscription I should return 
    } 
} 

यहाँ के रूप में वे आ ऑब्जर्वर कोड है कि नई लाइनों प्रिंट है:

यहाँ Observable<String> कोड है

  1. क्या एक अन्यथा असीमित चलने वाली धारा को समाप्त करने के लिए एक प्रतिक्रियाशील-निरंतर तरीका है?
  2. मेरे कोड में अन्य गलतियों से आप रोते हैं? :)

उत्तर

6

आप जब सदस्यता अनुरोध किया जाता है देख रहा है फ़ाइल की शुरुआत कर रहे हैं के बाद से, यह जब सदस्यता समाप्त भावना इसे समाप्त करने के लिए बनाता है, जैसे कि, जब Subscription बंद कर दिया है। यह आपके कोड टिप्पणियों में ढीले सिरों में से एक को संबोधित करता है: आप Subscription लौटाते हैं जो फ़ाइल को देखने से रोकने के लिए FileWatcher बताता है। फिर अपनी लूप स्थिति को प्रतिस्थापित करें जैसे कि यह जांचता है कि क्या मौजूदा थ्रेड बाधित हो गया है या नहीं, यह जांचने के बजाय सदस्यता रद्द कर दी गई है या नहीं।

समस्या यह है कि यदि आपकी onSubscribe() विधि कभी वापस नहीं आती है तो यह बहुत उपयोगी नहीं है। शायद आपके FileWatcher को उस शेड्यूलर पर होने वाले पढ़ने के साथ, शेड्यूलर निर्दिष्ट करने की आवश्यकता होनी चाहिए।

+0

तो शेड्यूलर परिभाषित कर सकता है कि लॉग फ़ाइल कितनी बार पढ़ी जानी चाहिए; फिर, जब शेड्यूलर FileWatcher चलाता है, तो FileWatcher 'readLine() == null' तक लाइनों को खींच लेगा। फिर पर्यवेक्षक सामान्य की तरह सदस्यता रद्द कर सकते हैं, या स्वचालित रूप से सदस्यता समाप्त करने के लिए 'takeUntil()' का उपयोग कर सकते हैं। यह देखने के लिए कि क्या यह काम करता है, मुझे इन विधियों को देखना होगा, लेकिन मुझे विचार पसंद है। – MonkeyWithDarts

संबंधित मुद्दे