मैं प्रतिक्रियाशील प्रोग्रामिंग और आरएक्सजेवा की खोज कर रहा हूं। यह मजेदार है, लेकिन मैं एक समस्या पर फंस गया हूं जिसके लिए मुझे कोई जवाब नहीं मिल रहा है। मेरा मूल प्रश्न: अन्यथा अनगिनत चलने वाले पर्यवेक्षण को समाप्त करने के लिए एक प्रतिक्रियाशील-उपयुक्त तरीका क्या है? मैं आलोचकों का स्वागत करता हूं और मेरे कोड के संबंध में प्रतिक्रियाशील सर्वोत्तम प्रथाओं का भी स्वागत करता हूं।आरएक्सजेवा - अनंत स्ट्रीम को समाप्त करना
एक अभ्यास के रूप में, मैं एक लॉग फ़ाइल पूंछ उपयोगिता लिख रहा हूं। लॉग फ़ाइल में लाइनों की धारा Observable<String>
द्वारा दर्शायी जाती है। फ़ाइल में जोड़े गए पाठ को पढ़ने के लिए BufferedReader
प्राप्त करने के लिए, मैं सामान्य reader.readLine() == null
समाप्ति जांच को अनदेखा करता हूं और इसके बजाय इसका अर्थ यह समझने के लिए करता हूं कि मेरा धागा सोना चाहिए और अधिक लॉगर टेक्स्ट की प्रतीक्षा करनी चाहिए।
लेकिन जब मैं takeUntil
का उपयोग कर पर्यवेक्षक को समाप्त कर सकता हूं, तो मुझे अन्यथा असीमित चलने वाली फ़ाइल वॉचर को समाप्त करने का एक साफ तरीका खोजने की आवश्यकता है। मैं अपना खुद का terminateWatcher
विधि/फ़ील्ड लिख सकता हूं, लेकिन यह अवलोकन/पर्यवेक्षक encapsulation तोड़ता है - और मैं यथासंभव प्रतिक्रियाशील प्रतिमान के लिए सख्त रहना चाहता हूं।
public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.
Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}
मेरे दो प्रश्न हैं::
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return null; // Not sure what Subscription I should return
}
}
यहाँ के रूप में वे आ ऑब्जर्वर कोड है कि नई लाइनों प्रिंट है:
यहाँ Observable<String>
कोड है
- क्या एक अन्यथा असीमित चलने वाली धारा को समाप्त करने के लिए एक प्रतिक्रियाशील-निरंतर तरीका है?
- मेरे कोड में अन्य गलतियों से आप रोते हैं? :)
तो शेड्यूलर परिभाषित कर सकता है कि लॉग फ़ाइल कितनी बार पढ़ी जानी चाहिए; फिर, जब शेड्यूलर FileWatcher चलाता है, तो FileWatcher 'readLine() == null' तक लाइनों को खींच लेगा। फिर पर्यवेक्षक सामान्य की तरह सदस्यता रद्द कर सकते हैं, या स्वचालित रूप से सदस्यता समाप्त करने के लिए 'takeUntil()' का उपयोग कर सकते हैं। यह देखने के लिए कि क्या यह काम करता है, मुझे इन विधियों को देखना होगा, लेकिन मुझे विचार पसंद है। – MonkeyWithDarts