2015-12-06 13 views
7

मैं aiohttp उपयोग कर रहा हूँ इस तरह अजगर 3.4 में एक सरल HTTP अनुरोध बनाने के लिए कैश करने के लिए कैसे:asyncio coroutines

response = yield from aiohttp.get(url) 

आवेदन फिर से पर एक ही यूआरएल का अनुरोध करता है तो स्वाभाविक रूप से मैं इसे कैश करने के लिए करना चाहता था। मेरा पहला प्रयास कुछ इस तरह था:

@functools.lru_cache(maxsize=128) 
def cached_request(url): 
    return aiohttp.get(url) 

cached_request करने के लिए पहली कॉल ठीक काम करता है, लेकिन बाद में कॉल में मैं प्रतिक्रिया ऑब्जेक्ट के बजाय None साथ खत्म।

मैं एसिन्सीओ के लिए नया हूं इसलिए मैंने asyncio.coroutine सजावट, yield from और कुछ अन्य चीजों के कई संयोजनों की कोशिश की, लेकिन कोई भी काम नहीं कर रहा था।

तो कैशिंग कोरिंग कैसे काम करता है?

+0

सुनिश्चित नहीं हैं कि आप एक coroutine कैशिंग द्वारा क्या मतलब है? जैसे इसे एक चर के रूप में सहेजें ताकि आप इसे बार-बार कॉल कर सकें? परिणाम को तब तक सहेजें जब तक परिणाम बाद में निष्पादन पर प्रतिस्थापित नहीं किया जाता है? या बाद में एक ही कोरआउट दोहराया गया है? – shongololo

+0

@shongololo मैं coroutine के परिणाम कैश करना चाहता हूँ। – tobib

+1

मैं functools.lru_cache() से परिचित नहीं हूं लेकिन यदि आप बस अपडेट किए गए परिणामों को वापस करना चाहते हैं, तो क्या कोई कारण है कि आप अपडेट किए गए परिणामों को एक चर में सहेजते नहीं हैं? फिर भी, एक एसिंक्रोनस विधि (जैसे 'aiohttp.get()') का उपयोग करते समय आपको इसे किसी चीज़ से ड्राइव करना होगा। तो cached_request को '@ asyncio.coroutine' के साथ संलग्न किया जाना है; इसे 'सेल्ड' का उपयोग करके बुलाया जाना है; और रिटर्न स्टेटमेंट 'रिटर्न की लाइनों के साथ तैयार किया जाना चाहिए (aiohttp.get (url) से उपज) ' – shongololo

उत्तर

3

मैं एक साधारण कैश डेकोरेटर अपने आप को लिखा था:

def async_cache(maxsize=128): 
    cache = {} 

    def decorator(fn): 
     def wrapper(*args):               
      key = ':'.join(args) 

      if key not in cache: 
       if len(cache) >= maxsize: 
        del cache[cache.keys().next()] 

       cache[key] = yield from fn(*args) 

      return cache[key] 

     return wrapper 

    return decorator 


@async_cache() 
@asyncio.coroutine 
def expensive_io(): 
    .... 

इस तरह के- काम करता है। लेकिन कई पहलुओं में शायद सुधार किया जा सकता है। उदाहरण के लिए: यदि पहले कॉल रिटर्न से पहले कैश किए गए फ़ंक्शन को दूसरी बार कॉल किया जाता है, तो यह दूसरी बार निष्पादित होगा।

+0

सुझाव: एक [' ऑर्डरर्ड डिक्ट'] (https://docs.python.org/3/library/collections का उपयोग करें) .html # collections.OrderedDict) 'lru' व्यवहार को लागू करने के लिए, यानी प्रत्येक कुंजी पर' OrderedDict.move_to_end' का उपयोग करें, और फिर कैश पूर्ण होने पर 'OrderedDict.popitem' का उपयोग करें। –

0

मैं aiohttp से परिचित नहीं हूं इसलिए मुझे यकीन नहीं है कि क्या हो रहा है जिससे नोन्स वापस आ जाएंगे, लेकिन lru_cache सजावटी एसिंक फ़ंक्शन के साथ काम नहीं करेगा।

मैं एक सजावट का उपयोग करता हूं जो अनिवार्य रूप से वही काम करता है; ध्यान दें कि यह tobib के डेकोरेटर करने के लिए विभिन्न में ऊपर कि यह हमेशा एक भविष्य या एक कार्य वापस आ जाएगी, बल्कि मूल्य से है:

from collections import OrderedDict 
from functools import _make_key, wraps 

def future_lru_cache(maxsize=128): 
    # support use as decorator without calling, for this case maxsize will 
    # not be an int 
    try: 
     real_max_size = int(maxsize) 
    except ValueError: 
     real_max_size = 128 

    cache = OrderedDict() 

    async def run_and_cache(func, args, kwargs): 
     """Run func with the specified arguments and store the result 
     in cache.""" 
     result = await func(*args, **kwargs) 
     cache[_make_key(args, kwargs, False)] = result 
     if len(cache) > real_max_size: 
      cache.popitem(False) 
     return result 

    def wrapper(func): 
     @wraps(func) 
     def decorator(*args, **kwargs): 
      key = _make_key(args, kwargs, False) 
      if key in cache: 
       # Some protection against duplicating calls already in 
       # progress: when starting the call cache the future, and if 
       # the same thing is requested again return that future. 
       if isinstance(cache[key], asyncio.Future): 
        return cache[key] 
       else: 
        f = asyncio.Future() 
        f.set_result(cache[key]) 
        return f 
      else: 
       task = asyncio.Task(run_and_cache(func, args, kwargs)) 
       cache[key] = task 
       return task 
     return decorator 

    if callable(maxsize): 
     return wrapper(maxsize) 
    else: 
     return wrapper 

मैं functools से _make_key इस्तेमाल किया lru_cache करता है के रूप में, मुझे लगता है यह बहुत निजी होने की अपेक्षा की जाती है शायद इसे कॉपी करने के लिए बेहतर है।

0

LRU डेकोरेटर, जो अभी तक पूरी नहीं हुई coroutines, एक ही कुंजी के समानांतर अनुरोध के साथ बहुत उपयोगी कैश का एक और प्रकार:

import asyncio 
from collections import OrderedDict 
from functools import _make_key, wraps 

def async_cache(maxsize=128, event_loop=None): 
    cache = OrderedDict() 
    if event_loop is None: 
     event_loop = asyncio.get_event_loop() 
    awaiting = dict() 

    async def run_and_cache(func, args, kwargs): 
     """await func with the specified arguments and store the result 
     in cache.""" 
     result = await func(*args, **kwargs) 
     key = _make_key(args, kwargs, False) 
     cache[key] = result 
     if len(cache) > maxsize: 
      cache.popitem(False) 
     cache.move_to_end(key) 
     return result 

    def decorator(func): 
     @wraps(func) 
     async def wrapper(*args, **kwargs): 
      key = _make_key(args, kwargs, False) 
      if key in cache: 
       return cache[key] 
      if key in awaiting: 
       task = awaiting[key] 
       return await asyncio.wait_for(task, timeout=None, loop=event_loop) 
      task = asyncio.ensure_future(run_and_cache(func, args, kwargs), loop=event_loop) 
      awaiting[key] = task 
      result = await asyncio.wait_for(task, timeout=None, loop=event_loop) 
      del awaiting[key] 
      return result 
     return wrapper 

    return decorator 


async def test_async_cache(event_loop): 
    counter = 0 
    n, m = 10, 3 

    @async_cache(maxsize=n, event_loop=event_loop) 
    async def cached_function(x): 
     nonlocal counter 
     await asyncio.sleep(0) # making event loop switch to other coroutine 
     counter += 1 
     return x 

    tasks = [asyncio.ensure_future(cached_function(x), loop=event_loop) 
      for x in list(range(n)) * m] 
    done, pending = await asyncio.wait(tasks, loop=event_loop, timeout=1) 
    assert len(done) == n * m 
    assert counter == n 

event_loop = asyncio.get_event_loop() 
task = asyncio.ensure_future(test_async_cache(event_loop)) 
event_loop.run_until_complete(task) 
2

हो सकता है कि थोड़ी देर हो चुकी है, लेकिन मैं मदद मिल सकती है एक नए पैकेज शुरू कर दिया है : https://github.com/argaen/aiocache। योगदान/टिप्पणियां हमेशा स्वागत है।

एक उदाहरण:

import asyncio 

from collections import namedtuple 

from aiocache import cached 
from aiocache.serializers import PickleSerializer 

Result = namedtuple('Result', "content, status") 


@cached(ttl=10, serializer=PickleSerializer()) 
async def async_main(): 
    print("First ASYNC non cached call...") 
    await asyncio.sleep(1) 
    return Result("content", 200) 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 

नोट एक अतिरिक्त के रूप में, यह अचार क्रमबद्धता का उपयोग कर redis में किसी भी अजगर वस्तु को कैश कर सकते हैं। यदि आप बस स्मृति के साथ काम करना चाहते हैं, तो आप SimpleMemoryCache बैकएंड का उपयोग कर सकते हैं :)।

1

कोरआउट के साथ functools.lru_cache का उपयोग करने के लिए, निम्न कोड काम करता है।

class Cacheable: 
    def __init__(self, co): 
     self.co = co 
     self.done = False 
     self.result = None 
     self.lock = asyncio.Lock() 

    def __await__(self): 
     with (yield from self.lock): 
      if self.done: 
       return self.result 
      self.result = yield from self.co.__await__() 
      self.done = True 
      return self.result 

def cacheable(f): 
    def wrapped(*args, **kwargs): 
     r = f(*args, **kwargs) 
     return Cacheable(r) 
    return wrapped 


@functools.lru_cache() 
@cacheable 
async def foo(): 
    async with aiohttp.ClientSession() as session: 
     async with session.get(url) as resp: 
      return await resp.text() 

सुरक्षित थ्रेड है निम्नलिखित

class ThreadSafeCacheable: 
    def __init__(self, co): 
     self.co = co 
     self.done = False 
     self.result = None 
     self.lock = threading.Lock() 

    def __await__(self): 
     while True: 
      if self.done: 
       return self.result 
      if self.lock.acquire(blocking=False): 
       self.result = yield from self.co.__await__() 
       self.done = True 
       return self.result 
      else: 
       yield from asyncio.sleep(0.005) 
संबंधित मुद्दे