2012-11-19 34 views
32

पर लिखने के लिए मैं एक बड़ी संख्यात्मक समस्या को हल करने की कोशिश कर रहा हूं जिसमें बहुत से उपप्रवाह शामिल हैं, और मैं विभिन्न कोर पर विभिन्न स्वतंत्र उपप्रोबम्स को विभाजित करने के लिए पाइथन के मल्टीप्रोसेसिंग मॉड्यूल (विशेष रूप से Pool.map) का उपयोग कर रहा हूं। प्रत्येक सबप्रोबलेम में बहुत से सब-सबप्रोबलेम्स की गणना करना शामिल है, और मैं इन परिणामों को प्रभावी रूप से किसी फ़ाइल में संग्रहीत करके याद कर रहा हूं, अगर उन्हें किसी भी प्रक्रिया द्वारा गणना नहीं की गई है, अन्यथा गणना को छोड़ दें और केवल फाइल से परिणाम पढ़ें।पाइथन मल्टीप्रोसेसिंग सुरक्षित रूप से एक फ़ाइल

मुझे फाइलों के साथ समवर्ती समस्याएं हैं: विभिन्न प्रक्रियाएं कभी-कभी यह देखने के लिए जांच करती हैं कि उप-सबप्रोबैम की गणना क्यों की गई है (फ़ाइल को देखकर जहां परिणाम संग्रहीत किए जाएंगे), देखें कि यह नहीं है, गणना चलाएं, फिर परिणामों को एक ही फ़ाइल में एक ही समय में लिखने का प्रयास करें। मैं इस तरह टकराव लिखने से कैसे बचूं?

+3

चेक बाहर का उपयोग करने का प्रलेखन से एक उदाहरण प्रक्रियाओं। –

+11

आपके पास एकमात्र एकल प्रक्रिया लेखन परिणाम हो सकते हैं, कतार के साथ इनपुट के रूप में जो अन्य कार्यकर्ता प्रक्रियाओं द्वारा खिलाया जा सकता है। मेरा मानना ​​है कि सभी कार्यकर्ताओं को केवल पढ़ने के लिए सुरक्षित होना सुरक्षित होगा। – GP89

+0

मुझे यह उल्लेख करना चाहिए था कि, चीजों को और अधिक जटिल बनाने के लिए, मैं क्लस्टर पर एक ही समय में कई अलग-अलग बड़ी मुख्य समस्याएं चला रहा हूं, प्रत्येक लेखन के परिणाम उसी नेटवर्किंग फ़ाइल सिस्टम पर सब-सबप्रोबलेम्स के परिणामस्वरूप होते हैं। इस प्रकार मैं पूरी तरह से अलग मशीनों पर चल रही प्रक्रियाओं से टक्कर प्राप्त कर सकता हूं (इसलिए मुझे मल्टीप्रोसेसिंग जैसी चीजों का उपयोग करके समाधान नहीं लगता है। लॉक काम करेगा)। –

उत्तर

63

@ GP89 ने एक अच्छा समाधान का उल्लेख किया। लेखन कार्यों को एक समर्पित प्रक्रिया में भेजने के लिए एक कतार का उपयोग करें जिसमें फ़ाइल में एकमात्र लेखन पहुंच है। अन्य सभी श्रमिकों ने केवल पहुंच पढ़ी है। यह टकराव को खत्म कर देगा। यहाँ एक उदाहरण apply_async का उपयोग करता है, लेकिन यह नक्शे के साथ भी काम करेगा:

import multiprocessing as mp 
import time 

fn = 'c:/temp/temp.txt' 

def worker(arg, q): 
    '''stupidly simulates long running process''' 
    start = time.clock() 
    s = 'this is a test' 
    txt = s 
    for i in xrange(200000): 
     txt += s 
    done = time.clock() - start 
    with open(fn, 'rb') as f: 
     size = len(f.read()) 
    res = 'Process' + str(arg), str(size), done 
    q.put(res) 
    return res 

def listener(q): 
    '''listens for messages on the q, writes to file. ''' 

    f = open(fn, 'wb') 
    while 1: 
     m = q.get() 
     if m == 'kill': 
      f.write('killed') 
      break 
     f.write(str(m) + '\n') 
     f.flush() 
    f.close() 

def main(): 
    #must use Manager queue here, or will not work 
    manager = mp.Manager() 
    q = manager.Queue()  
    pool = mp.Pool(mp.cpu_count() + 2) 

    #put listener to work first 
    watcher = pool.apply_async(listener, (q,)) 

    #fire off workers 
    jobs = [] 
    for i in range(80): 
     job = pool.apply_async(worker, (i, q)) 
     jobs.append(job) 

    # collect results from the workers through the pool result queue 
    for job in jobs: 
     job.get() 

    #now we are done, kill the listener 
    q.put('kill') 
    pool.close() 

if __name__ == "__main__": 
    main() 

अच्छी किस्मत,

माइक

+1

हे माइक, उत्तर के लिए धन्यवाद। मुझे लगता है कि यह प्रश्न के लिए काम करेगा क्योंकि मैंने इसे phrased किया है, लेकिन मुझे यकीन नहीं है कि अगर यह प्रश्न में टिप्पणियों में उल्लिखित पूर्ण समस्या का समाधान करेगा, विशेष रूप से मेरे पास नेटवर्क पर कई मशीनों में कितने मुख्य कार्यक्रम चल रहे हैं फाइल सिस्टम, जिनमें से सभी प्रक्रियाएं हो सकती हैं जो एक ही फाइल को लिखने का प्रयास करेंगी। (एफडब्ल्यूआईडब्ल्यू, मुझे थोड़ी देर पहले एक हैकी तरीके से मेरी निजी समस्या के आसपास मिल गया था, लेकिन अगर अन्य लोगों के समान मुद्दे हैं तो टिप्पणी कर रहा हूं।) –

+1

मैं वास्तव में कई बार इसे ऊपर उठाना चाहता हूं। यह मेरे लिए कई बार मददगार रहा है। आज एक बार और अधिक। – Eduardo

+0

धन्यवाद माइक - मैं एमपी क्यूईज़ का उपयोग करने के तरीके से संघर्ष कर रहा था। आपका उदाहरण इसे बहुत स्पष्ट और सीधा बनाता है। – Anurag

0

यह मेरे लिए लग रहा है कि आप अस्थायी रूप से अपने परिणाम को बचाने के लिए प्रबंधक का उपयोग करने की आवश्यकता है एक सूची में और फिर सूची से परिणामों को एक फ़ाइल में लिखें। साथ ही, उस ऑब्जेक्ट को पास करने के लिए स्टैम्पैप का उपयोग करें जिसे आप संसाधित करना चाहते हैं और प्रबंधित सूची। पहला कदम स्टैरैप को पारित करने के लिए पैरामीटर बनाना है, जिसमें प्रबंधित सूची शामिल है।

from multiprocessing import Manager 
from multiprocessing import Pool 
import pandas as pd``` 

def worker(row, param): 
    # do something here and then append it to row 
    x = param**2 
    row.append(x) 

if __name__ == '__main__': 
    pool_parameter = [] # list of objects to process 
    with Manager() as mgr: 
     row = mgr.list([]) 

     # build list of parameters to send to starmap 
     for param in pool_parameter: 
      params.append([row,param]) 

     with Pool() as p: 
      p.starmap(worker, params) 

इस बिंदु से आपको यह तय करने की आवश्यकता है कि आप सूची को कैसे संभालने जा रहे हैं। यदि आपके पास बहुत सी रैम है और एक विशाल डेटा सेट पांडा का उपयोग करके संयोजित करने के लिए स्वतंत्र महसूस करता है। फिर आप फ़ाइल को सीएसवी या अचार के रूप में आसानी से सहेज सकते हैं। [ `Multiprocessing.Lock`] (http://docs.python.org/2/library/multiprocessing.html#synchronization-between-processes) एकाधिक सिंक्रनाइज़ करने के लिए

 df = pd.concat(row, ignore_index=True) 

     df.to_pickle('data.pickle') 
     df.to_csv('data.csv') 
संबंधित मुद्दे