2010-11-24 12 views
13

मैं एक व्यापार मंच परियोजना के लिए आरएक्स का मूल्यांकन कर रहा हूं जिसे हजारों संदेशों को एक सेकंड में संसाधित करने की आवश्यकता होगी। मौजूदा प्लेटफ़ॉर्म में एक जटिल ईवेंट रूटिंग सिस्टम (मल्टीकास्ट प्रतिनिधि) है जो इन संदेशों का जवाब देता है और बाद में प्रसंस्करण करता है।प्रतिक्रियाशील एक्सटेंशन बहुत धीमे लगते हैं - क्या मैं कुछ गलत कर रहा हूं?

मैंने स्पष्ट लाभों के लिए प्रतिक्रियाशील एक्सटेंशन को देखा है लेकिन ध्यान दिया है कि यह कुछ हद तक धीमा, सामान्य 100 गुना धीमा है।

मैंने यह दिखाने के लिए इकाई परीक्षण बनाया है जो विभिन्न आरएक्स स्वादों और सीधे-बाहर-द-बॉक्स प्रतिनिधि "नियंत्रण" परीक्षण का उपयोग करके 1 मिलियन बार सरल वृद्धि करता है।

Delegate         - (1000000) - 00:00:00.0410000 
Observable.Range()      - (1000000) - 00:00:04.8760000 
Subject.Subscribe() - NewThread   - (1000000) - 00:00:02.7630000 
Subject.Subscribe() - CurrentThread  - (1000000) - 00:00:03.0280000 
Subject.Subscribe() - Immediate   - (1000000) - 00:00:03.0030000 
Subject.Subscribe() - ThreadPool   - (1000000) - 00:00:02.9800000 
Subject.Subscribe() - Dispatcher   - (1000000) - 00:00:03.0360000 

आप देख सकते हैं, सभी आरएक्स तरीकों ~ 100 बार एक प्रतिनिधि बराबर की तुलना में धीमी कर रहे हैं:

यहाँ के परिणाम हैं। स्पष्ट रूप से आरएक्स कवर के तहत बहुत कुछ कर रहा है जो अधिक जटिल उदाहरण में उपयोग किया जाएगा, लेकिन यह अविश्वसनीय रूप से धीमा लगता है।

क्या यह सामान्य है या मेरी परीक्षण धारणाएं अमान्य हैं? नीचे से ऊपर के लिए Nunit कोड -

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Linq; 
using System.Text; 
using NUnit.Framework; 
using System.Concurrency; 

namespace RxTests 
{ 
    [TestFixture] 
    class ReactiveExtensionsBenchmark_Tests 
    { 
     private int counter = 0; 

     [Test] 
     public void ReactiveExtensionsPerformanceComparisons() 
     { 
      int iterations = 1000000; 

      Action<int> a = (i) => { counter++; }; 

      DelegateSmokeTest(iterations, a); 
      ObservableRangeTest(iterations, a); 
      SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread"); 
      SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread"); 
      SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate"); 
      SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool"); 
      SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher"); 
     } 

     public void ObservableRangeTest(int iterations, Action<int> action) 
     { 
      counter = 0; 

      long start = DateTime.Now.Ticks; 

      Observable.Range(0, iterations).Subscribe(action); 

      OutputTestDuration("Observable.Range()", start); 
     } 


     public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode) 
     { 
      counter = 0; 

      var eventSubject = new Subject<int>(); 
      var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb 
      events.Subscribe(action); 

      long start = DateTime.Now.Ticks; 

      Enumerable.Range(0, iterations).ToList().ForEach 
       (
        a => eventSubject.OnNext(1) 
       ); 

      OutputTestDuration("Subject.Subscribe() - " + mode, start); 
     } 

     public void DelegateSmokeTest(int iterations, Action<int> action) 
     { 
      counter = 0; 
      long start = DateTime.Now.Ticks; 

      Enumerable.Range(0, iterations).ToList().ForEach 
       (
        a => action(1) 
       ); 

      OutputTestDuration("Delegate", start); 
     } 


     /// <summary> 
     /// Output helper 
     /// </summary> 
     /// <param name="test"></param> 
     /// <param name="duration"></param> 
     public void OutputTestDuration(string test, long duration) 
     { 
      Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration))); 
     } 

     /// <summary> 
     /// Test timing helper 
     /// </summary> 
     /// <param name="elapsedTicks"></param> 
     /// <returns></returns> 
     public string ElapsedDuration(long elapsedTicks) 
     { 
      return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString(); 
     } 

    } 
} 
+0

विषय सदस्यता सबस्टाइल 'शेड्यूलर' तर्क का उपयोग नहीं करता है, इसलिए मुझे आश्चर्य है कि आपको वास्तव में अलग-अलग परिणाम मिलते हैं। – dtb

+0

और आप सीधे पर्यवेक्षण में कार्रवाई की सदस्यता लेने के बजाय किसी विषय का उपयोग क्यों कर रहे हैं? अफैइक विषय कवर के तहत काफी कुछ करते हैं, इसलिए आपको यह जांचना चाहिए कि इसे हटाने से कोई फर्क पड़ता है या नहीं। – dtb

+0

मुझे आश्चर्य है कि ObservableRangeTest * वास्तव में * खराब प्रदर्शन करता है, यहां तक ​​कि विषय के साथ परीक्षणों की तुलना में भी। WTF? – dtb

उत्तर

16

मेरा अनुमान है कि आरएक्स टीम पहले कार्यक्षमता के निर्माण पर केंद्रित है और प्रदर्शन अनुकूलन के बारे में अभी तक की परवाह नहीं करता है।

बाधाओं को निर्धारित करने के लिए एक प्रोफाइलर का उपयोग करें और अपने स्वयं के अनुकूलित संस्करणों के साथ धीमी आरएक्स कक्षाओं को प्रतिस्थापित करें।

नीचे दो उदाहरण हैं।

परिणाम:

सभी की
 
Delegate         - (1000000) - 00:00:00.0368748 

Simple - NewThread      - (1000000) - 00:00:00.0207676 
Simple - CurrentThread     - (1000000) - 00:00:00.0214599 
Simple - Immediate      - (1000000) - 00:00:00.0162026 
Simple - ThreadPool      - (1000000) - 00:00:00.0169848 

FastSubject.Subscribe() - NewThread  - (1000000) - 00:00:00.0588149 
FastSubject.Subscribe() - CurrentThread - (1000000) - 00:00:00.0508842 
FastSubject.Subscribe() - Immediate  - (1000000) - 00:00:00.0513911 
FastSubject.Subscribe() - ThreadPool  - (1000000) - 00:00:00.0529137 

सबसे पहले, यह एक बहुत कैसे नमूदार कार्यान्वित किया जाता है कोई फर्क लगता है। यहाँ एक नमूदार कि से छोड़ी नहीं जा सकती है, लेकिन यह तेजी से है:

private IObservable<int> CreateFastObservable(int iterations) 
{ 
    return Observable.Create<int>(observer => 
    { 
     new Thread(_ => 
     { 
      for (int i = 0; i < iterations; i++) 
      { 
       observer.OnNext(i); 
      } 
      observer.OnCompleted(); 
     }).Start(); 
     return() => { }; 
    }); 
} 

टेस्ट:

public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode) 
{ 
    counter = 0; 

    var start = Stopwatch.StartNew(); 

    var observable = CreateFastObservable(iterations); 

    observable.SubscribeOn(scheduler).Run(action); 

    OutputTestDuration("Simple - " + mode, start); 
} 

विषयों भूमि के ऊपर का एक बहुत जोड़ें। यहाँ एक विषय है कि एक विषय से उम्मीद की कार्यक्षमता का ज्यादा से स्ट्रिप किया जाता है, लेकिन यह तेजी से है:

class FastSubject<T> : ISubject<T> 
{ 
    private event Action onCompleted; 
    private event Action<Exception> onError; 
    private event Action<T> onNext; 

    public FastSubject() 
    { 
     onCompleted +=() => { }; 
     onError += error => { }; 
     onNext += value => { }; 
    } 

    public void OnCompleted() 
    { 
     this.onCompleted(); 
    } 

    public void OnError(Exception error) 
    { 
     this.onError(error); 
    } 

    public void OnNext(T value) 
    { 
     this.onNext(value); 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     this.onCompleted += observer.OnCompleted; 
     this.onError += observer.OnError; 
     this.onNext += observer.OnNext; 

     return Disposable.Create(() => 
     { 
      this.onCompleted -= observer.OnCompleted; 
      this.onError -= observer.OnError; 
      this.onNext -= observer.OnNext; 
     }); 
    } 
} 

टेस्ट:

public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode) 
{ 
    counter = 0; 

    var start = Stopwatch.StartNew(); 

    var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount(); 

    observable.SubscribeOn(scheduler).Run(action); 

    OutputTestDuration("FastSubject.Subscribe() - " + mode, start); 
} 
+0

वाह। इस पर आपके समय के लिए बहुत बहुत धन्यवाद। –

+2

Observable.Range के पास पैरामीटर के रूप में शेड्यूलर के साथ एक अधिभार है, इसलिए, यह मेरे लिए 00: 00: 00.6698080 में शेड्यूलर.इमीडिएट का उपयोग करता है: सार्वजनिक शून्य सरल ओब्सर्वटेस्ट (int पुनरावृत्तियों, एक्शन एक्शन, आईएसड्यूलर शेड्यूलर, स्ट्रिंग मोड) {काउंटर = 0; var start = stopwatch.StartNew(); अवलोकन योग्य। रेंज (0, पुनरावृत्तियों, शेड्यूलर) .रुन (एक्शन); आउटपुटटेस्ट अवधि ("सरल -" + मोड, शुरू करें); } –

+0

@ रिचर्ड हेन: अच्छा लगता है। तो Observable.Range इस्तेमाल शेड्यूलर पर बहुत निर्भर करता है। यद्यपि यह अभी भी मेरे "फास्ट ऑब्सर्जेबल" से धीमा है। लेकिन यह जानने के लिए भुगतान करता है कि कौन से knobs बारी बारी से। – dtb

10

याद रखें कि आपके प्रतिनिधि किसी भी धागा सुरक्षा की गारंटी नहीं है - यह सचमुच प्रतिनिधियों को जो कुछ भी थ्रेड से बुलाया जाता है, उससे कॉल करता है, जबकि जब आप ऑब्जर्जेबल.ऑब्सर्वऑन को अन्य धागे पर मार्शल नोटिफिकेशन करने के लिए कहते हैं, तो आरएक्स.नेट को यह सुनिश्चित करने के लिए लॉक करना होगा कि ऐसा करता है जो आपको लगता है।

तो, प्रतिनिधि सुपर फास्ट को स्थानांतरित कर सकते हैं, लेकिन यदि आप इसका उपयोग करके कुछ व्यावहारिक बनाना चाहते हैं, तो आप हाथ से सिंक्रनाइज़ेशन के निर्माण को समाप्त कर देंगे जो आपको धीमा कर देगा। ऐसा कहा जा रहा है कि, LINQ की तरह आरएक्स एक अमूर्त है - यदि आपको हास्यास्पद रूप से तेज़ होने की आवश्यकता है, तो आपको बदसूरत कोड लिखना शुरू करना होगा।

+0

हाँ, कोड में धारणाओं के साथ समस्याओं का एहसास शुरू करना ;-) –

12

आरएक्स 2 के लिए अद्यतन।0: मैं के साथ मूल पद से कोड लिया (लगभग) नवीनतम LINQPad बीटा 4.42.04 (अच्छी तरह से वहाँ एक 06 है, लेकिन वैसे भी): Rx Main assemblies

... और यह थोड़ा नया आरएक्स v2 का उपयोग करने के लिए समायोजित अनुसूचक वाक्य रचना:

 public void ReactiveExtensionsPerformanceComparisons() 
    { 
     int iterations = 1000000; 

     Action<int> a = (i) => { counter++; }; 

     DelegateSmokeTest(iterations, a); 
     ObservableRangeTest(iterations, a); 
     SubjectSubscribeTest(iterations, a, NewThreadScheduler.Default, "NewThread"); 
     SubjectSubscribeTest(iterations, a, CurrentThreadScheduler.Instance, "CurrentThread"); 
     SubjectSubscribeTest(iterations, a, ImmediateScheduler.Instance, "Immediate"); 
     SubjectSubscribeTest(iterations, a, ThreadPoolScheduler.Instance, "ThreadPool"); 
     // I *think* this is the same as the ThreadPool scheduler in my case 
     SubjectSubscribeTest(iterations, a, DefaultScheduler.Instance, "Default");     
     // doesn't work, as LinqPad has no Dispatcher attched to the Gui thread, maybe there's a workaround; the Instance property on it is obsolete 
     //SubjectSubscribeTest(iterations, a, DispatcherScheduler.Current, "ThreadPool"); 
    } 

नोट: परिणाम दुर्लभ मामलों ThreadPool धड़कता newThread में, बेतहाशा भिन्न है, लेकिन ज्यादातर मामलों में newThread एक मामूली बढ़त सूची में नीचे शेड्यूलर ऊपर है:

Delegate         - (1000000) - 00:00:00.0440025 
Observable.Range()      - (1000000) - 00:00:01.9251101 
Subject.Subscribe() - NewThread   - (1000000) - 00:00:00.0400023 
Subject.Subscribe() - CurrentThread  - (1000000) - 00:00:00.0530030 
Subject.Subscribe() - Immediate   - (1000000) - 00:00:00.0490028 
Subject.Subscribe() - ThreadPool   - (1000000) - 00:00:00.0490028 
Subject.Subscribe() - Default   - (1000000) - 00:00:00.0480028 

तो ऐसा लगता है कि उन्होंने प्रदर्शन पर बहुत मेहनत की है ..

+0

इस अद्यतन के लिए धन्यवाद! – film42

संबंधित मुद्दे