2016-09-07 15 views
13

फ़ंक्शन glib.spawn_async आपको तीन कॉलबैक को हुक करने की अनुमति देता है जिसे stdout, stderr पर और ईवेंट पूर्ण होने पर ईवेंट पर बुलाया जाता है।popen के साथ glib.spawn_async की नकल ...

मैं subprocess के साथ या तो थ्रेड या असिनियो के साथ समान कार्यक्षमता की नकल कैसे कर सकता हूं?

मुझे थ्रेडिंग/एसिनीओ की बजाय कार्यक्षमता में अधिक दिलचस्पी है, लेकिन एक उत्तर जिसमें दोनों शामिल हैं, एक बक्षीस कमाएंगे।

import glib 
import logging 
import os 
import gtk 


class MySpawn(object): 
    def __init__(self): 
     self._logger = logging.getLogger(self.__class__.__name__) 

    def execute(self, cmd, on_done, on_stdout, on_stderr): 
     self.pid, self.idin, self.idout, self.iderr = \ 
      glib.spawn_async(cmd, 
          flags=glib.SPAWN_DO_NOT_REAP_CHILD, 
          standard_output=True, 
          standard_error=True) 
     fout = os.fdopen(self.idout, "r") 
     ferr = os.fdopen(self.iderr, "r") 
     glib.child_watch_add(self.pid, on_done) 
     glib.io_add_watch(fout, glib.IO_IN, on_stdout) 
     glib.io_add_watch(ferr, glib.IO_IN, on_stderr) 
     return self.pid 


if __name__ == '__main__': 
    logging.basicConfig(format='%(thread)d %(levelname)s: %(message)s', 
         level=logging.DEBUG) 
    cmd = '/usr/bin/git ls-remote https://github.com/DiffSK/configobj'.split() 

    def on_done(pid, retval, *args): 
     logging.info("That's all folks!…") 

    def on_stdout(fobj, cond): 
     """This blocks which is fine for this toy example…""" 
     for line in fobj.readlines(): 
      logging.info(line.strip()) 
     return True 

    def on_stderr(fobj, cond): 
     """This blocks which is fine for this toy example…""" 
     for line in fobj.readlines(): 
      logging.error(line.strip()) 
     return True 

    runner = MySpawn() 
    runner.execute(cmd, on_done, on_stdout, on_stderr) 
    try: 
     gtk.main() 
    except KeyboardInterrupt: 
     print('') 

मैं जोड़ने चाहिए कि readlines() के बाद से ब्लॉक कर रहा है, सब से ऊपर उत्पादन बफ़र और यह एक ही बार में भेज देंगे:

यहाँ एक खिलौना कार्यक्रम से पता चलता है कि मुझे क्या करना चाहते हैं। यदि यह कोई नहीं चाहता है, तो आपको readline() का उपयोग करना होगा और यह सुनिश्चित करना होगा कि कमांड के अंत में आप उन सभी लाइनों को पढ़ना समाप्त कर दें जिन्हें आपने पहले नहीं पढ़ा था।

import asyncio 

class Handler(asyncio.SubprocessProtocol): 
    def pipe_data_received(self, fd, data): 
     # fd == 1 for stdout, and 2 for stderr 
     print("Data from /bin/ls on fd %d: %s" % (fd, data.decode())) 

    def pipe_connection_lost(self, fd, exc): 
     print("Connection lost to /bin/ls") 

    def process_exited(self): 
     print("/bin/ls is finished.") 

loop = asyncio.get_event_loop() 
coro = loop.subprocess_exec(Handler, "/bin/ls", "/") 

loop.run_until_complete(coro) 
loop.close() 
उपप्रक्रिया और सूत्रण के साथ

, यह रूप में अच्छी तरह सरल है:

उत्तर

4

asyncio subprocess_exec है, सभी एक ही उपप्रक्रिया मॉड्यूल का उपयोग करने के लिए कोई जरूरत नहीं है। तुम बस प्रक्रिया के लिए wait() को पाइप प्रति एक धागा है, और एक अंडे कर सकते हैं:

import subprocess 
import threading 

class PopenWrapper(object): 
    def __init__(self, args): 
     self.process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.DEVNULL) 

     self.stdout_reader_thread = threading.Thread(target=self._reader, args=(self.process.stdout,)) 
     self.stderr_reader_thread = threading.Thread(target=self._reader, args=(self.process.stderr,)) 
     self.exit_watcher = threading.Thread(target=self._exit_watcher) 

     self.stdout_reader_thread.start() 
     self.stderr_reader_thread.start() 
     self.exit_watcher.start() 

    def _reader(self, fileobj): 
     for line in fileobj: 
      self.on_data(fileobj, line) 

    def _exit_watcher(self): 
     self.process.wait() 
     self.stdout_reader_thread.join() 
     self.stderr_reader_thread.join() 
     self.on_exit() 

    def on_data(self, fd, data): 
     return NotImplementedError 

    def on_exit(self): 
     return NotImplementedError 

    def join(self): 
     self.process.wait() 

class LsWrapper(PopenWrapper): 
    def on_data(self, fd, data): 
     print("Received on fd %r: %s" % (fd, data)) 

    def on_exit(self): 
     print("Process exited.") 


LsWrapper(["/bin/ls", "/"]).join() 

हालांकि, ध्यान रखें कि GLib asynchroneously अपने कॉलबैक निष्पादित करने के लिए नहीं उपयोग धागे करता है। यह एक घटना लूप का उपयोग करता है, जैसा कि एसिन्सियो करता है। विचार यह है कि आपके प्रोग्राम के मूल में एक लूप है जो कुछ होने तक प्रतीक्षा करता है, और फिर सिंक्रनाइज़ रूप से संबंधित कॉलबैक निष्पादित करता है। आपके मामले में, यह "पाइपों में से एक पर पढ़ने के लिए डेटा उपलब्ध हो जाता है", और "उपप्रोसेस निकल गया है"। आम तौर पर, यह "एक्स 11-सर्वर ने माउस आंदोलन की सूचना दी", "आने वाले नेटवर्क यातायात" जैसी चीजें भी शामिल हैं। आप अपने स्वयं के ईवेंट लूप लिखकर ग्लिब के व्यवहार का अनुकरण कर सकते हैं। दो पाइपों पर select module का उपयोग करें। यदि रिपोर्टों का चयन करें कि पाइप पठनीय हैं, लेकिन read कोई डेटा नहीं लौटाता है, तो प्रक्रिया संभवतः बाहर निकलती है - इस मामले में सबप्रोसेस ऑब्जेक्ट पर poll() विधि को कॉल करने के लिए यह जांचने के लिए कॉल करें कि यह पूरा हो गया है या नहीं, और यदि आपके पास है, तो आपकी निकास कॉलबैक या त्रुटि इसी प्रकार कॉलबैक।

+0

इस उत्तर को लिखने के लिए समय निकालने के लिए बहुत बहुत धन्यवाद। – Sardathrion

+1

ध्यान दें कि उपर्युक्त 'stdout' और 'stderr' में पंक्तियों को बफर करेगा क्योंकि' रीडलाइन()' अवरुद्ध है। यदि आप एक अपडेट के रूप में अपडेट करना चाहते हैं, तो 'read() 'का उपयोग करें, लेकिन यह सुनिश्चित करें कि पाठक धागे खत्म होने पर आप बफर खाली कर लें। – Sardathrion

संबंधित मुद्दे