2015-05-08 5 views
5

तो मैं पाइथन में कुछ इंटरनेट कनेक्शन बहु-थ्रेड करने की कोशिश कर रहा हूं। मैं मल्टीप्रोसेसिंग मॉड्यूल का उपयोग कर रहा हूं इसलिए मैं "ग्लोबल इंटरप्रेटर लॉक" के आसपास जा सकता हूं। लेकिन ऐसा लगता है कि सिस्टम केवल पायथन के लिए एक खुला कनेक्शन पोर्ट देता है, या कम से कम यह केवल एक कनेक्शन को एक बार होने की अनुमति देता है। मैं जो कह रहा हूं उसका एक उदाहरण यहां दिया गया है।लिनक्स कितने नेटवर्क पोर्ट्स को पाइथन का उपयोग करने की अनुमति देता है?

* यदि आप इस आप देखेंगे कि यह समारोह पर 50 उदाहरणों शुरू होता है, लेकिन केवल एक समय में एक से चलाता चलाने है कि यह एक Linux सर्वर

from multiprocessing import Process, Queue 
import urllib 
import random 

# Generate 10,000 random urls to test and put them in the queue 
queue = Queue() 
for each in range(10000): 
    rand_num = random.randint(1000,10000) 
    url = ('http://www.' + str(rand_num) + '.com') 
    queue.put(url) 

# Main funtion for checking to see if generated url is active 
def check(q): 
    while True: 
     try: 
      url = q.get(False) 
      try: 
       request = urllib.urlopen(url) 
       del request 
       print url + ' is an active url!' 
      except: 
       print url + ' is not an active url!' 
     except: 
      if q.empty(): 
       break 

# Then start all the threads (50) 
for thread in range(50): 
    task = Process(target=check, args=(queue,)) 
    task.start() 

तो पर चल रहा है ध्यान दें। आप सोच सकते हैं कि 'ग्लोबल इंटरप्रेटर लॉक' ऐसा कर रहा है लेकिन यह नहीं है। नेटवर्क अनुरोध के बजाय फ़ंक्शन को गणितीय फ़ंक्शन में बदलने का प्रयास करें और आप देखेंगे कि सभी पचास धागे एक साथ चलते हैं।

तो क्या मुझे सॉकेट के साथ काम करना होगा? या क्या ऐसा कुछ है जो मैं कर सकता हूं जो अधिक बंदरगाहों तक पाइथन पहुंच देगा? या क्या मैं कुछ नहीं देख रहा हूं? आप क्या सोचते हैं मुझे बताओ! धन्यवाद!

* संपादित

तो मैं चीजों को अनुरोध पुस्तकालय के साथ बेहतर परीक्षण करने के लिए इस पटकथा लिखी। ऐसा लगता है कि मैंने इससे पहले इसका परीक्षण नहीं किया था।

from multiprocessing import Process, Queue 
from threading import Thread 
from Queue import Queue as Q 
import requests 
import time 

# A main timestamp 
main_time = time.time() 

# Generate 100 urls to test and put them in the queue 
queue = Queue() 
for each in range(100): 
    url = ('http://www.' + str(each) + '.com') 
    queue.put(url) 

# Timer queue 
time_queue = Queue() 

# Main funtion for checking to see if generated url is active 
def check(q, t_q): # args are queue and time_queue 
    while True: 
     try: 
      url = q.get(False) 
      # Make a timestamp 
      t = time.time() 
      try: 
       request = requests.head(url, timeout=5) 
       t = time.time() - t 
       t_q.put(t) 
       del request 
      except: 
       t = time.time() - t 
       t_q.put(t) 
     except: 
      break 

# Then start all the threads (20) 
thread_list = [] 
for thread in range(20): 
    task = Process(target=check, args=(queue, time_queue)) 
    task.start() 
    thread_list.append(task) 

# Join all the threads so the main process don't quit 
for each in thread_list: 
    each.join() 
main_time_end = time.time() 

# Put the timerQueue into a list to get the average 
time_queue_list = [] 
while True: 
    try: 
     time_queue_list.append(time_queue.get(False)) 
    except: 
     break 

# Results of the time 
average_response = sum(time_queue_list)/float(len(time_queue_list)) 
total_time = main_time_end - main_time 
line = "Multiprocessing: Average response time: %s sec. -- Total time: %s sec." % (average_response, total_time) 
print line 

# A main timestamp 
main_time = time.time() 

# Generate 100 urls to test and put them in the queue 
queue = Q() 
for each in range(100): 
    url = ('http://www.' + str(each) + '.com') 
    queue.put(url) 

# Timer queue 
time_queue = Queue() 

# Main funtion for checking to see if generated url is active 
def check(q, t_q): # args are queue and time_queue 
    while True: 
     try: 
      url = q.get(False) 
      # Make a timestamp 
      t = time.time() 
      try: 
       request = requests.head(url, timeout=5) 
       t = time.time() - t 
       t_q.put(t) 
       del request 
      except: 
       t = time.time() - t 
       t_q.put(t) 
     except: 
      break 

# Then start all the threads (20) 
thread_list = [] 
for thread in range(20): 
    task = Thread(target=check, args=(queue, time_queue)) 
    task.start() 
    thread_list.append(task) 

# Join all the threads so the main process don't quit 
for each in thread_list: 
    each.join() 
main_time_end = time.time() 

# Put the timerQueue into a list to get the average 
time_queue_list = [] 
while True: 
    try: 
     time_queue_list.append(time_queue.get(False)) 
    except: 
     break 

# Results of the time 
average_response = sum(time_queue_list)/float(len(time_queue_list)) 
total_time = main_time_end - main_time 
line = "Standard Threading: Average response time: %s sec. -- Total time: %s sec." % (average_response, total_time) 
print line 

# Do the same thing all over again but this time do each url at a time 
# A main timestamp 
main_time = time.time() 

# Generate 100 urls and test them 
timer_list = [] 
for each in range(100): 
    url = ('http://www.' + str(each) + '.com') 
    t = time.time() 
    try: 
     request = requests.head(url, timeout=5) 
     timer_list.append(time.time() - t) 
    except: 
     timer_list.append(time.time() - t) 
main_time_end = time.time() 

# Results of the time 
average_response = sum(timer_list)/float(len(timer_list)) 
total_time = main_time_end - main_time 
line = "Not using threads: Average response time: %s sec. -- Total time: %s sec." % (average_response, total_time) 
print line 

आप देख सकते हैं, यह बहुत अच्छी तरह से multithreading है (मैं मुख्य रूप से इस्तेमाल किया urllib और urllib2 था)। असल में, मेरे अधिकांश परीक्षणों से पता चलता है कि थ्रेडिंग मॉड्यूल मल्टीप्रोसेसिंग मॉड्यूल की तुलना में वास्तव में तेज़ है। (मुझे समझ में नहीं आता क्यों!) मेरे कुछ परिणाम यहां दिए गए हैं।

Multiprocessing: Average response time: 2.40511314869 sec. -- Total time: 25.6876308918 sec. 
Standard Threading: Average response time: 2.2179402256 sec. -- Total time: 24.2941861153 sec. 
Not using threads: Average response time: 2.1740363431 sec. -- Total time: 217.404567957 sec. 

यह मेरा घर नेटवर्क पर किया गया था, मेरे सर्वर पर प्रतिक्रिया समय बहुत तेजी से है। मुझे लगता है कि मेरे प्रश्न का परोक्ष रूप से उत्तर दिया गया है, क्योंकि मुझे एक और अधिक जटिल लिपि पर मेरी समस्याएं थीं। सभी सुझावों ने मुझे इसे बहुत अच्छी तरह अनुकूलित करने में मदद की। सभी को धन्यवाद!

+1

आप HTTP शेष काम करने के लिए एक अलग अजगर मॉड्यूल की कोशिश की है, हो सकता है [अनुरोध] (http: //docs.python

इस कोड मुड़ अतुल्यकालिक मैं/हे पुस्तकालय का उपयोग करता है की कोशिश करो -requests.org/en/latest/)? हम जानते हैं कि 'urllib' [थ्रेड-सुरक्षित नहीं है] (http://stackoverflow.com/a/5825531/228489), हालांकि मुझे नहीं लगता कि मल्टीप्रोसेस को प्रभावित करना चाहिए, लेकिन मैं खोजने के लिए एक अलग मॉड्यूल का प्रयास करूंगा बाहर। – amccormack

+1

आप कैसे बता सकते हैं कि केवल एक ही प्रक्रिया चल रही है? मुझे लगता है कि क्या होता है कि एक गणितीय फ़ंक्शन http अनुरोध से पूरा करने के लिए बहुत तेज़ है और यह लगता है कि रन की तरह सिंक्रोनस है, यह वास्तव में कई अनुरोध कर रहा है लेकिन मानक आउटपुट को स्पष्ट रूप से लिखने का प्रबंधन करता है क्योंकि वे धीमे होते हैं। –

+0

@ReutSharabani ठीक है, मैं इसे "htop" में देख रहा था लेकिन यह एक समय में केवल एक प्रिंट करता है। यदि यह वास्तव में कई प्रक्रियाओं को चला रहा था तो यह कई बार प्रिंट करेगा। – TysonU

उत्तर

1

यह समारोह पर 50 उदाहरणों शुरू होता है, लेकिन केवल एक समय

आप htop के परिणामों की गलत व्याख्या की है पर एक चलाता है। केवल कुछ, यदि कोई हों, तो किसी भी विशिष्ट उदाहरण पर अजगर की प्रतियां चलने योग्य होंगी। उनमें से अधिकतर नेटवर्क I/O के लिए प्रतीक्षा अवरुद्ध हो जाएंगे।

प्रक्रियाएं वास्तव में समानांतर चल रही हैं।

फ़ंक्शन को नेटवर्क अनुरोध के बजाय गणितीय फ़ंक्शन में बदलने का प्रयास करें और आप देखेंगे कि सभी पचास धागे एक साथ चलते हैं।

कार्य को गणितीय फ़ंक्शन में बदलना केवल सीपीयू-बाउंड (उदा। गणित) और आईओ-बाउंड (उदा। Urlopen) प्रक्रियाओं के बीच अंतर को दर्शाता है। पूर्व हमेशा चलने योग्य होता है, उत्तरार्द्ध शायद ही कभी चलने योग्य होता है।

यह केवल एक समय में प्रिंट करता है। यदि यह वास्तव में कई प्रक्रियाओं को चला रहा था तो यह कई बार प्रिंट करेगा।

यह एक समय में प्रिंट करता है क्योंकि आप टर्मिनल पर लाइन लिख रहे हैं। चूंकि रेखाएं अलग-अलग हैं, इसलिए आप यह बताने में सक्षम नहीं होंगे कि वे सभी एक धागे से लिखे गए हैं, या प्रत्येक बदले में एक अलग थ्रेड द्वारा लिखे गए हैं।

+0

तो मेरा सवाल यह है - क्या एक लिनक्स मशीन में एक से अधिक आईओ-बाध्य प्रक्रिया हो सकती है? मुझे पता है कि हर बंदरगाह एक प्रक्रिया तक ही सीमित है। क्या मेरा सिस्टम केवल 'urlopen' के लिए एक बंदरगाह खोल रहा है? क्या मैं किसी भी तरह से मैन्युअल रूप से प्रत्येक प्रक्रिया को नए बंदरगाह पर खोल नहीं सकता और शायद अधिक सफलता प्राप्त कर सकता हूं? – TysonU

+0

1) हां, ज़ाहिर है कि लिनक्स में कई आईओ-बाध्य प्रक्रियाएं हो सकती हैं। असल में, यह वही है जो आपको दिखा रहा है - आपकी 50 प्रक्रियाओं में से शुरू होता है, उनमें से अधिकतर आईओ की प्रतीक्षा कर रहे हैं। 2) "मुझे पता है कि हर बंदरगाह एक प्रक्रिया तक ही सीमित है।" बकवास। ऐसी कोई सीमा नहीं है। 3) "और अधिक सफलता है" - क्या, आप वास्तव में पूरा करने की कोशिश कर रहे हैं, और आपको क्या लगता है कि आप इसे पहले से पूरा नहीं कर रहे हैं? –

+0

यदि दो धागे एक ही समय में प्रिंट करने का प्रयास करते हैं तो आमतौर पर प्रति पंक्ति एकाधिक कथन प्रिंट करने में परिणाम होता है। – TysonU

0

सबसे पहले, नेटवर्क I/O समानांतर करने के लिए multiprocessing का उपयोग करना एक ओवरकिल है। अंतर्निहित threading या हल्के हेडलेट लाइब्रेरी जैसे gevent का उपयोग करना कम ओवरहेड के साथ एक बेहतर विकल्प है। जीआईएल के पास आईओ कॉल को अवरुद्ध करने के साथ कुछ लेना देना नहीं है, इसलिए आपको इसके बारे में चिंता करने की ज़रूरत नहीं है।

दूसरा, यह देखने का एक आसान तरीका है कि यदि आप स्टेडआउट की निगरानी कर रहे हैं तो समानांतर में आपके उपप्रोसेसेस/धागे/ग्रीनलेट चल रहे हैं, तो उपप्रोसेसेस/थ्रेड्स/ग्रीनलेट्स के ठीक बाद, फ़ंक्शन की शुरुआत में कुछ प्रिंट करना है । उदाहरण के लिए, तो

def check(q): 
    print 'Start checking urls!' 
    while True: 
     ... 

की तरह अपने check() समारोह को संशोधित यदि आपका कोड सही है, तो आप कई Start checking urls! लाइनों url + ' is [not] an active url!' के किसी भी मुद्रित से पहले बाहर मुद्रित देखना चाहिए। यह मेरी मशीन पर काम करता है, इसलिए ऐसा लगता है कि आपका कोड सही है।

+0

मुझे लगता है कि यहां सवाल नहीं है * "समानांतर में जांच करता है?" *। यह * * urllib.urlopen समानांतर में चलाता है? "*। –

+0

यदि 'चेक()' समानांतर में चलता है, तो 'urllib.urlopen()' समानांतर में चलाएगा (जब तक कि उसकी फ़ाइल डिस्क्रिप्टर सेटिंग्स में कोई गंभीरता से गलत न हो, जो मुझे संदेह है)। यदि आप सबूत चाहते हैं, तो अनुक्रमिक तरीके से 'चेक()' चलाएं (यानी, 'चेक में थ्रेड के लिए' ... 'चेक (कतार)' के साथ ब्लॉक करें और आप देखेंगे कि यूआरएल जांच बहुत अधिक लेती है समय। – oxymor0n

+0

मुझे लगता है कि यह ** समानांतर में चलता है, मैं बस इतना कह रहा हूं कि आपका उत्तर प्रश्न को अनदेखा करता है। उन्होंने स्पष्ट रूप से कहा कि गणितीय गणना उनके लिए समानांतर में चलती है। –

0

ऐसा प्रतीत होता है कि आपकी समस्या वास्तव में gethostbyname(3) के धारावाहिक व्यवहार के साथ है। इस पर this SO thread में चर्चा की गई है।

import random 
import sys 
from twisted.internet import reactor 
from twisted.internet import defer 
from twisted.internet.task import cooperate 
from twisted.web import client 

SIMULTANEOUS_CONNECTIONS = 25 
# Generate 10,000 random urls to test and put them in the queue 
pages = [] 
for each in range(10000): 
    rand_num = random.randint(1000,10000) 
    url = ('http://www.' + str(rand_num) + '.com') 
    pages.append(url) 

# Main function for checking to see if generated url is active 
def check(page): 
    def successback(data, page): 
     print "{} is an active URL!".format(page) 

    def errback(err, page): 
     print "{} is not an active URL!; errmsg:{}".format(page, err.value) 

    d = client.getPage(page, timeout=3) # timeout in seconds 
    d.addCallback(successback, page) 
    d.addErrback(errback, page) 
    return d 

def generate_checks(pages): 
    for i in xrange(0, len(pages)): 
     page = pages[i] 
     #print "Page no. {}".format(i) 
     yield check(page) 

def work(pages): 
    print "started work(): {}".format(len(pages)) 
    batch_size = len(pages)/SIMULTANEOUS_CONNECTIONS 
    for i in xrange(0, len(pages), batch_size): 
     task = cooperate(generate_checks(pages[i:i+batch_size])) 

print "starting..." 
reactor.callWhenRunning(work, pages) 
reactor.run() 
संबंधित मुद्दे

 संबंधित मुद्दे