12

मैं मार्गों को सबमिट करने और ड्राइव-टाइम वापस पाने के लिए HTTP के माध्यम से स्थानीय सर्वर (ओएसआरएम) से कनेक्ट कर रहा हूं। मैंने देखा है कि I/O थ्रेडिंग से धीमा है क्योंकि ऐसा लगता है कि गणना के लिए प्रतीक्षा अवधि जेएसओएन आउटपुट को अनुरोध करने और संसाधित करने के समय से छोटी है (मुझे लगता है कि जब सर्वर कुछ समय लेता है तो I/O बेहतर होता है अपने अनुरोध को संसाधित करें -> आप नहीं चाहते कि यह अवरुद्ध हो क्योंकि आपको प्रतीक्षा करनी है, यह मेरा मामला नहीं है)। थ्रेडिंग ग्लोबल इंटरप्रेटर लॉक से पीड़ित है और इसलिए यह दिखाई देता है (और नीचे सबूत) कि मेरा सबसे तेज़ विकल्प मल्टीप्रोसेसिंग का उपयोग करना है।पायथन अनुरोध - धागे/प्रक्रिया बनाम आईओ

मल्टीप्रोसेसिंग के साथ समस्या यह है कि यह इतना तेज़ है कि यह मेरे सॉकेट को समाप्त करता है और मुझे एक त्रुटि मिलती है (अनुरोध हर बार एक नया कनेक्शन जारी करता है)। मैं (सीरियल में) अनुरोधों का उपयोग कर सकता हूं। सत्र() ऑब्जेक्ट को जिंदा रखने के लिए ऑब्जेक्ट, हालांकि मैं इसे समानांतर में काम नहीं कर सकता (प्रत्येक प्रक्रिया में इसका अपना सत्र होता है)।

निकटतम कोड मैं इस समय काम करने के लिए है इस बहु कोड है:

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count()) 

def ReqOsrm(url_input): 
    ul, qid = url_input  
    try: 
     response = conn_pool.request('GET', ul) 
     json_geocode = json.loads(response.data.decode('utf-8')) 
     status = int(json_geocode['status']) 
     if status == 200: 
      tot_time_s = json_geocode['route_summary']['total_time'] 
      tot_dist_m = json_geocode['route_summary']['total_distance'] 
      used_from, used_to = json_geocode['via_points'] 
      out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
      return out 
     else: 
      print("Done but no route: %d %s" % (qid, req_url)) 
      return [qid, 999, 0, 0, 0, 0, 0, 0] 
    except Exception as err: 
     print("%s: %d %s" % (err, qid, req_url)) 
     return [qid, 999, 0, 0, 0, 0, 0, 0] 

# run: 
pool = Pool(cpu_count()) 
calc_routes = pool.map(ReqOsrm, url_routes) 
pool.close() 
pool.join() 

हालांकि, मैं नहीं मिल सकता है HTTPConnectionPool ठीक से काम करने और यह नया सॉकेट हर बार (मुझे लगता है कि) बनाता है और (जल्दी से जल्दी)

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))


मेरा लक्ष्य एक OSRM-routing server से दूरी की गणना मैं स्थानीय रूप से चला रहा हूँ प्राप्त करने के लिए है: तो मुझे त्रुटि देता है।

मेरे पास दो भागों में एक प्रश्न है - मूल रूप से मैं मल्टीप्रोसेसिंग.पूल() को बेहतर कोड (उचित असिंक्रोनस फ़ंक्शंस) का उपयोग करके कुछ कोड बदलने की कोशिश कर रहा हूं - ताकि निष्पादन कभी नहीं टूटता और यह जितनी जल्दी हो सके चलता है)।

मेरे पास जो मुद्दा है, वह यह है कि मैं जो कुछ भी कोशिश करता हूं वह मल्टीप्रोसेसिंग से धीमा लगता है (मैंने जो कुछ भी किया है उसके नीचे कई उदाहरण प्रस्तुत करते हैं)।

कुछ संभावित तरीके हैं: gevents, grequests, बवंडर, अनुरोध-वायदा, asyncio, आदि

ए - Multiprocessing.Pool()

मैं शुरू में कुछ इस तरह के साथ शुरू किया:

def ReqOsrm(url_input): 
    req_url, query_id = url_input 
    try_c = 0 
    #print(req_url) 
    while try_c < 5: 
     try: 
      response = requests.get(req_url) 
      json_geocode = response.json() 
      status = int(json_geocode['status']) 
      # Found route between points 
      if status == 200: 
      .... 

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes) 

मैं कहाँ एक स्थानीय सर्वर से कनेक्ट किया गया था (स्थानीय होस्ट, पोर्ट: 5005) जो 8 धागे और supports parallel execution पर शुरू किया गया था।

थोड़ी सी खोज के बाद मुझे एहसास हुआ कि मुझे जो त्रुटि मिल रही थी क्योंकि अनुरोध opening a new connection/socket for each-request था। तो यह वास्तव में थोड़ी देर के बाद बहुत तेज़ और थकाऊ सॉकेट था। ऐसा लगता है कि इसका समाधान करने का तरीका अनुरोधों का उपयोग करना है। सत्र() - हालांकि मैं इसे मल्टीप्रोसेसिंग के साथ काम नहीं कर पाया (जहां प्रत्येक प्रक्रिया का अपना सत्र होता है)।

प्रश्न 1.

कंप्यूटर से कुछ पर इस चलाता है ठीक है, उदा:

enter image description here

बाद के खिलाफ तुलना करने के लिए: 45% सर्वर के उपयोग और प्रति सेकंड

हालांकि 1700 अनुरोध, कुछ पर ऐसा नहीं होता है और मैं पूरी तरह से समझ में नहीं आता क्यों:

HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))

मेरा अनुमान यह होगा कि, जब अनुरोध उपयोग में होता है तो सॉकेट लॉक करता है - कभी-कभी सर्वर पुराने अनुरोध का जवाब देने में बहुत धीमा होता है और एक नया उत्पन्न होता है। सर्वर कतार में समर्थन करता है, हालांकि कतार में जोड़ने के बजाय अनुरोध त्रुटि नहीं है मुझे त्रुटि मिलती है?

प्रश्न 2.

मैंने पाया:

Blocking Or Non-Blocking?

With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.

If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.

Two excellent examples are grequests and requests-futures.

बी - अनुरोध-वायदा

अतुल्यकालिक अनुरोध तो मैं करने की कोशिश की उपयोग करने के लिए मेरे कोड को फिर से लिखने को यह मैं जरूरत से निपटने के लिए नीचे का उपयोग कर:

from requests_futures.sessions import FuturesSession 
from concurrent.futures import ThreadPoolExecutor, as_completed 

और मुख्य कोड (वैसे मैं सभी धागे का उपयोग करने के विकल्प के साथ अपने सर्वर शुरू):

calc_routes = [] 
futures = {} 
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session: 
    # Submit requests and process in background 
    for i in range(len(url_routes)): 
     url_in, qid = url_routes[i] # url |query-id 
     future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp)) 
     futures[future] = qid 
    # Process the futures as they become complete 
    for future in as_completed(futures): 
     r = future.result() 
     try: 
      row = [futures[future]] + r.data 
     except Exception as err: 
      print('No route') 
      row = [futures[future], 999, 0, 0, 0, 0, 0, 0] 
     calc_routes.append(row) 

मेरी समारोह (ReqOsrm) जहां अब पुनः लिखा जाता है के रूप में:

def ReqOsrm(sess, resp): 
    json_geocode = resp.json() 
    status = int(json_geocode['status']) 
    # Found route between points 
    if status == 200: 
     tot_time_s = json_geocode['route_summary']['total_time'] 
     tot_dist_m = json_geocode['route_summary']['total_distance'] 
     used_from = json_geocode['via_points'][0] 
     used_to = json_geocode['via_points'][1] 
     out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
    # Cannot find route between points (code errors as 999) 
    else: 
     out = [999, 0, 0, 0, 0, 0, 0] 
    resp.data = out 

हालांकि, यह कोड मल्टीप्रोसेसिंग की तुलना में धीमी है! इससे पहले कि मैं लगभग 1700 अनुरोध प्राप्त कर रहा था, अब मुझे 600 सेकंड मिल रहा है। मुझे लगता है कि ऐसा इसलिए है क्योंकि मेरे पास पूर्ण CPU उपयोग नहीं है, हालांकि मुझे यकीन नहीं है कि इसे कैसे बढ़ाया जाए?

enter image description here

सी - थ्रेड

मैं किसी अन्य विधि (creating threads) की कोशिश की - लेकिन फिर से यकीन है कि कैसे इस CPU उपयोग को अधिकतम करने के लिए नहीं था (आदर्श मैं अपने सर्वर 50 का उपयोग करते हुए देखना चाहते हैं %, नहीं?):

def doWork(): 
    while True: 
     url,qid = q.get() 
     status, resp = getReq(url) 
     processReq(status, resp, qid) 
     q.task_done() 

def getReq(url): 
    try: 
     resp = requests.get(url) 
     return resp.status_code, resp 
    except: 
     return 999, None 

def processReq(status, resp, qid): 
    try: 
     json_geocode = resp.json() 
     # Found route between points 
     if status == 200: 
      tot_time_s = json_geocode['route_summary']['total_time'] 
      tot_dist_m = json_geocode['route_summary']['total_distance'] 
      used_from = json_geocode['via_points'][0] 
      used_to = json_geocode['via_points'][1] 
      out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
     else: 
      print("Done but no route") 
      out = [qid, 999, 0, 0, 0, 0, 0, 0] 
    except Exception as err: 
     print("Error: %s" % err) 
     out = [qid, 999, 0, 0, 0, 0, 0, 0] 
    qres.put(out) 
    return 

#Run: 
concurrent = 1000 
qres = Queue() 
q = Queue(concurrent) 

for i in range(concurrent): 
    t = Thread(target=doWork) 
    t.daemon = True 
    t.start() 
try: 
    for url in url_routes: 
     q.put(url) 
     q.join() 
    except Exception: 
     pass 

# Get results 
calc_routes = [qres.get() for _ in range(len(url_routes))] 

इस विधि requests_futures मुझे लगता है कि तुलना में तेजी है, लेकिन मैं कितने सूत्र इस अधिकतम करने के लिए सेट करने के लिए पता नहीं है -

enter image description here

डी - (बवंडर काम नहीं कर रहा)

अब मैं टर्ननाडो की कोशिश कर रहा हूं - हालांकि मैं इसे कोड कोड -1073741819 के साथ तोड़ने में काफी काम नहीं कर सकता अगर मैं कर्ल का उपयोग करता हूं - अगर मैं सरल_httpclient का उपयोग करता हूं तो यह काम करता है लेकिन फिर मुझे टाइमआउट त्रुटियां मिलती हैं:

ERROR:tornado.application:Multiple exceptions in yield list Traceback (most recent call last): File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback result_list.append(f.result()) File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result raise_exc_info(self._exc_info) File "", line 3, in raise_exc_info tornado.httpclient.HTTPError: HTTP 599: Timeout

def handle_req(r): 
    try: 
     json_geocode = json_decode(r) 
     status = int(json_geocode['status']) 
     tot_time_s = json_geocode['route_summary']['total_time'] 
     tot_dist_m = json_geocode['route_summary']['total_distance'] 
     used_from = json_geocode['via_points'][0] 
     used_to = json_geocode['via_points'][1] 
     out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
     print(out) 
    except Exception as err: 
     print(err) 
     out = [999, 0, 0, 0, 0, 0, 0] 
    return out 

# Configure 
# For some reason curl_httpclient crashes my computer 
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10) 

@gen.coroutine 
def run_experiment(urls): 
    http_client = AsyncHTTPClient() 
    responses = yield [http_client.fetch(url) for url, qid in urls] 
    responses_out = [handle_req(r.body) for r in responses] 
    raise gen.Return(value=responses_out) 

# Initialise 
_ioloop = ioloop.IOLoop.instance() 
run_func = partial(run_experiment, url_routes) 
calc_routes = _ioloop.run_sync(run_func) 

ई - asyncio/aiohttp

एक और दृष्टिकोण की कोशिश करने के (बहुत अच्छा होगा, हालांकि काम कर बवंडर प्राप्त करने के लिए) asyncio और aiohttp का उपयोग कर निर्णय लिया।

import asyncio 
import aiohttp 

def handle_req(data, qid): 
    json_geocode = json.loads(data.decode('utf-8')) 
    status = int(json_geocode['status']) 
    if status == 200: 
     tot_time_s = json_geocode['route_summary']['total_time'] 
     tot_dist_m = json_geocode['route_summary']['total_distance'] 
     used_from = json_geocode['via_points'][0] 
     used_to = json_geocode['via_points'][1] 
     out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
    else: 
     print("Done, but not route for {0} - status: {1}".format(qid, status)) 
     out = [qid, 999, 0, 0, 0, 0, 0, 0] 
    return out 

def chunked_http_client(num_chunks): 
    # Use semaphore to limit number of requests 
    semaphore = asyncio.Semaphore(num_chunks) 
    @asyncio.coroutine 
    # Return co-routine that will download files asynchronously and respect 
    # locking fo semaphore 
    def http_get(url, qid): 
     nonlocal semaphore 
     with (yield from semaphore): 
      response = yield from aiohttp.request('GET', url) 
      body = yield from response.content.read() 
      yield from response.wait_for_close() 
     return body, qid 
    return http_get 

def run_experiment(urls): 
    http_client = chunked_http_client(500) 
    # http_client returns futures 
    # save all the futures to a list 
    tasks = [http_client(url, qid) for url, qid in urls] 
    response = [] 
    # wait for futures to be ready then iterate over them 
    for future in asyncio.as_completed(tasks): 
     data, qid = yield from future 
     try: 
      out = handle_req(data, qid) 
     except Exception as err: 
      print("Error for {0} - {1}".format(qid,err)) 
      out = [qid, 999, 0, 0, 0, 0, 0, 0] 
     response.append(out) 
    return response 

# Run: 
loop = asyncio.get_event_loop() 
calc_routes = loop.run_until_complete(run_experiment(url_routes)) 

यह ठीक काम करता है, हालांकि अभी भी मल्टीप्रोसेसिंग से धीमा है!

enter image description here

+1

एक और दृष्टिकोण अन्य है एक का उपयोग करने के: इस धागे और शायद सबसे तेजी से उपयोग करने की तुलना में तेजी से जब बहु के साथ संयोजन में उपयोग किया हो सकता है (वर्तमान में यह केवल 1 कोर का उपयोग करें) घटना लूप आप कॉलबैक के साथ अनुरोध पंजीकृत कर सकते हैं और प्रतिक्रिया लौटने पर ईवेंट लूप को संभालने का इंतजार कर सकते हैं – dm03514

+0

@ dm03514 इसके लिए धन्यवाद! हालांकि, क्या यह मेरे पास नहीं है जब मैं अपने अनुरोध करता हूं-वायदा उदाहरण? 'भविष्य = session.get (url_in, background_callback = lambda sess, resp: ReqOsrm (sess, resp), – mptevsion

+1

मैंने कभी भी अनुरोध फ़्यूचर का उपयोग नहीं किया है, लेकिन मुझे लगता है कि यह अभी भी थ्रेड पूल से संबंधित है, इवेंट लूप एक नया होना चाहिए अनुरोध मॉडल सभी एक साथ, और केवल एक धागा का पर्दाफाश करेगा, इसलिए आपको चिंता करने की ज़रूरत नहीं है कि काम करने के लिए कितने धागे कॉन्फ़िगर करना है :) python में stdlibrary में एक है https://pypi.python.org/pypi/aiohttp , जिसे मैंने कभी भी उपयोग नहीं किया है, लेकिन अपेक्षाकृत सीधा दिखता है, टॉरनाडो ओएस इवेंट लाइब्रेरीज़ पर बनाया गया एक ढांचा है जिसमें सरल एपीआई है। http://tornadokevinlee.readthedocs.org/en/latest/httpclient.html – dm03514

उत्तर

1

प्रश्न के शीर्ष पर अपने मल्टीप्रोसेसिंग कोड को देखते हुए। ऐसा लगता है कि प्रत्येक बार ReqOsrm कहा जाता है कि HttpConnectionPool() कहा जा रहा है। इस प्रकार प्रत्येक यूआरएल के लिए एक नया पूल बनाया जाता है।इसके बजाय, प्रत्येक प्रक्रिया के लिए एकल पूल बनाने के लिए initializer और args पैरामीटर का उपयोग करें।

conn_pool = None 

def makePool(host, port): 
    global conn_pool 
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1) 

def ReqOsrm(url_input): 
    ul, qid = url_input 

    try: 
     response = conn_pool.request('GET', ul) 
     json_geocode = json.loads(response.data.decode('utf-8')) 
     status = int(json_geocode['status']) 
     if status == 200: 
      tot_time_s = json_geocode['route_summary']['total_time'] 
      tot_dist_m = json_geocode['route_summary']['total_distance'] 
      used_from, used_to = json_geocode['via_points'] 
      out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]] 
      return out 

     else: 
      print("Done but no route: %d %s" % (qid, req_url)) 
      return [qid, 999, 0, 0, 0, 0, 0, 0] 

    except Exception as err: 
     print("%s: %d %s" % (err, qid, req_url)) 
     return [qid, 999, 0, 0, 0, 0, 0, 0] 

if __name__ == "__main__": 
    # run: 
    pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005)) 
    calc_routes = pool.map(ReqOsrm, url_routes) 
    pool.close() 
    pool.join() 

अनुरोध-वायदा संस्करण में इंडेंटेशन त्रुटि दिखाई देती है। लूप for future in as_completed(futures): बाहरी लूप for i in range(len(url_routes)): के तहत इंडेंट किया गया है। तो बाहरी लूप में एक अनुरोध किया जाता है और फिर आंतरिक लूप उस भविष्य के लिए बाहरी लूप के अगले पुनरावृत्ति से पहले लौटने की प्रतीक्षा करता है। यह अनुरोध समानांतर के बजाय क्रमशः चलाता है।

मुझे लगता है कि इस प्रकार कोड होना चाहिए:

calc_routes = [] 
futures = {} 
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session: 
    # Submit all the requests and process in background 
    for i in range(len(url_routes)): 
     url_in, qid = url_routes[i] # url |query-id 
     future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp)) 
     futures[future] = qid 

    # this was indented under the code in section B of the question 
    # process the futures as they become copmlete 
    for future in as_completed(futures): 
     r = future.result() 
     try: 
      row = [futures[future]] + r.data 

     except Exception as err: 
      print('No route') 
      row = [futures[future], 999, 0, 0, 0, 0, 0, 0] 
     print(row) 
     calc_routes.append(row) 
2

प्रश्न 1

आप त्रुटि मिलती है, क्योंकि इस दृष्टिकोण:

def ReqOsrm(url_input): 
    req_url, query_id = url_input 
    try_c = 0 
    #print(req_url) 
    while try_c < 5: 
     try: 
      response = requests.get(req_url) 
      json_geocode = response.json() 
      status = int(json_geocode['status']) 
      # Found route between points 
      if status == 200: 
      .... 

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes) 

प्रत्येक अनुरोध किया गया URL के लिए एक नया TCP कनेक्शन बनाता है और कुछ बिंदु पर यह सिर्फ कारण विफल रहता है प्रणाली मुक्त स्थानीय बंदरगाहों से बाहर है। पुष्टि करने के लिए कि आप netstat चला सकते हैं, जबकि अपने कोड को क्रियान्वित किया जाता है:

netstat -a -n | find /c "localhost:5005" 

यह आपको सर्वर से कनेक्शन के एक नंबर दे देंगे।

इसके अलावा, 1700 आरपीएस तक पहुंचने से इस दृष्टिकोण के लिए काफी अवास्तविक दिखता है, क्योंकि requests.get काफी महंगा संचालन है और यह संभावना नहीं है कि आप इस तरह से 50 आरपीएस भी प्राप्त कर सकें। इसलिए, आपको शायद अपनी आरपीएस गणनाओं को दोबारा जांचना होगा।

त्रुटि आप स्क्रैच से कनेक्शन बनाने के बजाय सत्र उपयोग करने की आवश्यकता से बचने के लिए:

import multiprocessing 
import requests 
import time 


class Worker(multiprocessing.Process): 
    def __init__(self, qin, qout, *args, **kwargs): 
     super(Worker, self).__init__(*args, **kwargs) 
     self.qin = qin 
     self.qout = qout 

    def run(self): 
     s = requests.session() 
     while not self.qin.empty(): 
      result = s.get(self.qin.get()) 
      self.qout.put(result) 
      self.qin.task_done() 

if __name__ == '__main__': 
    start = time.time() 

    qin = multiprocessing.JoinableQueue() 
    [qin.put('http://localhost:8080/') for _ in range(10000)] 

    qout = multiprocessing.Queue() 

    [Worker(qin, qout).start() for _ in range(multiprocessing.cpu_count())] 

    qin.join() 

    result = [] 
    while not qout.empty(): 
     result.append(qout.get()) 

    print time.time() - start 
    print result 

प्रश्न 2

आप मिल धागे या async के साथ उच्च आर पी एस जब तक कि मैं दृष्टिकोण नहीं होगा/ओ गणनाओं से अधिक समय लेता है (उदाहरण के लिए उच्च नेटवर्क विलंबता, बड़े प्रतिक्रियाएं, आदि), क्योंकि एक ही पायथन प्रक्रिया में चलने के बाद से जीआईएल द्वारा धागे प्रभावित होते हैं और एसिंक्रोनस libs को लंबी चलती गणनाओं से अवरुद्ध किया जा सकता है।

हालांकि धागे या एसिंक libs प्रदर्शन में सुधार कर सकते हैं, कई प्रक्रियाओं में एक ही थ्रेडेड या असिंक्रोनस कोड चलाने से आपको और भी अधिक प्रदर्शन मिल जाएगा।

5

धन्यवाद मदद के लिए हर किसी को। मैं अपने निष्कर्ष पोस्ट करना चाहता था:

चूंकि मेरे HTTP अनुरोध स्थानीय लोगों के लिए हैं जो तत्काल अनुरोध को संसाधित करते हैं, यह मेरे लिए एसिंक दृष्टिकोण का उपयोग करने के लिए अधिक समझ नहीं लेता है (ज्यादातर मामलों की तुलना में जब इंटरनेट पर अनुरोध भेजे जाते हैं)। मेरे लिए महंगा कारक वास्तव में अनुरोध भेज रहा है और प्रतिक्रिया को संसाधित कर रहा है, जिसका अर्थ है कि मुझे कई प्रक्रियाओं (जीआईएल से पीड़ित) का उपयोग करके बहुत बेहतर गति मिलती है। मुझे गति बढ़ाने के लिए सत्रों का भी उपयोग करना चाहिए (समान सर्वर से कनेक्शन को बंद करने और फिर से खोलने की आवश्यकता नहीं है) और पोर्ट-थकावट को रोकने में मदद करें।

यहाँ सभी तरीकों (कार्य) की कोशिश की उदाहरण आर पी एस के साथ कर रहे हैं:

सीरियल

एस 1। सीरियल जीईटी अनुरोध (कोई सत्र नहीं) -> 215 आरपीएस

def ReqOsrm(data): 
    url, qid = data 
    try: 
     response = requests.get(url) 
     json_geocode = json.loads(response.content.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     return [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     return [qid, 999, 0, 0] 
# Run:  
calc_routes = [ReqOsrm(x) for x in url_routes] 

एस 2। सीरियल जीईटी अनुरोध (अनुरोध। सत्र()) -> 335 आरपीएस

session = requests.Session() 
def ReqOsrm(data): 
    url, qid = data 
    try: 
     response = session.get(url) 
     json_geocode = json.loads(response.content.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     return [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     return [qid, 999, 0, 0] 
# Run:  
calc_routes = [ReqOsrm(x) for x in url_routes] 

एस 3। सीरियल GET अनुरोध (urllib3.HTTPConnectionPool) -> 545 आर पी एस

conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1) 
def ReqOsrm(data): 
    url, qid = data 
    try: 
     response = conn_pool.request('GET', url) 
     json_geocode = json.loads(response.data.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     return [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     return [qid, 999, 0, 0] 
# Run:  
calc_routes = [ReqOsrm(x) for x in url_routes] 

Async आईओ

ए 4। Aynhttp के साथ AsyncIO -> 450 आरपीएस

import asyncio 
import aiohttp 
concurrent = 100 
def handle_req(data, qid): 
    json_geocode = json.loads(data.decode('utf-8')) 
    tot_time_s = json_geocode['paths'][0]['time'] 
    tot_dist_m = json_geocode['paths'][0]['distance'] 
    return [qid, 200, tot_time_s, tot_dist_m] 
def chunked_http_client(num_chunks): 
    # Use semaphore to limit number of requests 
    semaphore = asyncio.Semaphore(num_chunks) 
    @asyncio.coroutine 
    # Return co-routine that will download files asynchronously and respect 
    # locking fo semaphore 
    def http_get(url, qid): 
     nonlocal semaphore 
     with (yield from semaphore): 
      with aiohttp.ClientSession() as session: 
       response = yield from session.get(url) 
       body = yield from response.content.read() 
       yield from response.wait_for_close() 
     return body, qid 
    return http_get 
def run_experiment(urls): 
    http_client = chunked_http_client(num_chunks=concurrent) 
    # http_client returns futures, save all the futures to a list 
    tasks = [http_client(url, qid) for url, qid in urls] 
    response = [] 
    # wait for futures to be ready then iterate over them 
    for future in asyncio.as_completed(tasks): 
     data, qid = yield from future 
     try: 
      out = handle_req(data, qid) 
     except Exception as err: 
      print("Error for {0} - {1}".format(qid,err)) 
      out = [qid, 999, 0, 0] 
     response.append(out) 
    return response 
# Run: 
loop = asyncio.get_event_loop() 
calc_routes = loop.run_until_complete(run_experiment(url_routes)) 

ए 5। सत्रों के बिना थ्रेडिंग -> 330 आरपीएस

from threading import Thread 
from queue import Queue 
concurrent = 100 
def doWork(): 
    while True: 
     url,qid = q.get() 
     status, resp = getReq(url) 
     processReq(status, resp, qid) 
     q.task_done() 
def getReq(url): 
    try: 
     resp = requests.get(url) 
     return resp.status_code, resp 
    except: 
     return 999, None 
def processReq(status, resp, qid): 
    try: 
     json_geocode = json.loads(resp.content.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     out = [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     print("Error: ", err, qid, url) 
     out = [qid, 999, 0, 0] 
    qres.put(out) 
    return 
#Run: 
qres = Queue() 
q = Queue(concurrent) 
for i in range(concurrent): 
    t = Thread(target=doWork) 
    t.daemon = True 
    t.start() 
for url in url_routes: 
    q.put(url) 
q.join() 
# Get results 
calc_routes = [qres.get() for _ in range(len(url_routes))] 

ए 6। HTTPConnectionPool के साथ थ्रेडिंग -> 1550 आरपीएस

from threading import Thread 
from queue import Queue 
from urllib3 import HTTPConnectionPool 
concurrent = 100 
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent) 
def doWork(): 
    while True: 
     url,qid = q.get() 
     status, resp = getReq(url) 
     processReq(status, resp, qid) 
     q.task_done() 
def getReq(url): 
    try: 
     resp = conn_pool.request('GET', url) 
     return resp.status, resp 
    except: 
     return 999, None 
def processReq(status, resp, qid): 
    try: 
     json_geocode = json.loads(resp.data.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     out = [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     print("Error: ", err, qid, url) 
     out = [qid, 999, 0, 0] 
    qres.put(out) 
    return 
#Run: 
qres = Queue() 
q = Queue(concurrent) 
for i in range(concurrent): 
    t = Thread(target=doWork) 
    t.daemon = True 
    t.start() 
for url in url_routes: 
    q.put(url) 
q.join() 
# Get results 
calc_routes = [qres.get() for _ in range(len(url_routes))] 

ए 7। अनुरोध-वायदा -> 520 आर पी एस

from requests_futures.sessions import FuturesSession 
from concurrent.futures import ThreadPoolExecutor, as_completed 
concurrent = 100 
def ReqOsrm(sess, resp): 
    try: 
     json_geocode = resp.json() 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     out = [200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     print("Error: ", err) 
     out = [999, 0, 0] 
    resp.data = out 
#Run: 
calc_routes = [] 
futures = {} 
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session: 
    # Submit requests and process in background 
    for i in range(len(url_routes)): 
     url_in, qid = url_routes[i] # url |query-id 
     future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp)) 
     futures[future] = qid 
    # Process the futures as they become complete 
    for future in as_completed(futures): 
     r = future.result() 
     try: 
      row = [futures[future]] + r.data 
     except Exception as err: 
      print('No route') 
      row = [futures[future], 999, 0, 0] 
     calc_routes.append(row) 

कई प्रक्रियाओं

P8। multiprocessing.worker + queue + request.session() -> 1058 आरपीएस

from multiprocessing import * 
class Worker(Process): 
    def __init__(self, qin, qout, *args, **kwargs): 
     super(Worker, self).__init__(*args, **kwargs) 
     self.qin = qin 
     self.qout = qout 
    def run(self): 
     s = requests.session() 
     while not self.qin.empty(): 
      url, qid = self.qin.get() 
      data = s.get(url) 
      self.qout.put(ReqOsrm(data, qid)) 
      self.qin.task_done() 
def ReqOsrm(resp, qid): 
    try: 
     json_geocode = json.loads(resp.content.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     return [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     print("Error: ", err, qid) 
     return [qid, 999, 0, 0] 
# Run: 
qout = Queue() 
qin = JoinableQueue() 
[qin.put(url_q) for url_q in url_routes] 
[Worker(qin, qout).start() for _ in range(cpu_count())] 
qin.join() 
calc_routes = [] 
while not qout.empty(): 
    calc_routes.append(qout.get()) 

पी 9। बहु।कार्यकर्ता + कतार + HTTPConnectionPool() -> 1230 आरपीएस

पी 10। वी 2 बहु प्रक्रमण (वास्तव में यकीन नहीं है कि यह कैसे अलग है) -> 1350 आर पी एस

conn_pool = None 
def makePool(host, port): 
    global conn_pool 
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1) 
def ReqOsrm(data): 
    url, qid = data 
    try: 
     response = conn_pool.request('GET', url) 
     json_geocode = json.loads(response.data.decode('utf-8')) 
     tot_time_s = json_geocode['paths'][0]['time'] 
     tot_dist_m = json_geocode['paths'][0]['distance'] 
     return [qid, 200, tot_time_s, tot_dist_m] 
    except Exception as err: 
     print("Error: ", err, qid, url) 
     return [qid, 999, 0, 0] 
# Run: 
pool = Pool(initializer=makePool, initargs=(ghost, gport)) 
calc_routes = pool.map(ReqOsrm, url_routes) 

तो निष्कर्ष में ऐसा लगता है कि मेरे लिए सबसे अच्छा तरीकों # 10 (और आश्चर्यजनक रूप से # 6)

+1

को बाधित कर रहा हूं, तो एक अन्य दृष्टिकोण जिसे आप कोशिश कर सकते हैं, एसिन्सीओ (या गीवेंट) के साथ मल्टीप्रोसेसिंग का उपयोग करना है। मैंने केवल gevent का उपयोग किया है, लेकिन यह सिंगल थ्रेडेड कोरआउट के कारण केवल एक कोर का लाभ उठा सकता है। कोरआउट स्विच थ्रेड से तेज़ होना चाहिए ताकि मल्टीप्रोसेसिंग + कोरआउट सबसे तेज़ हो। –

+0

क्या आप एक उत्तर चुनने जा रहे हैं? – RootTwo

+0

मुझे त्रुटि मिल रही है: ChunkedEncodingError (ProtocolError ('कनेक्शन टूटा हुआ: अपूर्णता (162 बाइट्स पढ़ा गया)', अपूर्णता (162 बाइट्स पढ़ा जाता है)) P8 चलाते समय – Phillip

1

यहाँ है एक पैटर्न जिसे मैंने गीवेंट के साथ उपयोग किया है, जो कोरआउटिन आधारित है और जीआईएल से पीड़ित नहीं हो सकता है। इष्टतम थ्रेड पूल के आकार के साथ चारों ओर मूर्ख की कोशिश कर रहा से

from gevent import monkey 
monkey.patch_all() 

import logging 
import random 
import time 
from threading import Thread 

from gevent.queue import JoinableQueue 
from logger import initialize_logger 

initialize_logger() 
log = logging.getLogger(__name__) 


class Worker(Thread): 

    def __init__(self, worker_idx, queue): 
     # initialize the base class 
     super(Worker, self).__init__() 
     self.worker_idx = worker_idx 
     self.queue = queue 

    def log(self, msg): 
     log.info("WORKER %s - %s" % (self.worker_idx, msg)) 

    def do_work(self, line): 
     #self.log(line) 
     time.sleep(random.random()/10) 

    def run(self): 
     while True: 
      line = self.queue.get() 
      self.do_work(line) 
      self.queue.task_done() 


def main(number_of_workers=20): 
    start_time = time.time() 

    queue = JoinableQueue() 
    for idx in range(number_of_workers): 
     worker = Worker(idx, queue) 
     # "daemonize" a thread to ensure that the threads will 
     # close when the main program finishes 
     worker.daemon = True 
     worker.start() 

    for idx in xrange(100): 
     queue.put("%s" % idx) 

    queue.join() 
    time_taken = time.time() - start_time 
    log.info("Parallel work took %s seconds." % time_taken) 

    start_time = time.time() 
    for idx in xrange(100): 
     #log.info(idx) 
     time.sleep(random.random()/10) 
    time_taken = time.time() - start_time 
    log.info("Sync work took %s seconds." % time_taken) 


if __name__ == "__main__": 
    main() 
संबंधित मुद्दे