मैं मार्गों को सबमिट करने और ड्राइव-टाइम वापस पाने के लिए HTTP के माध्यम से स्थानीय सर्वर (ओएसआरएम) से कनेक्ट कर रहा हूं। मैंने देखा है कि I/O थ्रेडिंग से धीमा है क्योंकि ऐसा लगता है कि गणना के लिए प्रतीक्षा अवधि जेएसओएन आउटपुट को अनुरोध करने और संसाधित करने के समय से छोटी है (मुझे लगता है कि जब सर्वर कुछ समय लेता है तो I/O बेहतर होता है अपने अनुरोध को संसाधित करें -> आप नहीं चाहते कि यह अवरुद्ध हो क्योंकि आपको प्रतीक्षा करनी है, यह मेरा मामला नहीं है)। थ्रेडिंग ग्लोबल इंटरप्रेटर लॉक से पीड़ित है और इसलिए यह दिखाई देता है (और नीचे सबूत) कि मेरा सबसे तेज़ विकल्प मल्टीप्रोसेसिंग का उपयोग करना है।पायथन अनुरोध - धागे/प्रक्रिया बनाम आईओ
मल्टीप्रोसेसिंग के साथ समस्या यह है कि यह इतना तेज़ है कि यह मेरे सॉकेट को समाप्त करता है और मुझे एक त्रुटि मिलती है (अनुरोध हर बार एक नया कनेक्शन जारी करता है)। मैं (सीरियल में) अनुरोधों का उपयोग कर सकता हूं। सत्र() ऑब्जेक्ट को जिंदा रखने के लिए ऑब्जेक्ट, हालांकि मैं इसे समानांतर में काम नहीं कर सकता (प्रत्येक प्रक्रिया में इसका अपना सत्र होता है)।
निकटतम कोड मैं इस समय काम करने के लिए है इस बहु कोड है:
conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())
def ReqOsrm(url_input):
ul, qid = url_input
try:
response = conn_pool.request('GET', ul)
json_geocode = json.loads(response.data.decode('utf-8'))
status = int(json_geocode['status'])
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from, used_to = json_geocode['via_points']
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
return out
else:
print("Done but no route: %d %s" % (qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("%s: %d %s" % (err, qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
हालांकि, मैं नहीं मिल सकता है HTTPConnectionPool ठीक से काम करने और यह नया सॉकेट हर बार (मुझे लगता है कि) बनाता है और (जल्दी से जल्दी)
HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
मेरा लक्ष्य एक OSRM-routing server से दूरी की गणना मैं स्थानीय रूप से चला रहा हूँ प्राप्त करने के लिए है: तो मुझे त्रुटि देता है।
मेरे पास दो भागों में एक प्रश्न है - मूल रूप से मैं मल्टीप्रोसेसिंग.पूल() को बेहतर कोड (उचित असिंक्रोनस फ़ंक्शंस) का उपयोग करके कुछ कोड बदलने की कोशिश कर रहा हूं - ताकि निष्पादन कभी नहीं टूटता और यह जितनी जल्दी हो सके चलता है)।
मेरे पास जो मुद्दा है, वह यह है कि मैं जो कुछ भी कोशिश करता हूं वह मल्टीप्रोसेसिंग से धीमा लगता है (मैंने जो कुछ भी किया है उसके नीचे कई उदाहरण प्रस्तुत करते हैं)।
कुछ संभावित तरीके हैं: gevents, grequests, बवंडर, अनुरोध-वायदा, asyncio, आदि
ए - Multiprocessing.Pool()
मैं शुरू में कुछ इस तरह के साथ शुरू किया:
def ReqOsrm(url_input):
req_url, query_id = url_input
try_c = 0
#print(req_url)
while try_c < 5:
try:
response = requests.get(req_url)
json_geocode = response.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
....
pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)
मैं कहाँ एक स्थानीय सर्वर से कनेक्ट किया गया था (स्थानीय होस्ट, पोर्ट: 5005) जो 8 धागे और supports parallel execution पर शुरू किया गया था।
थोड़ी सी खोज के बाद मुझे एहसास हुआ कि मुझे जो त्रुटि मिल रही थी क्योंकि अनुरोध opening a new connection/socket for each-request था। तो यह वास्तव में थोड़ी देर के बाद बहुत तेज़ और थकाऊ सॉकेट था। ऐसा लगता है कि इसका समाधान करने का तरीका अनुरोधों का उपयोग करना है। सत्र() - हालांकि मैं इसे मल्टीप्रोसेसिंग के साथ काम नहीं कर पाया (जहां प्रत्येक प्रक्रिया का अपना सत्र होता है)।
प्रश्न 1.
कंप्यूटर से कुछ पर इस चलाता है ठीक है, उदा:
बाद के खिलाफ तुलना करने के लिए: 45% सर्वर के उपयोग और प्रति सेकंड
हालांकि 1700 अनुरोध, कुछ पर ऐसा नहीं होता है और मैं पूरी तरह से समझ में नहीं आता क्यों:
HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
मेरा अनुमान यह होगा कि, जब अनुरोध उपयोग में होता है तो सॉकेट लॉक करता है - कभी-कभी सर्वर पुराने अनुरोध का जवाब देने में बहुत धीमा होता है और एक नया उत्पन्न होता है। सर्वर कतार में समर्थन करता है, हालांकि कतार में जोड़ने के बजाय अनुरोध त्रुटि नहीं है मुझे त्रुटि मिलती है?
प्रश्न 2.
मैंने पाया:
Blocking Or Non-Blocking?
With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.
If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.
Two excellent examples are grequests and requests-futures.
बी - अनुरोध-वायदा
अतुल्यकालिक अनुरोध तो मैं करने की कोशिश की उपयोग करने के लिए मेरे कोड को फिर से लिखने को यह मैं जरूरत से निपटने के लिए नीचे का उपयोग कर:
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
और मुख्य कोड (वैसे मैं सभी धागे का उपयोग करने के विकल्प के साथ अपने सर्वर शुरू):
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
# Submit requests and process in background
for i in range(len(url_routes)):
url_in, qid = url_routes[i] # url |query-id
future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
futures[future] = qid
# Process the futures as they become complete
for future in as_completed(futures):
r = future.result()
try:
row = [futures[future]] + r.data
except Exception as err:
print('No route')
row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
calc_routes.append(row)
मेरी समारोह (ReqOsrm) जहां अब पुनः लिखा जाता है के रूप में:
def ReqOsrm(sess, resp):
json_geocode = resp.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
# Cannot find route between points (code errors as 999)
else:
out = [999, 0, 0, 0, 0, 0, 0]
resp.data = out
हालांकि, यह कोड मल्टीप्रोसेसिंग की तुलना में धीमी है! इससे पहले कि मैं लगभग 1700 अनुरोध प्राप्त कर रहा था, अब मुझे 600 सेकंड मिल रहा है। मुझे लगता है कि ऐसा इसलिए है क्योंकि मेरे पास पूर्ण CPU उपयोग नहीं है, हालांकि मुझे यकीन नहीं है कि इसे कैसे बढ़ाया जाए?
सी - थ्रेड
मैं किसी अन्य विधि (creating threads) की कोशिश की - लेकिन फिर से यकीन है कि कैसे इस CPU उपयोग को अधिकतम करने के लिए नहीं था (आदर्श मैं अपने सर्वर 50 का उपयोग करते हुए देखना चाहते हैं %, नहीं?):
def doWork():
while True:
url,qid = q.get()
status, resp = getReq(url)
processReq(status, resp, qid)
q.task_done()
def getReq(url):
try:
resp = requests.get(url)
return resp.status_code, resp
except:
return 999, None
def processReq(status, resp, qid):
try:
json_geocode = resp.json()
# Found route between points
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
else:
print("Done but no route")
out = [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("Error: %s" % err)
out = [qid, 999, 0, 0, 0, 0, 0, 0]
qres.put(out)
return
#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in url_routes:
q.put(url)
q.join()
except Exception:
pass
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
इस विधि requests_futures मुझे लगता है कि तुलना में तेजी है, लेकिन मैं कितने सूत्र इस अधिकतम करने के लिए सेट करने के लिए पता नहीं है -
डी - (बवंडर काम नहीं कर रहा)
अब मैं टर्ननाडो की कोशिश कर रहा हूं - हालांकि मैं इसे कोड कोड -1073741819 के साथ तोड़ने में काफी काम नहीं कर सकता अगर मैं कर्ल का उपयोग करता हूं - अगर मैं सरल_httpclient का उपयोग करता हूं तो यह काम करता है लेकिन फिर मुझे टाइमआउट त्रुटियां मिलती हैं:
ERROR:tornado.application:Multiple exceptions in yield list Traceback (most recent call last): File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback result_list.append(f.result()) File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result raise_exc_info(self._exc_info) File "", line 3, in raise_exc_info tornado.httpclient.HTTPError: HTTP 599: Timeout
def handle_req(r):
try:
json_geocode = json_decode(r)
status = int(json_geocode['status'])
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
print(out)
except Exception as err:
print(err)
out = [999, 0, 0, 0, 0, 0, 0]
return out
# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)
@gen.coroutine
def run_experiment(urls):
http_client = AsyncHTTPClient()
responses = yield [http_client.fetch(url) for url, qid in urls]
responses_out = [handle_req(r.body) for r in responses]
raise gen.Return(value=responses_out)
# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
ई - asyncio/aiohttp
एक और दृष्टिकोण की कोशिश करने के (बहुत अच्छा होगा, हालांकि काम कर बवंडर प्राप्त करने के लिए) asyncio और aiohttp का उपयोग कर निर्णय लिया।
import asyncio
import aiohttp
def handle_req(data, qid):
json_geocode = json.loads(data.decode('utf-8'))
status = int(json_geocode['status'])
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
else:
print("Done, but not route for {0} - status: {1}".format(qid, status))
out = [qid, 999, 0, 0, 0, 0, 0, 0]
return out
def chunked_http_client(num_chunks):
# Use semaphore to limit number of requests
semaphore = asyncio.Semaphore(num_chunks)
@asyncio.coroutine
# Return co-routine that will download files asynchronously and respect
# locking fo semaphore
def http_get(url, qid):
nonlocal semaphore
with (yield from semaphore):
response = yield from aiohttp.request('GET', url)
body = yield from response.content.read()
yield from response.wait_for_close()
return body, qid
return http_get
def run_experiment(urls):
http_client = chunked_http_client(500)
# http_client returns futures
# save all the futures to a list
tasks = [http_client(url, qid) for url, qid in urls]
response = []
# wait for futures to be ready then iterate over them
for future in asyncio.as_completed(tasks):
data, qid = yield from future
try:
out = handle_req(data, qid)
except Exception as err:
print("Error for {0} - {1}".format(qid,err))
out = [qid, 999, 0, 0, 0, 0, 0, 0]
response.append(out)
return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))
यह ठीक काम करता है, हालांकि अभी भी मल्टीप्रोसेसिंग से धीमा है!
एक और दृष्टिकोण अन्य है एक का उपयोग करने के: इस धागे और शायद सबसे तेजी से उपयोग करने की तुलना में तेजी से जब बहु के साथ संयोजन में उपयोग किया हो सकता है (वर्तमान में यह केवल 1 कोर का उपयोग करें) घटना लूप आप कॉलबैक के साथ अनुरोध पंजीकृत कर सकते हैं और प्रतिक्रिया लौटने पर ईवेंट लूप को संभालने का इंतजार कर सकते हैं – dm03514
@ dm03514 इसके लिए धन्यवाद! हालांकि, क्या यह मेरे पास नहीं है जब मैं अपने अनुरोध करता हूं-वायदा उदाहरण? 'भविष्य = session.get (url_in, background_callback = lambda sess, resp: ReqOsrm (sess, resp), – mptevsion
मैंने कभी भी अनुरोध फ़्यूचर का उपयोग नहीं किया है, लेकिन मुझे लगता है कि यह अभी भी थ्रेड पूल से संबंधित है, इवेंट लूप एक नया होना चाहिए अनुरोध मॉडल सभी एक साथ, और केवल एक धागा का पर्दाफाश करेगा, इसलिए आपको चिंता करने की ज़रूरत नहीं है कि काम करने के लिए कितने धागे कॉन्फ़िगर करना है :) python में stdlibrary में एक है https://pypi.python.org/pypi/aiohttp , जिसे मैंने कभी भी उपयोग नहीं किया है, लेकिन अपेक्षाकृत सीधा दिखता है, टॉरनाडो ओएस इवेंट लाइब्रेरीज़ पर बनाया गया एक ढांचा है जिसमें सरल एपीआई है। http://tornadokevinlee.readthedocs.org/en/latest/httpclient.html – dm03514