2009-02-07 5 views
6

नोट: मुझे इस प्रश्न की लंबाई के लिए माफी माँगने दो, मुझे इसमें बहुत सारी जानकारी डालना पड़ा। मुझे आशा है कि बहुत से लोगों को बस इसे स्किम करने और धारणाएं करने का कारण नहीं बनता है। कृपया पूरी तरह से पढ़ें। धन्यवाद।लाइन आधारित नेटवर्क I/O धाराओं को संभालने के लिए एक अच्छी विधि क्या है?

मेरे पास सॉकेट में आने वाले डेटा की एक स्ट्रीम है। यह डेटा लाइन उन्मुख है।

मैं .NET (BeginRead, आदि ..) के एपीएम (Async प्रोग्रामिंग विधि) का उपयोग कर रहा हूँ। यह स्ट्रीम आधारित I/O का उपयोग करना रोकता है क्योंकि Async I/O बफर आधारित है। डेटा को दोबारा लोड करना और इसे स्ट्रीम में भेजना संभव है, जैसे मेमोरी स्ट्रीम, लेकिन वहां भी समस्याएं हैं।

समस्या यह है कि मेरी इनपुट स्ट्रीम (जिसका मेरा कोई नियंत्रण नहीं है) मुझे कोई जानकारी नहीं देता है कि स्ट्रीम कितनी देर तक है। यह बस यह की तरह लग रही न्यू लाइन लाइनों की एक धारा है:

COMMAND\n 
...Unpredictable number of lines of data...\n 
END COMMAND\n 
....repeat.... 

तो, एपीएम का उपयोग कर, और तब से मैं कितनी देर तक किसी भी डेटा सेट हो जाएगा पता नहीं है, यह संभावना है कि डेटा के ब्लॉक बफर पार जाएगा सीमाओं को एकाधिक पढ़ने की आवश्यकता होती है, लेकिन उन एकाधिक पढ़ने से डेटा के कई ब्लॉक भी फैले होंगे।

उदाहरण:

Byte buffer[1024] = ".................blah\nThis is another l" 
[another read] 
        "ine\n.............................More Lines..." 

मेरी पहली सोचा एक StringBuilder उपयोग करने के लिए और बस एस.बी. के लिए बफर पंक्तियां जोड़ रहा था। यह कुछ हद तक काम करता है, लेकिन मुझे डेटा के ब्लॉक निकालने में मुश्किल होती है। मैंने न्यूलाइन डेटा पढ़ने के लिए स्ट्रिंग रीडर का उपयोग करने का प्रयास किया लेकिन यह जानने का कोई तरीका नहीं था कि आपको पूरी लाइन मिल रही है या नहीं, क्योंकि स्ट्रिंग रीडर अंतिम ब्लॉक के अंत में आंशिक रेखा देता है, इसके बाद नल आफ्टरवार्ड लौटता है। यह जानने का कोई तरीका नहीं है कि क्या लौटाया गया था डेटा की एक पूर्ण रूपरेखा रेखा थी।

उदाहरण:

// Note: no newline at the end 
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp.."); 
StringReader sr = new StringReader(sb); 
string s = sr.ReadLine(); // returns "This is a line" 
s = sr.ReadLine();  // returns "This is incomp.." 

क्या बदतर है, कि अगर मैं सिर्फ आंकड़ों के जोड़कर रखने के लिए, बफ़र्स बड़ा और बड़ा मिलता है, और इस के बाद से एक बार हुआ है कि एक अच्छा नहीं है पर हफ्तों या महीनों के लिए चला सकते है उपाय।

मेरा अगला विचार एसबी से डेटा के ब्लॉक को हटाने के लिए था जैसा कि मैंने उन्हें पढ़ा था। इसके लिए अपना खुद का रीडलाइन फ़ंक्शन लिखना आवश्यक था, लेकिन फिर मैं पढ़ने और लिखने के दौरान डेटा लॉक कर रहा हूं। साथ ही, डेटा के बड़े ब्लॉक (जिसमें सैकड़ों रीड और डेटा के मेगाबाइट शामिल हो सकते हैं) को पूरे बफर को स्कैनिंग की आवश्यकता होती है जो न्यूलाइन की तलाश में होती है। यह कुशल और सुंदर बदसूरत नहीं है।

मैं कुछ ऐसा ढूंढ रहा हूं जिसमें StreamReader/Writer की सादगी है Iync I/O की सुविधा के साथ।

मेरा अगला विचार मेमोरीस्ट्रीम का उपयोग करना था, और डेटा के ब्लॉक को मेमोरी स्ट्रीम में लिखना था, फिर स्ट्रीम स्ट्रीमर को स्ट्रीम में संलग्न करें और रीडलाइन का उपयोग करें, लेकिन फिर मुझे यह जानने के साथ समस्याएं हैं कि बफर में आखिरी पढ़ना है या नहीं एक पूर्ण रेखा या नहीं, साथ ही स्ट्रीम से "बासी" डेटा को हटाने के लिए भी मुश्किल है।

मैंने सिंक्रोनस रीड के साथ थ्रेड का उपयोग करने के बारे में भी सोचा। इसका लाभ यह है कि StreamReader का उपयोग करके, यह हमेशा टूटी हुई कनेक्शन स्थितियों को छोड़कर रीडलाइन() से एक पूर्ण रेखा वापस कर देगा। हालांकि इसमें कनेक्शन को रद्द करने के साथ समस्याएं हैं, और कुछ प्रकार की नेटवर्क समस्याओं के परिणामस्वरूप विस्तारित अवधि के लिए अवरुद्ध सॉकेट को लटका दिया जा सकता है। मैं async IO का उपयोग कर रहा हूं क्योंकि मैं डेटा प्राप्त करने वाले प्रोग्राम के जीवन के लिए धागा बांधना नहीं चाहता हूं।

कनेक्शन लंबे समय तक चल रहा है।और समय के साथ डेटा प्रवाह जारी रहेगा। आंशिक कनेक्शन के दौरान, डेटा का एक बड़ा प्रवाह होता है, और एक बार प्रवाह होने के बाद सॉकेट वास्तविक समय के अपडेट के लिए खुला रहता है। मुझे पता नहीं है कि प्रारंभिक प्रवाह "समाप्त" हो गया है, क्योंकि यह जानने का एकमात्र तरीका यह है कि तुरंत कोई डेटा नहीं भेजा जाता है। इसका मतलब है कि मैं प्रसंस्करण से पहले प्रारंभिक डेटा लोड को समाप्त करने की प्रतीक्षा नहीं कर सकता, मैं अंदर आने के रूप में "वास्तविक समय में" बहुत अधिक अटक गया हूं।

तो, क्या कोई इस स्थिति को संभालने के लिए एक अच्छी विधि सुझा सकता है एक तरह से जो अत्यधिक जटिल नहीं है? मैं वास्तव में यह जितना संभव हो उतना सरल और सुरुचिपूर्ण होना चाहता हूं, लेकिन मैं सभी किनारे के मामलों के कारण अधिक से अधिक जटिल समाधानों के साथ आ रहा हूं। मुझे लगता है कि मैं जो कुछ चाहता हूं वह कुछ प्रकार का फीफो है जिसमें मैं आसानी से अधिक डेटा जोड़ना जारी रख सकता हूं, जबकि एक ही समय में डेटा को पॉप आउट करना जो कुछ मानदंडों से मेल खाता है (यानी, न्यूलाइन समाप्त स्ट्रिंग्स)।

+0

मैंने सोचा कि यह भी एक दिलचस्प समस्या थी, इसलिए मैंने सीसीआर के साथ इसे हल करने के बारे में एक पोस्ट लिखा जो आप http: //iodyner.spaces.live.com, यदि आप रुचि रखते हैं ... –

उत्तर

5

यह काफी दिलचस्प सवाल है। अतीत में मेरे लिए समाधान एक अलग थ्रेड का उपयोग सिंक्रोनस ऑपरेशंस के साथ करना है, जैसा कि आप प्रस्तावित करते हैं। (मैं ताले और अपवाद हैंडलर के बहुत से उपयोग करके सॉकेट को अवरुद्ध करने में सबसे अधिक समस्याओं को प्राप्त करने में कामयाब रहा।) फिर भी, इन-बिल्ट एसिंक्रोनस ऑपरेशंस का उपयोग आमतौर पर सलाह दी जाती है क्योंकि यह वास्तविक ओएस-स्तरीय एसिंक I/O के लिए अनुमति देता है, इसलिए मैं समझता हूं तुम्हारी बात।

ठीक है, मैं जो कुछ भी आपको विश्वास करता हूं उसे पूरा करने के लिए एक कक्षा में गया हूं और लिखा है (अपेक्षाकृत साफ तरीके से मैं कहूंगा)। आप क्या सोचते हैं मुझे बताओ।

using System; 
using System.Collections.Generic; 
using System.IO; 
using System.Text; 

public class AsyncStreamProcessor : IDisposable 
{ 
    protected StringBuilder _buffer; // Buffer for unprocessed data. 

    private bool _isDisposed = false; // True if object has been disposed 

    public AsyncStreamProcessor() 
    { 
     _buffer = null; 
    } 

    public IEnumerable<string> Process(byte[] newData) 
    { 
     // Note: replace the following encoding method with whatever you are reading. 
     // The trick here is to add an extra line break to the new data so that the algorithm recognises 
     // a single line break at the end of the new data. 
     using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine)) 
     { 
      // Read all lines from new data, returning all but the last. 
      // The last line is guaranteed to be incomplete (or possibly complete except for the line break, 
      // which will be processed with the next packet of data). 
      string line, prevLine = null; 
      while ((line = newDataReader.ReadLine()) != null) 
      { 
       if (prevLine != null) 
       { 
        yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine; 
        _buffer = null; 
       } 
       prevLine = line; 
      } 

      // Store last incomplete line in buffer. 
      if (_buffer == null) 
       // Note: the (* 2) gives you the prediction of the length of the incomplete line, 
       // so that the buffer does not have to be expanded in most/all situations. 
       // Change it to whatever seems appropiate. 
       _buffer = new StringBuilder(prevLine, prevLine.Length * 2); 
      else 
       _buffer.Append(prevLine); 
     } 
    } 

    public void Dispose() 
    { 
     Dispose(true); 
     GC.SuppressFinalize(this); 
    } 

    private void Dispose(bool disposing) 
    { 
     if (!_isDisposed) 
     { 
      if (disposing) 
      { 
       // Dispose managed resources. 
       _buffer = null; 
       GC.Collect(); 
      } 

      // Dispose native resources. 

      // Remember that object has been disposed. 
      _isDisposed = true; 
     } 
    } 
} 

इस वर्ग का एक उदाहरण प्रत्येक NetworkStream के लिए बनाया जाना चाहिए और जब भी नए डेटा प्राप्त होता है प्रक्रिया समारोह बुलाया जाना चाहिए (BeginRead के लिए कॉलबैक विधि में, इससे पहले कि आप अगले BeginRead मैं कल्पना कर सकते हैं कहते हैं)।

नोट: मैंने केवल इस डेटा को परीक्षण डेटा के साथ सत्यापित किया है, न कि नेटवर्क पर प्रसारित वास्तविक डेटा। हालांकि, मैं किसी भी मतभेद की उम्मीद नहीं करता ...

इसके अलावा, एक चेतावनी है कि कक्षा निश्चित रूप से थ्रेड-सुरक्षित नहीं है, लेकिन जब तक वर्तमान डेटा संसाधित होने तक BeginRead को फिर से निष्पादित नहीं किया जाता है (जैसा कि मुझे लगता है कि आप कर रहे हैं), कोई समस्या नहीं होनी चाहिए।

आशा है कि यह आपके लिए काम करता है। यदि मुझे शेष समस्याएं हैं तो मुझे बताएं और मैं उनके साथ निपटने के लिए समाधान को संशोधित करने का प्रयास करूंगा। (ध्यान से पढ़ने के बावजूद, मैंने जो प्रश्न छोड़ा था, उसके बारे में कुछ सूक्ष्मता हो सकती है!)

+0

यह एक दिलचस्प समाधान है। मैंने भी इटरेटर्स को उपयोगी पाया है, लेकिन यह कोई समाधान नहीं था जिसका मेरा दिमाग आया होगा। मुझें यह पसंद है। –

+1

क्या आप समझा सकते हैं कि आपको IDISpose को लागू करने की आवश्यकता क्यों है? मुझे बताया गया है कि जीसी कॉल करना() बुरी आदत है और इसके परिणामस्वरूप खराब प्रदर्शन हो सकता है। क्या आप ढेर को थकाते हुए थोड़े समय के भीतर तेजी से आवंटन के बारे में चिंतित हैं? –

+0

हाँ, इटेटरेटर आसान चीजें हैं। इस मामले में आप इसे सामान्य सूची के साथ भी कर सकते हैं, हालांकि यह निश्चित रूप से बहुत अच्छा नहीं लग सकता है। यदि आप परिणाम/सूची के रूप में परिणाम से निपटना चाहते हैं, तो वैसे भी उन प्रकारों में कनवर्ट करना मुश्किल है, और कार्यान्वयन अभी भी आसान है। – Noldorin

0

जो आप समझा रहे हैं वह प्रश्न हैं, मुझे एएससीआईजेड तारों की बहुत याद दिलाता है। (link text)। यह एक सहायक शुरुआत हो सकती है।

मुझे उस परियोजना के लिए कॉलेज में ऐसा कुछ लिखना पड़ा जिस पर मैं काम कर रहा था। दुर्भाग्यवश, मेरे पास भेजने वाली सॉकेट पर नियंत्रण था, इसलिए मैंने प्रोटोकॉल के हिस्से के रूप में संदेश फ़ील्ड की लंबाई डाली। हालांकि, मुझे लगता है कि एक समान दृष्टिकोण आपको लाभ पहुंचा सकता है।

मैंने अपने समाधान से कैसे संपर्क किया था, मैं 5HELLO की तरह कुछ भेजूंगा, इसलिए पहले मैं 5 देखूंगा, और मुझे पता है कि मेरे पास संदेश की लंबाई 5 थी, और इसके लिए मुझे आवश्यक संदेश 5 वर्ण थे। हालांकि, अगर मेरे एसिंक पढ़ने पर, मुझे केवल 5 एचई मिल गया, मैं देखता हूं कि मेरे पास संदेश की लंबाई 5 है, लेकिन मैं केवल तार से 3 बाइट पढ़ने में सक्षम था (आइए ASCII वर्णों को मानें)। इस वजह से, मुझे पता था कि मैं कुछ बाइट्स खो रहा था, और जो टुकड़ा बफर में था उसे संग्रहीत किया। मेरे पास प्रति सॉकेट एक टुकड़ा बफर था, जिससे सिंक्रनाइज़ेशन समस्याओं से बचने के लिए। मोटा प्रक्रिया है।

  1. एक बाइट सरणी, रिकॉर्ड कितने बाइट्स
  2. बाइट द्वारा बाइट के माध्यम से स्कैन पढ़ रहा था, जब तक आप एक नई पंक्ति चरित्र को खोजने में सॉकेट से पढ़ें (यह बहुत ही जटिल हो जाता है अगर आप ASCII वर्ण नहीं ले पा रहे हैं, लेकिन वर्ण जो एकाधिक बाइट्स हो सकते हैं, आप उस पर हैं इसके लिए आप हैं)
  3. आप एक स्ट्रिंग में बग बफर हैं, और संलग्न करें कि आप इसे नई लाइन तक बफर पढ़ लें। इस स्ट्रिंग को एक कतार पर एक पूर्ण संदेश के रूप में या अपने स्वयं के प्रतिनिधि को संसाधित करने के लिए ड्रॉप करें। (आप वास्तव में इन बफर को अनुकूलित कर सकते हैं, क्योंकि आप एक ही बाइट सरणी में सॉकेट लेखन पढ़ रहे हैं क्योंकि आप टुकड़े हैं, लेकिन यह समझाना मुश्किल है)
  4. प्रत्येक बार जब हम एक नई लाइन पाते हैं, तो स्ट्रिंग बनाएं, स्ट्रिंग बनाएं बाइट से रिकॉर्ड की गई शुरुआत/अंत स्थिति से व्यवस्थित होता है और प्रसंस्करण के लिए कतार/प्रतिनिधि पर छोड़ देता है।
  5. एक बार जब हम अपने पढ़ने वाले बफर के अंत को दबा देते हैं, तो किसी भी चीज को कॉपी करें जो कि बग बफर में छोड़ा गया है।
  6. सॉकेट पर BeginRead को कॉल करें, जो चरण 1 पर कूद जाएगा। जब सॉकेट में डेटा उपलब्ध होगा।

फिर आप असामान्य संदेशों की कतार पढ़ने के लिए एक और थ्रेड का उपयोग करते हैं, या बस थ्रेडपूल प्रतिनिधियों का उपयोग करके इसे संभालने दें। और जो भी डेटा प्रोसेसिंग आपको करना है वह करें। अगर मैं गलत हूं तो कोई मुझे सही करेगा, लेकिन इसके साथ बहुत कम थ्रेड सिंक्रनाइज़ेशन समस्याएं हैं, क्योंकि आप केवल किसी भी समय सॉकेट से पढ़ने या पढ़ने का इंतजार कर रहे हैं, इसलिए ताले के बारे में कोई चिंता नहीं है (सिवाय इसके कि आप एक कतार populating, मैं अपने कार्यान्वयन में प्रतिनिधियों का इस्तेमाल किया)। कुछ विवरण हैं जिन्हें आपको स्वयं पर काम करने की आवश्यकता होगी, जैसे कि एक टुकड़ा बफर कितना बड़ा है, अगर आपको पढ़ने के दौरान 0 न्यूलाइन मिलती है, तो पूरे संदेश को ओवरराइटिंग के बिना खंड बफर में जोड़ा जाना चाहिए कुछ भी। मुझे लगता है कि यह अंत में कोड की लगभग 700 - 800 लाइनों में भाग गया, लेकिन इसमें कनेक्शन सेटअप सामान, एन्क्रिप्शन के लिए बातचीत, और कुछ अन्य चीजें शामिल थीं।

इस सेटअप ने मेरे लिए बहुत अच्छा प्रदर्शन किया; मैं इस कार्यान्वयन का उपयोग करके 100 एमबीपीएस ईथरनेट लैन पर 80 एमबीपीएस तक करने में सक्षम था जिसमें एन्क्रिप्शन प्रोसेसिंग सहित 1.8 गीगा ऑप्टरन शामिल था। और चूंकि आप सॉकेट से बंधे हैं, इसलिए सर्वर स्केल करेगा क्योंकि एक ही समय में एकाधिक सॉकेट काम किए जा सकते हैं। यदि आपको क्रम में संसाधित वस्तुओं की आवश्यकता है, तो आपको एक कतार का उपयोग करने की आवश्यकता होगी, लेकिन अगर आदेश कोई फर्क नहीं पड़ता है, तो प्रतिनिधि आपको थ्रेडपूल से बहुत स्केलेबल प्रदर्शन देंगे।

आशा है कि यह मदद करता है, पूर्ण समाधान होने के लिए नहीं, बल्कि एक दिशा जिसमें दिखना शुरू करना है।

* बस एक नोट, मेरा कार्यान्वयन पूरी तरह से बाइट स्तर पर नीचे था और एन्क्रिप्शन समर्थित था, मैंने अपने उदाहरण के लिए वर्णों का उपयोग करना आसान बनाने के लिए किया।

+0

हां, मैंने पहले से ही इस तरह के एक दृष्टिकोण को लागू किया है, लेकिन मुझे यह पसंद नहीं है। यह मेरे स्वाद के लिए बहुत गन्दा और जटिल है, इसलिए मैं यहां सुझावों के लिए पूछ रहा हूं। मुझे नोल्डोरिन के दृष्टिकोण पसंद हैं, मेरे पास मौजूदा फ्रेमवर्क कोड की इच्छा और पुन: उपयोग है। –

संबंधित मुद्दे

 संबंधित मुद्दे