2012-05-07 21 views
10

समस्या के लिए एडब्ल्यूएस S3 SDK का उपयोग: मैं उनके नेट SDK का उपयोग एडब्ल्यूएस S3 से समानांतर में 100 फ़ाइलों को डाउनलोड करना चाहते हैं। डाउनलोड की गई सामग्री 100 मेमोरी धाराओं में संग्रहीत की जानी चाहिए (फाइलें काफी छोटी हैं, और मैं इसे वहां से ले जा सकता हूं)। मैं कार्य, IAsyncResult, समांतर। *, और .NET 4.0 में अन्य विभिन्न दृष्टिकोणों के बीच भ्रमित हो रहा हूं।अमेज़न S3 से समानांतर बैच फ़ाइल डाउनलोड नेट

अगर मैं समस्या अपने आप हल करने के लिए, मेरे सिर के ऊपर से कोशिश मैं इस स्यूडोकोड की तरह कुछ कल्पना:

using Amazon; 
using Amazon.S3; 
using Amazon.S3.Model; 

AmazonS3 _s3 = ...; 
IEnumerable<GetObjectRequest> requestObjects = ...; 


// Prepare to launch requests 
var asyncRequests = from rq in requestObjects 
    select _s3.BeginGetObject(rq,null,null); 

// Launch requests 
var asyncRequestsLaunched = asyncRequests.ToList(); 

// Prepare to finish requests 
var responses = from rq in asyncRequestsLaunched 
    select _s3.EndGetRequest(rq); 

// Finish requests 
var actualResponses = responses.ToList(); 

// Fetch data 
var data = actualResponses.Select(rp => { 
    var ms = new MemoryStream(); 
    rp.ResponseStream.CopyTo(ms); 
    return ms; 
}); 

इस कोड को 100 अनुरोधों की शुरूआत (कुछ चर के प्रकार जोड़ने के लिए संपादित) समानांतर में, जो अच्छा है। हालांकि, दो समस्याएं हैं:

  1. अंतिम विवरण समानांतर में नहीं, फ़ाइलों को क्रमशः डाउनलोड करेगा। स्ट्रीम पर BeginCopyTo()/EndCopyTo() विधि प्रतीत नहीं होती है ...
  2. पिछला बयान तब तक नहीं जाने देगा जब तक कि सभी अनुरोधों ने जवाब नहीं दिया हो। दूसरे शब्दों में, जब तक कि वे सभी शुरू नहीं हो जाते तब तक कोई भी फाइल डाउनलोड करना शुरू नहीं होगा।

तो यहाँ मैं सोच रहा नीचे गलत पथ की ओर बढ़ रहा शुरू ...

मदद?

उत्तर

19

यह शायद आसान है अगर आप एक विधि है कि एक अनुरोध अतुल्यकालिक रूप से संभाल लेंगे और उसके बाद इसे कहते 100 बार में नीचे आपरेशन टूट गया।

प्रारंभ करने के लिए, आइए अंतिम परिणाम की पहचान करें। चूंकि आप किसके साथ काम करेंगे, MemoryStream का अर्थ है कि आप अपनी विधि से Task<MemoryStream> वापस करना चाहेंगे। हस्ताक्षर कुछ इस तरह दिखेगा:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request) 

क्योंकि आपके AmazonS3 वस्तु Asynchronous Design Pattern लागू करता है, तो आप FromAsync methodTaskFactory class पर उपयोग एक वर्ग है कि अतुल्यकालिक डिजाइन पैटर्न को लागू करता है, तो तरह से Task<T> उत्पन्न करने के लिए कर सकते हैं:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request) 
{ 
    Task<GetObjectResponse> response = 
     Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
      s3.BeginGetObject, s3.EndGetObject, request, null); 

    // But what goes here? 

तो तुम एक अच्छी जगह में पहले से ही कर रहे हैं, तो आप एक Task<T> जो आप पर प्रतीक्षा करने या जब कॉल पूरा करता है पर एक कॉलबैक प्राप्त कर सकते हैं। हालांकि, आपको किसी भी तरह से GetObjectResponse को कॉल से Task<GetObjectResponse> पर MemoryStream में वापस करने की आवश्यकता है।

इसके अंत में, आप Task<T> कक्षा पर ContinueWith method का उपयोग करना चाहते हैं। पर Select method के एसिंक्रोनस संस्करण के रूप में इसके बारे में सोचें, यह केवल Task<T> में एक प्रक्षेपण है, सिवाय इसके कि प्रत्येक बार जब आप ContinueWith पर कॉल करते हैं, तो आप संभावित रूप से एक नया कार्य बना रहे हैं जो को कोड का अनुभाग चलाता है।

कि के साथ, अपने विधि निम्नलिखित की तरह दिखता है:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request) 
{ 
    // Start the task of downloading. 
    Task<GetObjectResponse> response = 
     Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
      s3.BeginGetObject, s3.EndGetObject, request, null 
     ); 

    // Translate. 
    Task<MemoryStream> translation = response.ContinueWith(t => { 
     using (Task<GetObjectResponse> resp = t){ 
      var ms = new MemoryStream(); 
      t.Result.ResponseStream.CopyTo(ms); 
      return ms; 
     } 
    }); 

    // Return the full task chain. 
    return translation; 
} 

ध्यान दें कि आप इसके बाद के संस्करण संभवतः कॉल कर सकते हैं overload of ContinueWith गुजर TaskContinuationOptions.ExecuteSynchronously, के रूप में यह प्रतीत होता है कि आप कम से कम काम कर रहे (मैं नहीं बता सकता, प्रतिक्रिया विशाल हो सकती है)। उन मामलों में जहां आप बहुत कम काम कर रहे हैं, जहां काम पूरा करने के लिए एक नया कार्य शुरू करने के लिए हानिकारक होगा, आपको TaskContinuationOptions.ExecuteSynchronously पास करना चाहिए ताकि आप न्यूनतम संचालन के लिए नए कार्यों को बनाने में समय बर्बाद न करें।

अब आप विधि है कि एक Task<MemoryStream> में एक अनुरोध अनुवाद कर सकते हैं, एक आवरण है कि कार्रवाई करेंगे बनाने है उनमें से किसी संख्या सरल है:

static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests) 
{ 
    // Just call Select on the requests, passing our translation into 
    // a Task<MemoryStream>. 
    // Also, materialize here, so that the tasks are "hot" when 
    // returned. 
    return requests.Select(r => GetMemoryStreamAsync(s3, r)). 
     ToArray(); 
} 

ऊपर, आप बस ले आपके GetObjectRequest उदाहरणों का अनुक्रम और यह Task<MemoryStream> की एक सरणी वापस कर देगा। तथ्य यह है कि यह एक भौतिक अनुक्रम लौटाता है महत्वपूर्ण है। यदि आप लौटने से पहले इसे पूरा नहीं करते हैं, तो कार्य तब तक नहीं बनाए जाएंगे जब तक अनुक्रम को पुन: चालू नहीं किया जाता है।

बेशक, यदि आप यह व्यवहार चाहते हैं, तो हर तरह से, केवल .ToArray() पर कॉल को हटा दें, विधि IEnumerable<Task<MemoryStream>> पर वापस लौटें और फिर अनुरोधों के माध्यम से अनुरोध किए जाएंगे।

वहां से, आप एक समय में उन्हें एक प्रक्रिया कर सकते हैं (Task.WaitAny method लूप में उपयोग करके) या उन सभी को पूरा करने की प्रतीक्षा करें (Task.WaitAll method पर कॉल करके)। उत्तरार्द्ध का एक उदाहरण होगा:

static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests) 
{ 
    Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests); 
    Task.WaitAll(tasks); 
    return tasks.Select(t => t.Result).ToList(); 
} 

इसके अलावा, यह उल्लेख किया जाना चाहिए कि इस Reactive Extensions framework के लिए एक बहुत अच्छी फिट है इस बहुत एक IObservable<T> कार्यान्वयन की दिशा में अच्छी तरह से अनुकूल है।

+2

यह एक अच्छा समाधान है, जो आश्चर्यजनक रूप से अच्छी तरह वर्णित है, और प्रश्न पोस्ट करने के लगभग 20 मिनट बाद वितरित किया गया। मैं खुश हूं। यह मेरे लिए भी अच्छा काम करता है, मैंने फिक्स को अधिक सटीक S3 क्लास नाम जोड़ने के लिए, और अधिक विशिष्ट FromAsync() विधि निर्दिष्ट करें। कैस्पर, क्या आप चाहते हैं कि मैं आपके उत्तर में बदलावों में संपादित करूं? – DenNukem

+1

@DenNukem ओह, मैंने एक स्ट्रीम से दूसरे स्ट्रीम में असीमित रूप से प्रतिलिपि बनाने का पता नहीं लगाया। यह .NET 4.5 में उपलब्ध होगा, लेकिन इसे ट्रेन के मलबे की तरह दिखने के लिए कुछ 'async'/'await' goodness की आवश्यकता होगी। अभी के लिए, ['Stream.CopyTo' विधि] (http://msdn.microsoft.com/en-us/library/system.io.stream.copyto.aspx) का उपयोग करें, लेकिन पता है कि .NET 4.5 में आप इसका उपयोग कर सकते हैं ['Stream.CopyToAsync'] (http://msdn.microsoft.com/en-us/library/system.io.stream.copytoasync.aspx) के साथ * async' /' await' के साथ * सभी * करने के लिए अधिक elgantly। – casperOne

+0

@casperOne क्या मैं इसे करने के .NET 4.5 तरीके का उदाहरण देख सकता हूं? – user1265146

संबंधित मुद्दे