2015-01-25 9 views
7

मैं एपीआई द्वारा requests मॉड्यूल का उपयोग करके प्रदान की गई एक ईवेंट स्ट्रीम का उपभोग करने का प्रयास कर रहा हूं। मैंने बफरिंग समस्या की तरह दिखने में भाग लिया है: requests मॉड्यूल एक ईवेंट द्वारा अंतराल लगता है।पाइथन "अनुरोध" लाइब्रेरी के साथ http प्रतिक्रिया स्ट्रीमिंग पढ़ना

मैं कोड है कि कुछ इस तरह दिखता है:

r = requests.get('http://localhost:8080/api/v1beta1/watch/services', 
       stream=True) 

for line in r.iter_lines(): 
    print 'LINE:', line 

Kubernetes ईवेंट नोटिफिकेशन का उत्सर्जन करता है के रूप में, इस कोड को केवल प्रदर्शित करेगा पिछले घटना उत्सर्जित जब एक नई घटना में आता है, जो यह लगभग पूरी तरह से बेकार बना देता है कोड के लिए जो सेवा पर प्रतिक्रिया देने की आवश्यकता है, ईवेंट जोड़ें/हटाएं।

मैं एक उपप्रक्रिया में curl को उत्पन्न करने के बजाय requests पुस्तकालय का उपयोग करके इस समाधान कर लिया है:

p = subprocess.Popen(['curl', '-sfN', 
         'http://localhost:8080/api/watch/services'], 
        stdout=subprocess.PIPE, 
        bufsize=1) 

for line in iter(p.stdout.readline, b''): 
    print 'LINE:', line 

यह काम करता है, लेकिन कुछ लचीलापन की कीमत पर। क्या पर कोई तरीका है requests लाइब्रेरी के साथ इस बफरिंग समस्या से बचें?

उत्तर

5

यह व्यवहार विधि requests लाइब्रेरी में विधि के एक छोटी गाड़ी कार्यान्वयन के कारण है। chunk_size ब्लॉक iter_content इटरेटर का उपयोग कर डेटा का में प्रतिक्रिया सामग्री पर

iter_lines दोहराता। दूरस्थ सर्वर (जो आम तौर पर मामला है जब उत्पादन की अंतिम पंक्ति को पढ़ने हो जाएगा) से पढ़ने के लिए उपलब्ध डेटा की कम से कम chunk_size बाइट्स देखते हैं, तो पढ़ने आपरेशन तक डेटा की chunk_size बाइट्स उपलब्ध हैं रोकेंगे।

import os 


def iter_lines(fd, chunk_size=1024): 
    '''Iterates over the content of a file-like object line-by-line.''' 

    pending = None 

    while True: 
     chunk = os.read(fd.fileno(), chunk_size) 
     if not chunk: 
      break 

     if pending is not None: 
      chunk = pending + chunk 
      pending = None 

     lines = chunk.splitlines() 

     if lines and lines[-1]: 
      pending = lines.pop() 

     for line in lines: 
      yield line 

    if pending: 
     yield(pending) 

यह काम करता है क्योंकि os.read बल्कि एक बफर को भरने के लिए के लिए इंतज़ार कर की तुलना में कम से कम chunk_size बाइट्स डेटा की वापस आ जाएगी:

मैं अपने खुद के iter_lines दिनचर्या है कि सही ढंग से संचालित लिखा है।

+0

यह तर्क दिया जा सकता है कि कौन सा कार्यान्वयन सही है - यदि कोई डेटा उपलब्ध हो तो आपका कोई नकली "लॉजिकल लाइन ब्रेक" डालेगा। सही दृष्टिकोण डेटा के कुल आकार को जानने के लिए प्रतीत होता है (एक निर्दिष्ट करना टीसीपी संचार के लिए एक आवश्यकता है) और ज्ञात अंत में केवल आंशिक पढ़ने को नियोजित करता है। –

+0

मुझे नहीं लगता कि आप तर्क दे सकते हैं कि मौजूदा कार्यान्वयन सही है। मेरा कठोर परीक्षण नहीं हुआ है, लेकिन यह निश्चित रूप से बेहतर काम करता है। एक और सही कार्यान्वयन - आदर्श रूप से अपस्ट्रीम पैच के रूप में सबमिट किया गया - बहुत उपयोगी होगा। – larsks

+0

@ivan_pozdeev * "सही दृष्टिकोण डेटा के कुल आकार को जानने के लिए प्रतीत होता है (एक निर्दिष्ट करना टीसीपी संचार के लिए एक आवश्यकता है)" * - नहीं, टीसीपी एक * स्ट्रीम * है और इसमें अनंत लंबाई हो सकती है। मुझे यकीन नहीं है कि आपने यह कहां सुना है लेकिन यह मूल रूप से असत्य है। –

संबंधित मुद्दे