मेरे द्वारा बनाई गई एक और पोस्ट के समान, यह पोस्ट करता है और एक नया प्रश्न बनाता है।डीबी कनेक्शन बनाएं और एकाधिक प्रक्रियाओं (मल्टीप्रोसेसिंग) पर बनाए रखें
रिकैप: मुझे एक स्थानिक डेटाबेस में प्रत्येक रिकॉर्ड को अपडेट करने की आवश्यकता है जिसमें मेरे पास पॉइंटगोन के ओवरले डेटा सेट के बिंदुओं का डेटा सेट है। प्रत्येक बिंदु सुविधा के लिए मैं इसे बहुभुज सुविधा से जोड़ने के लिए एक कुंजी असाइन करना चाहता हूं जो यह भीतर है। तो यदि मेरा बिंदु 'न्यूयॉर्क शहर' बहुभुज संयुक्त राज्य अमेरिका के भीतर है और संयुक्त राज्य अमेरिका बहुभुज 'जीआईडी = 1' के लिए मैं अपने बिंदु न्यूयॉर्क शहर के लिए 'gid_fkey = 1' असाइन करूंगा।
ठीक है तो यह मल्टीप्रोसेसिंग का उपयोग करके हासिल किया गया है। मैंने इसका उपयोग करके गति में 150% की वृद्धि देखी है, इसलिए यह काम करता है। लेकिन मुझे लगता है कि अनावश्यक ओवरहेड का एक गुच्छा है क्योंकि प्रत्येक रिकॉर्ड के लिए एक डीबी कनेक्शन की आवश्यकता होती है।
import multiprocessing, time, psycopg2
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
print 'Tasks Complete'
self.task_queue.task_done()
break
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a):
self.a = a
def __call__(self):
pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConn.set_isolation_level(0)
pyCursor1 = pyConn.cursor()
procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)
pyCursor1.execute(procQuery)
print 'What is self?'
print self.a
return self.a
def __str__(self):
return 'ARC'
def run(self):
print 'IN'
if __name__ == '__main__':
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
for w in consumers:
w.start()
pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConnX.set_isolation_level(0)
pyCursorX = pyConnX.cursor()
pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')
temp = pyCursorX.fetchall()
num_job = temp[0]
num_jobs = num_job[0]
pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
cityIdListTuple = pyCursorX.fetchall()
cityIdListList = []
for x in cityIdListTuple:
cityIdList.append(x[0])
for i in xrange(num_jobs):
tasks.put(Task(cityIdList[i - 1]))
for i in xrange(num_consumers):
tasks.put(None)
while num_jobs:
result = results.get()
print result
num_jobs -= 1
यह कनेक्शन प्रति 0.3 और 1.5 सेकंड के बीच होने की के रूप में मैं 'समय' मॉड्यूल के साथ यह उपाय है दिखता है:
तो यहाँ कोड है।
क्या प्रति प्रक्रिया डीबी कनेक्शन बनाने का कोई तरीका है और फिर शहर_आईडी जानकारी को एक चर के रूप में उपयोग करें जिसे मैं इस खुले में कर्सर के लिए एक क्वेरी में फ़ीड कर सकता हूं? इस तरह मैं डीबी कनेक्शन के साथ प्रत्येक चार प्रक्रियाओं को कहता हूं और फिर किसी भी तरह से प्रक्रिया करने के लिए मुझे शहर_आईडी छोड़ देता हूं।
मेट है कि एक का इलाज काम किया। आपको स्वीकृति का निशान देने के लिए कुडोस नहीं हैं लेकिन वह कोड बिल्कुल जादू था। निरंतर डीबी कनेक्शन से छुटकारा पाने से गति 50% तक आसानी से बढ़ गई है। कुछ मामलों में शायद 100% के करीब। एक बार फिर धन्यवाद। –
@EnE_: मुझे खुशी है कि यह आपकी मदद करता है :)। आपको जवाब स्वीकार करना चाहिए, आपको यह करने का अधिकार है क्योंकि आप प्रश्न के स्वामी हैं। –
ठीक है, मुझे यह मानना है कि मैंने सोचा था कि मुझे टिक के बजाय ऊपर तीर दबाया जाना चाहिए। 'अनुमोदन की टिक' दुर्भाग्य से वाक्यांश की बारी की निंदा की गई थी = डी –