2012-04-16 16 views
9

मैं ऐसे एप्लिकेशन पर काम कर रहा हूं जिसका वर्कफ़्लो एसक्यूएस में संदेश पास करके प्रबंधित किया जाता है, बोटो का उपयोग करके।पाइथन में बोटो लाइब्रेरी का उपयोग कर अमेज़ॅन एसक्यूएस कतार में सभी संदेश कैसे प्राप्त करें?

मेरी एसक्यूएस कतार धीरे-धीरे बढ़ रही है, और मेरे पास यह जांचने का कोई तरीका नहीं है कि इसमें कितने तत्व शामिल हैं।

अब मेरे पास एक डिमन है जो समय-समय पर कतार का चुनाव करता है, और जांच करता है कि मेरे पास तत्वों का निश्चित आकार सेट है या नहीं। उदाहरण के लिए, निम्नलिखित "कतार" पर विचार करें:

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"] 

अब मैं अगर मैं "msg1_comp1", एक साथ समय में कुछ बिंदु पर "msg2_comp1" और कतार में "msg3_comp1" है जाँच करना चाहते हैं, लेकिन मैं डॉन ' कतार के आकार को नहीं पता।

>>> rs = q.get_messages() 
>>> len(rs) 
1 
>>> rs = q.get_messages(10) 
>>> len(rs) 
10 

एक सुझाव जवाब में प्रस्तावित किया जाएगा:

एपीआई के माध्यम से देखने के बाद, यह आप या तो केवल 1 तत्व या कतार में तत्वों की एक निश्चित संख्या है, लेकिन सभी नहीं मिल सकता है लगता है उदाहरण के लिए एक लूप में 10 संदेश प्राप्त करें जब तक कि मुझे कुछ भी वापस न मिले, लेकिन एसक्यूएस में संदेशों में दृश्यता समय समाप्ति होती है, जिसका अर्थ है कि यदि मैं कतार से तत्वों को पोल करता हूं, तो उन्हें वास्तव में हटाया नहीं जाएगा, वे केवल एक छोटी अवधि के लिए अदृश्य होंगे समय की।

क्या कतार में सभी संदेश प्राप्त करने का कोई आसान तरीका है, यह जानने के बिना कि कितने हैं?

उत्तर

13

जबकि पाश अंदर q.get_messages(n) करने के लिए अपने कॉल रखो:

all_messages=[] 
rs=q.get_messages(10) 
while len(rs)>0: 
    all_messages.extend(rs) 
    rs=q.get_messages(10) 

साथ ही, dump won't support more than 10 messages या तो:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'): 
    """Utility function to dump the messages in a queue to a file 
    NOTE: Page size must be < 10 else SQS errors""" 
+0

मैं वास्तव में है कि, के बाद से SQS में संदेश एक दृश्यता समय समाप्ति है, इसलिए यदि मैं पहली बार नहीं कर सकते हैं 10 संदेश, फिर कुछ बार लूप करें, अगली बार जब मुझे टाइमआउट पास हो गया है तो मुझे वही 10 संदेश मिलेंगे। मैं 'डंप()' का उपयोग करने के बारे में सोच रहा हूं लेकिन मुझे फ़ाइल को पढ़ने के बाद, मुझे मूर्खतापूर्ण लगता है, क्या मुझे कुछ याद आ रहा है? (मैं दृश्यता_टाउटआउट को बहुत लंबे समय तक सेट कर सकता हूं, लेकिन यह बदसूरत लगता है)। –

+0

@ लिंकर - आपने कहा कि आपको 'n' विशिष्ट संदेशों की जांच करने की आवश्यकता है। क्या इसका मतलब यह है कि कुछ मिलान मानदंड हैं जिनके लिए आप प्रत्येक संदेश की तुलना कर रहे हैं? –

+0

क्षमा करें अगर वह भ्रमित था, मैंने अपनी पोस्ट अपडेट की है। –

5

मेरे समझ के साथ कि SQS सेवा की वितरित प्रकृति काफी अपने डिजाइन असाध्य बना देता है। हर बार जब आप get_messages को कॉल करते हैं तो आप सर्वर के एक अलग सेट से बात कर रहे हैं, जिसमें कुछ आपके संदेश नहीं होंगे। इस प्रकार सेट करने के लिए समय-समय पर जांचना संभव नहीं है कि संदेश का एक विशेष समूह तैयार है या नहीं, और फिर बस उन्हें स्वीकार करें।

आपको जो करना है वह लगातार मतदान है, सभी संदेशों को उनके आने पर ले जाएं, और उन्हें अपने डेटा संरचनाओं में स्थानीय रूप से स्टोर करें। प्रत्येक सफल fetch के बाद आप यह देखने के लिए अपने डेटा संरचनाओं की जांच कर सकते हैं कि संदेश का एक पूरा सेट एकत्र किया गया है या नहीं।

ध्यान रखें कि संदेशों आदेश से बाहर आ जाएगा रखें, और कुछ संदेशों , दो बार के रूप में वितरित किया जाएगा हटाए गए सभी SQS सर्वरों के लिए प्रचार करने के लिए है, लेकिन बाद में अनुरोध प्राप्त कभी कभी नष्ट संदेशों को हराया।

0

नीचे दिए गए कोड की तरह कुछ चाल चलाना चाहिए। क्षमा करें यह सी # में है, लेकिन इसे पायथन में कनवर्ट करना मुश्किल नहीं होना चाहिए। डिक्शनरी को बुझाने के लिए शब्दकोश का उपयोग किया जाता है।

public Dictionary<string, Message> GetAllMessages(int pollSeconds) 
    { 
     var msgs = new Dictionary<string, Message>(); 
     var end = DateTime.Now.AddSeconds(pollSeconds); 

     while (DateTime.Now <= end) 
     { 
      var request = new ReceiveMessageRequest(Url); 
      request.MaxNumberOfMessages = 10; 

      var response = GetClient().ReceiveMessage(request); 

      foreach (var msg in response.Messages) 
      { 
       if (!msgs.ContainsKey(msg.MessageId)) 
       { 
        msgs.Add(msg.MessageId, msg); 
       } 
      } 
     } 

     return msgs; 
    } 
9

मैं एडब्ल्यूएस SQS कतारों के साथ काम कर रहा है तत्काल सूचना उपलब्ध कराने के लिए है, इसलिए मैं वास्तविक समय में संदेशों के सभी प्रसंस्करण की जरूरत है। निम्नलिखित कोड आपको कुशलतापूर्वक (सभी) संदेशों को हटाने और हटाने के दौरान किसी भी त्रुटि को संभालने में मदद करेगा।

नोट: कतार से संदेशों को हटाने के लिए आपको उन्हें हटाने की आवश्यकता है।मैं अद्यतन boto3 एडब्ल्यूएस अजगर एसडीके, json पुस्तकालय का उपयोग कर रहा है, और निम्न मूलभूत मूल्यों:

import boto3 
import json 

region_name = 'us-east-1' 
queue_name = 'example-queue-12345' 
max_queue_messages = 10 
message_bodies = [] 
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>' 
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>' 
sqs = boto3.resource('sqs', region_name=region_name, 
     aws_access_key_id=aws_access_key_id, 
     aws_secret_access_key=aws_secret_access_key) 
queue = sqs.get_queue_by_name(QueueName=queue_name) 
while True: 
    messages_to_delete = [] 
    for message in queue.receive_messages(
      MaxNumberOfMessages=max_queue_messages) 
     # process message body 
     body = json.loads(message.body) 
     message_bodies.append(body) 
     # add message to delete 
     messages_to_delete.append({ 
      'Id': message.message_id, 
      'ReceiptHandle': message.receipt_handle 
     }) 

    # if you don't receive any notifications the 
    # messages_to_delete list will be empty 
    if len(messages_to_delete) == 0: 
     break 
    # delete messages to remove them from SQS queue 
    # handle any errors 
    else: 
     delete_response = queue.delete_messages(
       Entries=messages_to_delete) 
+0

'बीटो 3' से 'delete_messages' फ़ंक्शन को" बैकपोर्ट "करने के लिए v2' Boto' पैकेज के लिए एक अनुकूलन [यहां] है [http://stackoverflow.com/a/40638174/4228193)। बिल्ट-इन 'बोटो' (2) 'delete_message_batch' में 10 संदेशों की सीमा है और किसी ऑब्जेक्ट में केवल' आईडी' और 'रसीद हैंडल्स' की बजाय पूर्ण 'संदेश'-वर्ग ऑब्जेक्ट्स की आवश्यकता होती है। – mpag

0

नोट: इस सवाल के लिए एक सीधा जवाब के रूप में इरादा नहीं है। इसके बजाय यह @TimothyLiu's answer पर एक संवर्धन है, मानते हैं कि अंतिम उपयोगकर्ता Boto पैकेज (उर्फ बोटो 2) Boto3 का उपयोग नहीं कर रहा है। इस कोड को एक delete_messages कॉल करने के लिए भेजा के "Boto-2-ization" his answer


में है एक Boto (2) delete_message_batch(messages_to_delete) के लिए कॉल जहां messages_to_delete कुंजी के साथ एक dict वस्तु है: मान इसी id रहे हैं: receipt_handle जोड़े रिटर्न

AttributeError: 'dict' object has no attribute 'id'.

ऐसा लगता है कि delete_message_batchMessage क्लास ऑब्जेक्ट की अपेक्षा करता है; Boto source for delete_message_batch की प्रतिलिपि बनाने और इसे एक गैर-Message ऑब्जेक्ट (एला boto3) का उपयोग करने की इजाजत देता है, यदि आप एक समय में 10 से अधिक "संदेश" हटा रहे हैं तो भी विफल हो जाता है। तो, मुझे निम्नलिखित कार्य-आसपास का उपयोग करना पड़ा। से here

from __future__ import print_function 
import sys 
from itertools import islice 

def eprint(*args, **kwargs): 
    print(*args, file=sys.stderr, **kwargs) 

@static_vars(counter=0) 
def take(n, iterable, reset=False): 
    "Return next n items of the iterable as same type" 
    if reset: take.counter = 0 
    take.counter += n 
    bob = islice(iterable, take.counter-n, take.counter) 
    if isinstance(iterable, dict): return dict(bob) 
    elif isinstance(iterable, list): return list(bob) 
    elif isinstance(iterable, tuple): return tuple(bob) 
    elif isinstance(iterable, set): return set(bob) 
    elif isinstance(iterable, file): return file(bob) 
    else: return bob 

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False 
    """ 
    Deletes a list of messages from a queue in a single request. 
    :param cx: A boto connection object. 
    :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted 
    :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects. 
    """ 
    listof10s = [] 
    asSuc, asErr, acS, acE = "","",0,0 
    res = [] 
    it = tuple(enumerate(messages)) 
    params = {} 
    tenmsg = take(10,it,True) 
    while len(tenmsg)>0: 
    listof10s.append(tenmsg) 
    tenmsg = take(10,it) 
    while len(listof10s)>0: 
    tenmsg = listof10s.pop() 
    params.clear() 
    for i, msg in tenmsg: #enumerate(tenmsg): 
     prefix = 'DeleteMessageBatchRequestEntry' 
     numb = (i%10)+1 
     p_name = '%s.%i.Id' % (prefix, numb) 
     params[p_name] = msg.get('id') 
     p_name = '%s.%i.ReceiptHandle' % (prefix, numb) 
     params[p_name] = msg.get('receipt_handle') 
    try: 
     go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST') 
     (sSuc,cS),(sErr,cE) = tup_result_messages(go) 
     if cS: 
     asSuc += ","+sSuc 
     acS += cS 
     if cE: 
     asErr += ","+sErr 
     acE += cE 
    except cx.ResponseError: 
     eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    except: 
     eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res 

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0): 
    if sSuc == "": sSuc="None" 
    if sErr == "": sErr="None" 
    if cS == expect: sSuc="All" 
    if cE == expect: sErr="All" 
    return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr) 
1

ePrint कोड मैं एक cronjob में इस पर अमल

from django.core.mail import EmailMessage 
from django.conf import settings 
import boto3 
import json 

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID, 
     aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, 
     region_name=settings.AWS_REGION) 

queue = sqs.get_queue_by_name(QueueName='email') 
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 

while len(messages) > 0: 
    for message in messages: 
     mail_body = json.loads(message.body) 
     print("E-mail sent to: %s" % mail_body['to']) 
     email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']]) 
     email.send() 
     message.delete() 

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 
संबंधित मुद्दे