2017-08-30 11 views
11

Im कुछ सिंक्रोनस कोड को Aiohttp का उपयोग करके asyncio में स्थानांतरित करने की प्रक्रिया में। सिंक्रोनस कोड को चलाने में 15 मिनट लग रहे थे, इसलिए मैं इसे सुधारने की उम्मीद कर रहा हूं।पायथन Aiohttp/asyncio - लौटाए गए डेटा को कैसे संसाधित करें

मेरे पास कुछ कामकाजी कोड है जो कुछ यूआरएल से डेटा प्राप्त करता है और प्रत्येक के शरीर को लौटाता है। लेकिन यह सिर्फ 1 प्रयोगशाला साइट के खिलाफ है, मेरे पास 70+ वास्तविक साइटें हैं।

तो अगर मैं सभी साइटों है कि एक सूची में 700 यूआरएल होगा संसाधित करने के लिए के लिए सभी यूआरएल की एक सूची बनाने के लिए एक पाश मिला है। अब उन्हें संसाधित करना मुझे नहीं लगता कि एक समस्या है?

लेकिन परिणामों के साथ 'सामान' कर रहे हैं, मुझे यकीन नहीं है कि प्रोग्राम कैसे करें? मेरे पास पहले से ही कोड है जो लौटाए गए प्रत्येक परिणामों में 'सामान' करेगा, लेकिन मुझे यकीन नहीं है कि सही प्रकार के परिणाम के खिलाफ प्रोग्राम कैसे करें।

जब कोड चलता है तो यह सभी यूआरएल को संसाधित करता है और चलाने के समय के आधार पर, एक अज्ञात आदेश देता है?

क्या मुझे ऐसे फ़ंक्शन की आवश्यकता है जो किसी भी प्रकार के परिणाम को संसाधित करेगी?

import asyncio, aiohttp, ssl 
from bs4 import BeautifulSoup 

def page_content(page): 
    return BeautifulSoup(page, 'html.parser') 


async def fetch(session, url): 
    with aiohttp.Timeout(15, loop=session.loop): 
     async with session.get(url) as response: 
      return page_content(await response.text()) 

async def get_url_data(urls, username, password): 
    tasks = [] 
    # Fetch all responses within one Client session, 
    # keep connection alive for all requests. 
    async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session: 
     for i in urls: 
      task = asyncio.ensure_future(fetch(session, i)) 
      tasks.append(task) 

     responses = await asyncio.gather(*tasks) 
     # you now have all response bodies in this variable 
     for i in responses: 
      print(i.title.text) 
     return responses 


def main(): 
    username = 'monitoring' 
    password = '*********' 
    ip = '10.10.10.2' 
    urls = [ 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.10.0.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.0.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'frontend.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'planner.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.10.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.11.11.1'), 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.12.12.60'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.12.12.60'), 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'lon-dc-01.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'lon-dc-01.domain.com'), 
     ] 
    loop = asyncio.get_event_loop() 
    future = asyncio.ensure_future(get_url_data(urls,username,password)) 
    data = loop.run_until_complete(future) 
    print(data) 

if __name__ == "__main__": 
    main() 

उत्तर

2

आपका कोड चिह्न से दूर नहीं है। asyncio.gather तर्कों के क्रम में परिणाम देता है, इसलिए आदेश यहां संरक्षित है, लेकिन page_content क्रम में नहीं बुलाया जाएगा।

कुछ बदलाव:

सबसे पहले, आप ensure_future यहां जरूरत नहीं है। एक कार्य बनाना केवल तभी जरूरी है जब आप अपने माता-पिता को कोरआउटिन से बाहर निकालने का प्रयास कर रहे हों, यानी यदि कार्य को जारी रखना जारी रहता है, भले ही यह कार्य किया गया हो। यहाँ आप क्या जरूरत के बजाय सीधे asyncio.gather बुला रहा है अपने coroutines साथ:

async def get_url_data(urls, username, password): 
    async with aiohttp.ClientSession(...) as session: 
     responses = await asyncio.gather(*(fetch(session, i) for i in urls)) 
    for i in responses: 
     print(i.title.text) 
    return responses 

लेकिन बुला यह सब एक ही समय में लाने का समय निर्धारित होता है, और यूआरएल की अधिक संख्या वाले, इस इष्टतम से दूर है। इसके बजाय आपको अधिकतम समेकन चुनना चाहिए और सुनिश्चित करें कि अधिकांश एक्स फ़ेच किसी भी समय चल रहे हैं। इसे कार्यान्वित करने के लिए, आप asyncio.Semaphore(20) का उपयोग कर सकते हैं, यह सेमफोर केवल 20 कोरआउट पर ही अधिग्रहित किया जा सकता है, इसलिए अन्य स्पॉट उपलब्ध होने तक हासिल करने की प्रतीक्षा करेंगे।

CONCURRENCY = 20 
TIMEOUT = 15 

async def fetch(session, sem, url): 
    async with sem: 
     async with session.get(url) as response: 
      return page_content(await response.text()) 

async def get_url_data(urls, username, password): 
    sem = asyncio.Semaphore(CONCURRENCY) 
    async with aiohttp.ClientSession(...) as session: 
     responses = await asyncio.gather(*(
      asyncio.wait_for(fetch(session, sem, i), TIMEOUT) 
      for i in urls 
     )) 
    for i in responses: 
     print(i.title.text) 
    return responses 

इस तरह, सभी फ़ेच तुरंत शुरू कर रहे हैं, लेकिन उनमें से केवल 20 सेमाफोर प्राप्त करने के लिए सक्षम हो जाएगा। अन्य पहले async with निर्देश पर अवरुद्ध होंगे और एक और fetch पूरा होने तक प्रतीक्षा करें।

मैंने आधिकारिक एसिंसिओ समकक्ष के साथ aiohttp.Timeout को भी बदल दिया है।

अंत में, डेटा की वास्तविक प्रसंस्करण के लिए, यदि आप CPU समय द्वारा सीमित हैं, asyncio शायद आप ज्यादा मदद नहीं करेगा। वास्तविक CPU को किसी अन्य CPU पर समानांतर करने के लिए आपको ProcessPoolExecutor का उपयोग करने की आवश्यकता होगी। run_in_executor शायद इसका उपयोग किया जाएगा।

+0

धन्यवाद, मैं जो कुछ कहा है, उसे समझता हूं लेकिन आपने मुझे ProcessPoolExecutor भाग में खो दिया है। मुझे परिणामों की एक अलग सीपीयू प्रक्रिया की आवश्यकता है? मैं यह कैसे करु? और मैं उन्हें क्रम में कैसे संसाधित करूं, या क्या मुझे ऐसे फ़ंक्शन की आवश्यकता है जो सभी परिणामों को संसाधित करे चाहे कोई फर्क नहीं पड़ता? – AlexW

2

यहां concurrent.futures.ProcessPoolExecutor के साथ एक उदाहरण दिया गया है। यदि यह max_workers निर्दिष्ट किए बिना बनाया गया है, तो कार्यान्वयन इसके बजाय os.cpu_count का उपयोग करेगा। यह भी ध्यान रखें कि asyncio.wrap_future सार्वजनिक है लेकिन अनियंत्रित है। वैकल्पिक रूप से, AbstractEventLoop.run_in_executor है।

import asyncio 
from concurrent.futures import ProcessPoolExecutor 

import aiohttp 
import lxml.html 


def process_page(html): 
    '''Meant for CPU-bound workload''' 
    tree = lxml.html.fromstring(html) 
    return tree.find('.//title').text 


async def fetch_page(url, session): 
    '''Meant for IO-bound workload''' 
    async with session.get(url, timeout = 15) as res: 
     return await res.text() 


async def process(url, session, pool): 
    html = await fetch_page(url, session) 
    return await asyncio.wrap_future(pool.submit(process_page, html)) 


async def dispatch(urls): 
    pool = ProcessPoolExecutor() 
    async with aiohttp.ClientSession() as session: 
     coros = (process(url, session, pool) for url in urls) 
     return await asyncio.gather(*coros) 


def main(): 
    urls = [ 
     'https://stackoverflow.com/', 
     'https://serverfault.com/', 
     'https://askubuntu.com/', 
     'https://unix.stackexchange.com/' 
    ] 
    result = asyncio.get_event_loop().run_until_complete(dispatch(urls)) 
    print(result) 

if __name__ == '__main__': 
    main() 
संबंधित मुद्दे