2011-09-26 18 views
18

मेरे द्वारा बनाई गई एक और पोस्ट के समान, यह पोस्ट करता है और एक नया प्रश्न बनाता है।डीबी कनेक्शन बनाएं और एकाधिक प्रक्रियाओं (मल्टीप्रोसेसिंग) पर बनाए रखें

रिकैप: मुझे एक स्थानिक डेटाबेस में प्रत्येक रिकॉर्ड को अपडेट करने की आवश्यकता है जिसमें मेरे पास पॉइंटगोन के ओवरले डेटा सेट के बिंदुओं का डेटा सेट है। प्रत्येक बिंदु सुविधा के लिए मैं इसे बहुभुज सुविधा से जोड़ने के लिए एक कुंजी असाइन करना चाहता हूं जो यह भीतर है। तो यदि मेरा बिंदु 'न्यूयॉर्क शहर' बहुभुज संयुक्त राज्य अमेरिका के भीतर है और संयुक्त राज्य अमेरिका बहुभुज 'जीआईडी ​​= 1' के लिए मैं अपने बिंदु न्यूयॉर्क शहर के लिए 'gid_fkey = 1' असाइन करूंगा।

ठीक है तो यह मल्टीप्रोसेसिंग का उपयोग करके हासिल किया गया है। मैंने इसका उपयोग करके गति में 150% की वृद्धि देखी है, इसलिए यह काम करता है। लेकिन मुझे लगता है कि अनावश्यक ओवरहेड का एक गुच्छा है क्योंकि प्रत्येक रिकॉर्ड के लिए एक डीबी कनेक्शन की आवश्यकता होती है।

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 

    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task() 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self):   
     pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     pyConn.set_isolation_level(0) 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)] 
    for w in consumers: 
     w.start() 

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
    pyConnX.set_isolation_level(0) 
    pyCursorX = pyConnX.cursor() 

    pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')  
    temp = pyCursorX.fetchall()  
    num_job = temp[0] 
    num_jobs = num_job[0] 

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')  
    cityIdListTuple = pyCursorX.fetchall()  

    cityIdListList = [] 

    for x in cityIdListTuple: 
     cityIdList.append(x[0]) 


    for i in xrange(num_jobs): 
     tasks.put(Task(cityIdList[i - 1])) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    while num_jobs: 
     result = results.get() 
     print result 
     num_jobs -= 1 

यह कनेक्शन प्रति 0.3 और 1.5 सेकंड के बीच होने की के रूप में मैं 'समय' मॉड्यूल के साथ यह उपाय है दिखता है:

तो यहाँ कोड है।

क्या प्रति प्रक्रिया डीबी कनेक्शन बनाने का कोई तरीका है और फिर शहर_आईडी जानकारी को एक चर के रूप में उपयोग करें जिसे मैं इस खुले में कर्सर के लिए एक क्वेरी में फ़ीड कर सकता हूं? इस तरह मैं डीबी कनेक्शन के साथ प्रत्येक चार प्रक्रियाओं को कहता हूं और फिर किसी भी तरह से प्रक्रिया करने के लिए मुझे शहर_आईडी छोड़ देता हूं।

उत्तर

31

उपभोक्ता निर्माता में अपने कनेक्शन के निर्माण को अलग करने का प्रयास करें, तो यह निष्पादित कार्य के लिए दे:

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 
     self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     self.pyConn.set_isolation_level(0) 


    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task(connection=self.pyConn) 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self, connection=None):   
     pyConn = connection 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 
+1

मेट है कि एक का इलाज काम किया। आपको स्वीकृति का निशान देने के लिए कुडोस नहीं हैं लेकिन वह कोड बिल्कुल जादू था। निरंतर डीबी कनेक्शन से छुटकारा पाने से गति 50% तक आसानी से बढ़ गई है। कुछ मामलों में शायद 100% के करीब। एक बार फिर धन्यवाद। –

+0

@EnE_: मुझे खुशी है कि यह आपकी मदद करता है :)। आपको जवाब स्वीकार करना चाहिए, आपको यह करने का अधिकार है क्योंकि आप प्रश्न के स्वामी हैं। –

+0

ठीक है, मुझे यह मानना ​​है कि मैंने सोचा था कि मुझे टिक के बजाय ऊपर तीर दबाया जाना चाहिए। 'अनुमोदन की टिक' दुर्भाग्य से वाक्यांश की बारी की निंदा की गई थी = डी –

संबंधित मुद्दे