2015-02-06 7 views
8

मेरे पास एक ऐसा फ़ंक्शन है जो कुछ सिमुलेशन करता है और स्ट्रिंग प्रारूप में एक सरणी देता है।पायथन मल्टीप्रोसेसिंग - पूल.मैप ऑपरेशन की प्रक्रिया को ट्रैक करना

मैं के लिए सिमुलेशन (फ़ंक्शन) को 10000 संभावित इनपुट मानों, से अधिक इनपुट पैरामीटर मानों को चलाने और एक फ़ाइल में परिणाम लिखना चाहता हूं।

मैं समानांतर में सिमुलेशन चलाने के लिए मल्टीप्रोसेसिंग, विशेष रूप से, पूल.मैप फ़ंक्शन का उपयोग कर रहा हूं।

10000 बार से अधिक सिमुलेशन फ़ंक्शन चलाने की पूरी प्रक्रिया में बहुत लंबा समय लगता है, मैं वास्तव में पूरे ऑपरेशन की प्रक्रिया को ट्रैक करना चाहता हूं।

मुझे लगता है कि नीचे मेरे वर्तमान कोड में समस्या यह है कि, पूल.मैप उन परिचालनों के दौरान किसी भी प्रक्रिया ट्रैकिंग के बिना 10000 बार फ़ंक्शन चलाता है। एक बार समांतर प्रसंस्करण 10000 सिमुलेशन (दिन के लिए घंटों तक हो सकती है) चलने के बाद खत्म हो जाती है, तो जब मैं 10000 सिमुलेशन परिणाम फ़ाइल में सहेजा जा रहा हूं तो मैं ट्रैकिंग करता हूं .. इसलिए यह वास्तव में पूल.मैप ऑपरेशन की प्रसंस्करण को ट्रैक नहीं कर रहा है।

क्या मेरे कोड में कोई आसान फिक्स है जो प्रक्रिया ट्रैकिंग की अनुमति देगा?

def simFunction(input): 
    # Does some simulation and outputs simResult 
    return str(simResult) 

# Parallel processing 

inputs = np.arange(0,10000,1) 

if __name__ == "__main__": 
    numCores = multiprocessing.cpu_count() 
    pool = multiprocessing.Pool(processes = numCores) 
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out: 
     print("Starting to simulate " + str(len(inputs)) + " input values...") 
     counter = 0 
     for i in t: 
      out.write(i + '\n') 
      counter = counter + 1 
      if counter%100==0: 
       print(str(counter) + " of " + str(len(inputs)) + " input values simulated") 
    print('Finished!!!!') 

उत्तर

7

यदि आप एक पुनरावृत्त map फ़ंक्शन का उपयोग करते हैं, तो प्रगति का ट्रैक रखना बहुत आसान है।

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> def simFunction(x,y): 
... import time 
... time.sleep(2) 
... return x**2 + y 
... 
>>> x,y = range(100),range(-100,100,2) 
>>> res = Pool().imap(simFunction, x,y) 
>>> with open('results.txt', 'w') as out: 
... for i in x: 
...  out.write("%s\n" % res.next()) 
...  if i%10 is 0: 
...  print "%s of %s simulated" % (i, len(x)) 
... 
0 of 100 simulated 
10 of 100 simulated 
20 of 100 simulated 
30 of 100 simulated 
40 of 100 simulated 
50 of 100 simulated 
60 of 100 simulated 
70 of 100 simulated 
80 of 100 simulated 
90 of 100 simulated 

या, आप एसिंक्रोनस map का उपयोग कर सकते हैं। यहां मैं इसे थोड़ा मिश्रण करने के लिए बस थोड़ा अलग कर दूंगा।

>>> import time 
>>> res = Pool().amap(simFunction, x,y) 
>>> while not res.ready(): 
... print "waiting..." 
... time.sleep(5) 
... 
waiting... 
waiting... 
waiting... 
waiting... 
>>> res.get() 
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899] 

ध्यान दें कि मैं pathos.multiprocessing बजाय multiprocessing उपयोग कर रहा हूँ। यह सिर्फ multiprocessing का कांटा है जो आपको कई इनपुट के साथ map फ़ंक्शंस करने में सक्षम बनाता है, इसमें बहुत बेहतर क्रमबद्धता है, और आपको कहीं भी map कॉल निष्पादित करने की अनुमति देता है (न केवल __main__ में)। उपर्युक्त करने के लिए आप multiprocessing का उपयोग भी कर सकते हैं, हालांकि कोड बहुत अलग होगा।

या तो एक पुनरावृत्त या असीमित map आपको जो भी कोड बेहतर प्रक्रिया ट्रैकिंग करना चाहते हैं उसे लिखने में सक्षम करेगा।उदाहरण के लिए, प्रत्येक नौकरी के लिए एक अद्वितीय "आईडी" पास करें, और देखें कि कौन सा वापस आ गया है, या प्रत्येक नौकरी अपनी प्रक्रिया आईडी वापस कर देती है। प्रगति और प्रक्रियाओं को ट्रैक करने के कई तरीके हैं ... लेकिन उपरोक्त आपको एक शुरुआत देनी चाहिए।

आप pathos यहाँ प्राप्त कर सकते: https://github.com/uqfoundation

+0

आपको बहुत बहुत धन्यवाद! – user32147

3

कोई "आसान समाधान" नहीं है। map आपके द्वारा कार्यान्वयन विवरण छुपाए जाने के बारे में है। और इस मामले में आप विवरण चाहते हैं। यही है, परिभाषा के अनुसार चीजें थोड़ा अधिक जटिल हो जाती हैं। आपको संचार प्रतिमान बदलने की जरूरत है। ऐसा करने के कई तरीके हैं।

एक है: अपने परिणामों को इकट्ठा करने के लिए एक कतार बनाएं, और अपने श्रमिकों को इस कतार में परिणाम दें। फिर आप एक निगरानी थ्रेड या प्रक्रिया के भीतर से, कतार को देख सकते हैं, और परिणामों को उपभोग कर सकते हैं जैसे वे आ रहे हैं। उपभोग करते समय, आप उनका विश्लेषण कर सकते हैं, और लॉग आउटपुट उत्पन्न कर सकते हैं। प्रगति का ट्रैक रखने का यह सबसे सामान्य तरीका हो सकता है: आप वास्तविक समय में किसी भी तरह से आने वाले परिणामों का जवाब दे सकते हैं।

आपके कार्यकर्ता फ़ंक्शन को थोड़ा संशोधित करने और वहां लॉग आउट उत्पन्न करने का एक और आसान तरीका हो सकता है। बाहरी उपकरणों (जैसे grep और wc) के साथ लॉग आउटपुट का ध्यानपूर्वक विश्लेषण करके, आप ट्रैक रखने के बहुत सरल माध्यमों के साथ आ सकते हैं।

+1

धन्यवाद। क्या आप कृपया कुछ सरल उदाहरण प्रदान कर सकते हैं? – user32147

3

मुझे लगता है कि तुम क्या जरूरत है एक लॉग फ़ाइल है।

मैं आपको मॉड्यूल लॉगिंग करने की सलाह दूंगा जो पाइथन मानक लाइब्रेरी का हिस्सा है। लेकिन दुर्भाग्य से लॉगिंग मल्टीप्रोसेसिंग-सुरक्षित नहीं है। तो आप इसे अपने ऐप में आउट-ऑफ-द-बॉक्स का उपयोग नहीं कर सकते हैं।

तो, आपको लॉगिंग मॉड्यूल के साथ एक क्यूई या ताले का उपयोग करके एक मल्टीप्रोसेसिंग-सुरक्षित लॉग हैंडलर का उपयोग करने या लागू करने की आवश्यकता होगी।

स्टैक ओवरफ्लो में इसके बारे में बहुत सी चर्चा है। इस उदाहरण के लिए:

import multiprocessing 
import logging 

from random import random 
import time 


logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s %(process)s %(levelname)s %(message)s', 
    filename='results.log', 
    filemode='a' 
) 


def simulation(a): 
    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Simulating with %s" % a) 

    # simulation 
    time.sleep(random()) 
    result = a*2 

    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Finished simulation with %s. Result is %s" % (a, result)) 

    return result 

if __name__ == '__main__': 

    logging.debug("Starting the simulation") 
    inputs = [x for x in xrange(100)] 
    num_cores = multiprocessing.cpu_count() 
    print "num_cores: %d" % num_cores 
    pool = multiprocessing.Pool(processes=num_cores) 
    t = pool.map(simulation, inputs) 
    logging.debug("The simulation has ended") 

आप कर सकते हैं: How should I log while using multiprocessing in Python?

तो सीपीयू लोड के सबसे अनुकरण समारोह में है और आप लॉग रोटेशन का उपयोग करने के लिए नहीं जा रहे हैं, तो आप शायद इस तरह एक साधारण ताला तंत्र का उपयोग कर सकते चलते समय आपकी लॉग फ़ाइल "tail -f"। यह आपको देखना चाहिए:

2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation 
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12 
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28 
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20 
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16 
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8 
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4 
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24 
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0 
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24 
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13 
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16 
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9 
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48 
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25 
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50 
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26 
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26 
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14 
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28 
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15 
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8 
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5 

विंडोज और लिनक्स पर कोशिश की गई।

आशा है कि इससे

+0

'multiprocessing.get_logger()' ताले से संरक्षित एक फीचर-सीमित लॉगर देता है, https://docs.python.org/2/library/multiprocessing.html#logging –

+0

हां, लेकिन यह मॉड्यूल लॉगर है ... इसलिए आपने इसका उपयोग कर सकते हैं, आपका लॉग मॉड्यूल स्तर के संदेशों के साथ मिश्रित होगा: इसे आज़माएं और आपको इस तरह के संदेश दिखाई देंगे: 2015-02-08 23: 47: 10,954 9288 DEBUG ने हैंडल 448 –

+0

ओह, ठीक है, मैंने इसे वास्तव में कभी भी इस्तेमाल नहीं किया और दस्तावेज़ों के माध्यम से बहुत जल्दी स्किम किया। –

संबंधित मुद्दे