2008-12-16 3 views
17

में बैंडविड्थ थ्रॉटलिंग मैं एक प्रोग्राम विकसित कर रहा हूं जो लगातार पृष्ठभूमि में डेटा की धारा भेजता है और मैं उपयोगकर्ता को अपलोड और डाउनलोड सीमा दोनों के लिए टोपी सेट करने की अनुमति देना चाहता हूं।सी #

मैं token bucket और leaky bucket alghorhithms पर पढ़ लिया है, और उचित रूप में बाद के बाद से इस नेटवर्क बैंडविड्थ को अधिकतम बल्कि यथासंभव अवरोध रहित होने का मामला नहीं है वर्णन फिट करने के लिए लगता है।

हालांकि मैं इस पर कुछ अनिश्चित हूं कि मैं इसे कैसे कार्यान्वित करूंगा। एक प्राकृतिक दृष्टिकोण मौजूदा ट्रैफिक को विस्तारित करना आसान बनाने के लिए अमूर्त स्ट्रीम क्लास का विस्तार करना है, लेकिन इसके साथ-साथ डेटा भेजने के लिए अतिरिक्त धागे की भागीदारी की आवश्यकता नहीं होती है जबकि एक साथ (लीकी बाल्टी) प्राप्त होती है? अन्य कार्यान्वयन पर किसी भी संकेत की सराहना की जाएगी।

इसके अलावा, हालांकि मैं संशोधित कर सकता हूं कि प्रोग्राम कितना डेटा प्राप्त करता है, सी # स्तर पर बैंडविड्थ थ्रॉटलिंग कितनी अच्छी तरह से काम करता है? क्या कंप्यूटर अभी भी डेटा प्राप्त करेगा और इसे आसानी से थ्रॉटलिंग प्रभाव को रद्द कर देगा, या जब तक मैं और अधिक प्राप्त करने के लिए कहूं तब तक इंतजार करूंगा?

संपादित करें: मुझे इनकमिंग और आउटगोइंग डेटा दोनों को थ्रॉटल करने में दिलचस्पी है, जहां स्ट्रीम के विपरीत छोर पर मेरा कोई नियंत्रण नहीं है।

उत्तर

1

मैं अरुल द्वारा वर्णित थ्रॉटलडस्ट्रीम-क्लास के एक अलग कार्यान्वयन के साथ आया था। मेरे संस्करण एक 1s अंतराल के साथ एक WaitHandle और एक टाइमर का उपयोग करता है: जब तक अगले दूसरा शुरू होता है

public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
{ 
    MaxBytesPerSecond = maxBytesPerSecond; 
    parent = parentStream; 
    processed = 0; 
    resettimer = new System.Timers.Timer(); 
    resettimer.Interval = 1000; 
    resettimer.Elapsed += resettimer_Elapsed; 
    resettimer.Start();   
} 

protected void Throttle(int bytes) 
{ 
    try 
    { 
     processed += bytes; 
     if (processed >= maxBytesPerSecond) 
      wh.WaitOne(); 
    } 
    catch 
    { 
    } 
} 

private void resettimer_Elapsed(object sender, ElapsedEventArgs e) 
{ 
    processed = 0; 
    wh.Set(); 
} 

जब भी बैंडविड्थ सीमा थ्रेड से अधिक है सो जाएगा। इष्टतम नींद की अवधि की गणना करने की कोई आवश्यकता नहीं है।

पूर्ण कार्यान्वयन:

public class ThrottledStream : Stream 
{ 
    private readonly Stream parent; 
    private readonly int maxBytesPerSecond; 
    private readonly IScheduler scheduler; 
    private readonly IStopwatch stopwatch; 

    private long processed; 

    public ThrottledStream(Stream parent, int maxBytesPerSecond, IScheduler scheduler) 
    { 
     this.maxBytesPerSecond = maxBytesPerSecond; 
     this.parent = parent; 
     this.scheduler = scheduler; 
     stopwatch = scheduler.StartStopwatch(); 
     processed = 0; 
    } 

    public ThrottledStream(Stream parent, int maxBytesPerSecond) 
     : this (parent, maxBytesPerSecond, Scheduler.Immediate) 
    { 
    } 

    protected void Throttle(int bytes) 
    { 
     processed += bytes; 
     var targetTime = TimeSpan.FromSeconds((double)processed/maxBytesPerSecond); 
     var actualTime = stopwatch.Elapsed; 
     var sleep = targetTime - actualTime; 
     if (sleep > TimeSpan.Zero) 
     { 
      using (var waitHandle = new AutoResetEvent(initialState: false)) 
      { 
       scheduler.Sleep(sleep).GetAwaiter().OnCompleted(() => waitHandle.Set()); 
       waitHandle.WaitOne(); 
      } 
     } 
    } 

    public override bool CanRead 
    { 
     get { return parent.CanRead; } 
    } 

    public override bool CanSeek 
    { 
     get { return parent.CanSeek; } 
    } 

    public override bool CanWrite 
    { 
     get { return parent.CanWrite; } 
    } 

    public override void Flush() 
    { 
     parent.Flush(); 
    } 

    public override long Length 
    { 
     get { return parent.Length; } 
    } 

    public override long Position 
    { 
     get 
     { 
      return parent.Position; 
     } 
     set 
     { 
      parent.Position = value; 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     var read = parent.Read(buffer, offset, count); 
     Throttle(read); 
     return read; 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     return parent.Seek(offset, origin); 
    } 

    public override void SetLength(long value) 
    { 
     parent.SetLength(value); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     parent.Write(buffer, offset, count); 
    } 
} 

और कुछ परीक्षण है कि बस कुछ मिलीसेकेंड ले:

public class ThrottledStream : Stream 
{ 
    #region Properties 

    private int maxBytesPerSecond; 
    /// <summary> 
    /// Number of Bytes that are allowed per second 
    /// </summary> 
    public int MaxBytesPerSecond 
    { 
     get { return maxBytesPerSecond; } 
     set 
     { 
      if (value < 1) 
       throw new ArgumentException("MaxBytesPerSecond has to be >0"); 

      maxBytesPerSecond = value; 
     } 
    } 

    #endregion 


    #region Private Members 

    private int processed; 
    System.Timers.Timer resettimer; 
    AutoResetEvent wh = new AutoResetEvent(true); 
    private Stream parent; 

    #endregion 

    /// <summary> 
    /// Creates a new Stream with Databandwith cap 
    /// </summary> 
    /// <param name="parentStream"></param> 
    /// <param name="maxBytesPerSecond"></param> 
    public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
    { 
     MaxBytesPerSecond = maxBytesPerSecond; 
     parent = parentStream; 
     processed = 0; 
     resettimer = new System.Timers.Timer(); 
     resettimer.Interval = 1000; 
     resettimer.Elapsed += resettimer_Elapsed; 
     resettimer.Start();   
    } 

    protected void Throttle(int bytes) 
    { 
     try 
     { 
      processed += bytes; 
      if (processed >= maxBytesPerSecond) 
       wh.WaitOne(); 
     } 
     catch 
     { 
     } 
    } 

    private void resettimer_Elapsed(object sender, ElapsedEventArgs e) 
    { 
     processed = 0; 
     wh.Set(); 
    } 

    #region Stream-Overrides 

    public override void Close() 
    { 
     resettimer.Stop(); 
     resettimer.Close(); 
     base.Close(); 
    } 
    protected override void Dispose(bool disposing) 
    { 
     resettimer.Dispose(); 
     base.Dispose(disposing); 
    } 

    public override bool CanRead 
    { 
     get { return parent.CanRead; } 
    } 

    public override bool CanSeek 
    { 
     get { return parent.CanSeek; } 
    } 

    public override bool CanWrite 
    { 
     get { return parent.CanWrite; } 
    } 

    public override void Flush() 
    { 
     parent.Flush(); 
    } 

    public override long Length 
    { 
     get { return parent.Length; } 
    } 

    public override long Position 
    { 
     get 
     { 
      return parent.Position; 
     } 
     set 
     { 
      parent.Position = value; 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     return parent.Read(buffer, offset, count); 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     return parent.Seek(offset, origin); 
    } 

    public override void SetLength(long value) 
    { 
     parent.SetLength(value); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     parent.Write(buffer, offset, count); 
    } 

    #endregion 


} 
+0

यदि आप टाइमर टिकते समय 'संसाधित' से '0' सेट नहीं करते हैं तो यह अधिक सटीक हो जाएगा, लेकिन इससे 'maxBytesPerSecond' घटाएं। –

1

@ 0xDEADBEEF के समाधान के आधार पर मैं आरएक्स शेड्यूलर के आधार पर निम्न (परीक्षण योग्य) समाधान बनाया

[TestMethod] 
public void ShouldThrottleReading() 
{ 
    var content = Enumerable 
     .Range(0, 1024 * 1024) 
     .Select(_ => (byte)'a') 
     .ToArray(); 
    var scheduler = new TestScheduler(); 
    var source = new ThrottledStream(new MemoryStream(content), content.Length/8, scheduler); 
    var target = new MemoryStream(); 

    var t = source.CopyToAsync(target); 

    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks); 
    t.Wait(10).Should().BeTrue(); 
} 

[TestMethod] 
public void ShouldThrottleWriting() 
{ 
    var content = Enumerable 
     .Range(0, 1024 * 1024) 
     .Select(_ => (byte)'a') 
     .ToArray(); 
    var scheduler = new TestScheduler(); 
    var source = new MemoryStream(content); 
    var target = new ThrottledStream(new MemoryStream(), content.Length/8, scheduler); 

    var t = source.CopyToAsync(target); 

    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks); 
    t.Wait(10).Should().BeTrue(); 
}