2016-04-13 19 views
8

क्या मैं एक अंतहीन स्ट्रीम सुनने के लिए रेट्रोफिट + आरएक्सजेवा का उपयोग कर सकता हूं? उदाहरण के लिए ट्विटर स्ट्रीम। मेरे पास यह है:एंड्रॉइड रेट्रोफिट 2 + आरएक्सजेवा: अंतहीन स्ट्रीम को सुनें

public interface MeetupAPI { 
    @GET("http://stream.meetup.com/2/rsvps/") 
    Observable<RSVP> getRSVPs(); 
} 

MeetupAPI api = new Retrofit.Builder() 
      .baseUrl(MeetupAPI.RSVP_API) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .addConverterFactory(GsonConverterFactory.create()) 
      .build() 
      .create(MeetupAPI.class); 

api.getRSVPs() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(rsvp -> Log.d(TAG, "got rsvp"), 
       error -> Log.d(TAG, "error: " + error), 
       () -> Log.d(TAG, "onComplete")); 

लेकिन पहली वस्तु को पार्स किए जाने के बाद "ऑनकंपलेट" का आह्वान किया जाता है। क्या आगे नोटिस तक खुला रहने के लिए रेट्रोफिट को बताने का कोई तरीका है?

उत्तर

9

यहाँ मेरी समाधान:

आप @Streaming एनोटेशन का उपयोग कर सकते हैं: @Streaming साथ

public interface ITwitterAPI { 

    @GET("/2/rsvps") 
    @Streaming 
    Observable<ResponseBody> twitterStream(); 
} 

ITwitterAPI api = new Retrofit.Builder() 
      .baseUrl("http://stream.meetup.com") 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build().create(ITwitterAPI.class); 

हम ResponseBody से कच्चे इनपुट मिल सकती है।

यहाँ मेरी समारोह शरीर की घटनाओं के साथ लाइनों से विभाजित रैप करने के लिए:

public static Observable<String> events(BufferedSource source) { 
    return Observable.create(new Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subscriber) { 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
       subscriber.onCompleted(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
       subscriber.onError(e); 
      } 
     } 
    }); 
} 

और उपयोग परिणाम:

api.twitterStream() 
    .flatMap(responseBody -> events(responseBody.source())) 
    .subscribe(System.out::println); 

शान से

रोक जब हम सदस्यता समाप्ति, पुराना वापस बंद के बारे में upd आगत प्रवाह। लेकिन इनपुटस्ट्रीम को स्वयं को इनपुटस्ट्रीम से बंद करना या नहीं, इसलिए एकमात्र तरीका - स्ट्रीम से पढ़ने का प्रयास करें - हमें Socket closed संदेश के साथ अपवाद मिलता है। हम समापन के रूप में इस अपवाद की व्याख्या कर सकते हैं:

 @Override 
     public void call(Subscriber<? super String> subscriber) { 
      boolean isCompleted = false; 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
      } catch (IOException e) { 
       if (e.getMessage().equals("Socket closed")) { 
        isCompleted = true; 
        subscriber.onCompleted(); 
       } else { 
        throw new UncheckedIOException(e); 
       } 
      } 
      //if response end we get here 
      if (!isCompleted) { 
       subscriber.onCompleted(); 
      } 
     } 

और अगर कनेक्शन क्योंकि प्रतिक्रिया अंत बंद कर दिया है, हम किसी भी अपवाद नहीं है। यहां isCompleted इसके लिए जांचें। अगर मैं गलत हूं तो मुझे बताएं :)

+0

धन्यवाद! स्ट्रीम को सुनकर बंद करने के तरीके पर कोई संकेत? – ticofab

संबंधित मुद्दे