2013-10-24 4 views
10

मैं एक डेटासेट तक पहुंच साझा करने के लिए पायथन के multiprocessing.Manager का उपयोग कर रहा हूं कि एक प्रक्रिया उत्पन्न होगी और अन्य देखेंगे। हालांकि, मैं इस समस्या में भाग रहा हूं कि manager.dict() द्वारा लौटाया गया एक dict प्रॉक्सी iteritems() का समर्थन नहीं करता है।पायथन में एक dict प्रॉक्सी पर फिर से कैसे करें?

मैं items() से अधिक पुन: सक्रिय कर सकता हूं, लेकिन इसका मतलब है कि सभी वस्तुओं में एक नया टुपल बनाना, जो एक बड़ी संख्या है। क्या मध्यवर्ती सूची/ट्यूपल बनाने के बिना ऐसा करने का कोई तरीका है, इस प्रकार केवल अतिरिक्त स्मृति की निरंतर मात्रा का उपयोग कर रहा है?

नोट: यह ठीक है अगर समाधान की आवश्यकता है कि उत्पन्न प्रक्रिया पुनरावृत्ति के लिए रुकती है।

+0

क्या आपने 'SyncManager' का उपयोग करने के लिए विचार किया था और 'iteritems' के साथ अपनी खुद की प्रॉक्सी पंजीकृत की थी? – oleg

+1

@oleg आप आसानी से iteritems का पर्दाफाश नहीं कर सकते हैं, क्योंकि यह निर्देश देने वाले निर्देशक नहीं हैं। यही कारण है कि डिफ़ॉल्ट dict प्रॉक्सी इसका खुलासा नहीं करता है और इसलिए सवाल है। – otus

+0

मैंने "बस" खुलासा नहीं किया। :) क्या हम 'iteritems' का पर्दाफाश करने के लिए' IteratorProxy' का उपयोग कर सकते हैं? – oleg

उत्तर

2

आप अपनी मेमोरी पदचिह्न को कम करने के लिए keys() पर फिर से शुरू कर सकते हैं। आपको हटाए जाने वाले कुंजियों के खिलाफ सावधान रहना होगा।

अन्यथा, यहां दो अलग-अलग तरीकों से एक उदाहरण दिया गया है जो आपको एक ताना में वस्तुओं के माध्यम से फिर से शुरू करने देगा। इस उदाहरण में iteritems() विधि केवल उस प्रक्रिया से कार्य करती है जो प्रबंधक ऑब्जेक्ट और प्रबंधक प्रक्रिया बनाता है जो कि बाल ऑब्जेक्ट बनाता है। ऐसा इसलिए है क्योंकि नए प्रॉक्सी बनाने के लिए प्रबंधक ऑब्जेक्ट की आवश्यकता होती है, और अन्य प्रक्रियाओं तक इसका उपयोग नहीं होता है। iteritems2() विधि अन्य प्रक्रियाओं से काम करती है, क्योंकि यह उन प्रक्रियाओं में एक नई प्रॉक्सी बनाने पर भरोसा नहीं करती है।

import multiprocessing as mp 
import multiprocessing.managers 

class mydict(dict): 
    def __init__(self, *args, **kwargs): 
     dict.__init__(self, *args, **kwargs) 
     self.iters = {} 

    def iteritems(self): 
     print "iteritems", mp.current_process() 
     return dict.iteritems(self) 

    def _iteritems_start(self): 
     print "_iteritems_start", mp.current_process() 
     i = dict.iteritems(self) 
     self.iters[id(i)] = i 
     return id(i) 

    def _iteritems_next(self, iter_id): 
     try: 
      return self.iters[iter_id].next() 
     except StopIteration: 
      del self.iters[iter_id] 
      return None 

class mydict_proxy(mp.managers.DictProxy): 
    def iteritems(self): 
     print "iteritems proxy", mp.current_process() 
     return self._callmethod("iteritems") 

    def iteritems2(self): 
     print "iteritems2 proxy", mp.current_process() 
     iter_id = self._callmethod("_iteritems_start") 
     def generator(): 
      while True: 
       a = self._callmethod("_iteritems_next", 
          (iter_id,)) 
       if a == None: 
        return 
       yield a 
     return generator() 

    _method_to_typeid_ = { "iteritems": "Iterator" } 
    _exposed_ = mp.managers.DictProxy._exposed_ 
    _exposed_ += ("iteritems", "_iteritems_start", "_iteritems_next") 

class mymanager(mp.managers.BaseManager): 
    pass 
mymanager.register("mydict", mydict, mydict_proxy) 
mymanager.register("Iterator", proxytype = mp.managers.IteratorProxy, 
      create_method = False) 

def other(d): 
    for k, v in d.iteritems2(): 
     d[k] = v.lower() 
    for k, v in d.iteritems(): 
     d[k] = ord(v) 

def main(): 
    manager = mymanager() 
    manager.start() 
    d = manager.mydict(list(enumerate("ABCDEFGHIJKLMNOP"))) 
    for (k, v) in d.iteritems(): 
     print k, v 
    proc = mp.Process(target = other, args = (d,)) 
    proc.start() 
    proc.join() 
    for (k, v) in d.iteritems(): 
     print k, v 

if __name__ == "__main__": 
    main() 

ध्यान दें कि जब इस कोड को अधिक स्मृति कुशल हो सकता है, यह शायद बहुत धीमी की एक बिल्ली हो जा रहा है।

-2

iteritems() सूची dict के लिए है। आप लूप के लिए उपयोग कर सकते हैं। या आप sorted() कह सकते हैं जो एक क्रमबद्ध सूची में कुंजी वापस कर देगा और फिर उस सूची में पुन: सक्रिय होगा और dict[key] करें। उम्मीद है की वो मदद करदे। यदि कोई बेहतर तरीका है। मेरे साथ साझा करो। मैं जानना मर रहा हूँ।

0

आप अपने खुद के प्रकार पंजीकृत करने के लिए SyncManager कक्षा का उपयोग कर सकते हैं। फिर आप उस प्रकार के तरीकों को लागू कर सकते हैं, उदा। एक ताना से केवल सीमित संख्या में आइटम प्राप्त करने के लिए।

import multiprocessing 
from multiprocessing import managers 


class TakerDict(dict): 
    """Like a dict, but allows taking a limited number of items.""" 

    def take(self, items=1): 
     """Take the first `items` items.""" 
     return [item for _, item in zip(range(items), self.items())] 


# NOTE: add other dict methods to the tuple if you need them. 
TakerProxy = managers.MakeProxyType('TakerProxy', ('take',)) 

managers.SyncManager.register('taker', TakerDict, TakerProxy) 


if __name__ == '__main__': 
    manager = multiprocessing.Manager() 
    taker = manager.taker() 
    # in other processes, use e.g. taker.take(5) 

इस प्रकार, स्मृति के उपयोग को सीमित करने के लिए, आप प्रबंधक प्रक्रिया बार-बार फोन करने के लिए तत्वों के अगले बैच प्राप्त करना होगा:

यहाँ प्राप्त करने के लिए एक उदाहरण आप शुरू कर दिया।

ऐसा करने के लिए, हालांकि, आपके निर्देश को अनुक्रमण का समर्थन करना होगा (ताकि आप एक विशिष्ट ऑफसेट से फिर से शुरू कर सकें)। चूंकि आपके पास किसी दस्तावेज़ में तत्वों के अंतर्निहित क्रम तक पहुंच नहीं है, इसलिए आप इसके बजाय सूची का उपयोग करना बेहतर कर देंगे (उदा। manager.list())। फिर अपने उपप्रमुखों में, सूची के len() के लिए पूछें, और उपयुक्त आकार के बैच प्राप्त करने के लिए एक टुकड़ा द्वारा इंडेक्स - इसके लिए आपको किसी भी प्रॉक्सी प्रकार को पंजीकृत करने की आवश्यकता नहीं है।

+2

क्या आप मूल रूप से प्रश्न में उल्लिखित "सूची में परिवर्तित करें" वर्कअराउंड को लागू नहीं कर रहे हैं, लेकिन कुछ जटिल तरीके से?यह वास्तव में समस्या को हल नहीं करता है (स्मृति की उपयोग की आवश्यकता से भी स्मृति उपयोग)। – otus

+0

ठीक है, यह डेटा को अंत में सूची में परिवर्तित करता है, इसलिए यह मेमोरी ओवरहेड के साथ आता है। यह सिर्फ भाग में ऐसा करता है ताकि आपको इतना ऊपरी भाग न मिले। मुझे नहीं लगता * यह 'IteratorProxy' दृष्टिकोण से भी बदतर होगा, लेकिन मैंने कुछ भी माप नहीं लिया है। –

+0

इसके अलावा यह वास्तव में भाग नहीं करता है: * "ऐसा करने के लिए, हालांकि, आपके निर्देश को अनुक्रमण का समर्थन करना होगा (इसलिए आप एक विशिष्ट ऑफ़सेट से फिर से शुरू कर सकते हैं)।" * – otus

संबंधित मुद्दे