2014-12-19 10 views
6

मैं एक समांतर घटना ग्राहक बनाने की कोशिश कर रहा हूं। यह मेरा पहला प्रयास है:समांतर घटना ग्राहक .net 4.5

using System; 
using System.Collections.Generic; 
using System.Threading.Tasks; 
using EventStore.ClientAPI; 

namespace Sandbox 
{ 
    public class SomeEventSubscriber 
    { 
     private Position? _latestPosition; 
     private readonly Dictionary<Type, Action<object>> _eventHandlerMapping; 
     private IEventStoreConnection _connection; 

     public Dictionary<Type, Action<object>> EventHandlerMapping 
     { 
      get { return _eventHandlerMapping; } 
     } 

     public SomeEventSubscriber() 
     { 
      _eventHandlerMapping = CreateEventHandlerMapping(); 
      _latestPosition = Position.Start; 
     } 

     public void Start() 
     { 
      ConnectToEventstore(); 
     } 

     private void ConnectToEventstore() 
     { 
      _connection = EventStoreConnectionWrapper.Connect(); 
      _connection.Connected += 
      (sender, args) => _connection.SubscribeToAllFrom(_latestPosition, false, EventOccured, LiveProcessingStarted, HandleSubscriptionDropped); 
     } 

     private Dictionary<Type, Action<object>> CreateEventHandlerMapping() 
     { 
      return new Dictionary<Type, Action<object>> 
      { 
       {typeof (FakeEvent1), o => Handle(o as FakeEvent1)}, 
       {typeof (FakeEvent2), o => Handle(o as FakeEvent2)}, 
      }; 
     } 

     private async Task Handle(FakeEvent1 eventToHandle) 
     { 
      SomethingLongRunning(eventToHandle); 
     } 

     private async Task Handle(FakeEvent2 eventToHandle) 
     { 
      SomethingLongRunning(eventToHandle); 
     } 

     private async Task SomethingLongRunning(BaseFakeEvent eventToHandle) 
     { 
      Console.WriteLine("Start Handling: " + eventToHandle.GetType()); 
      var task = Task.Delay(10000); 
      await task; 
      Console.WriteLine("Finished Handling: " + eventToHandle.GetType()); 
     } 

     private void EventOccured(EventStoreCatchUpSubscription eventStoreCatchUpSubscription, 
      ResolvedEvent resolvedEvent) 
     { 
      if (resolvedEvent.OriginalEvent.EventType.StartsWith("$") || resolvedEvent.OriginalEvent.EventStreamId.StartsWith("$")) 
       return; 

      var @event = EventSerialization.DeserializeEvent(resolvedEvent.OriginalEvent); 
      if (@event != null) 
      { 
       var eventType = @event.GetType(); 
       if (_eventHandlerMapping.ContainsKey(eventType)) 
       { 
        var task = Task.Factory.StartNew(() => _eventHandlerMapping[eventType](event)); 
        Console.WriteLine("The task is running asynchronously..."); 
       } 
      } 
      if (resolvedEvent.OriginalPosition != null) _latestPosition = resolvedEvent.OriginalPosition.Value; 
     } 

     private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, SubscriptionDropReason dropReason, Exception ex) 
     { 
      if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow) 
      { 
       //TODO: Wait and reconnect probably with back off 
      } 

      if (dropReason == SubscriptionDropReason.UserInitiated) 
       return; 

      if (SubscriptionDropMayBeRecoverable(dropReason)) 
      { 
       Start(); 
      } 
     } 

     private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason) 
     { 
      return dropReason == SubscriptionDropReason.Unknown || dropReason == SubscriptionDropReason.SubscribingError || 
        dropReason == SubscriptionDropReason.ServerError || dropReason == SubscriptionDropReason.ConnectionClosed; 
     } 

     private static void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription) 
     { 

     } 
    } 
} 

अपने विशेषज्ञ राय में, यह एक वैध दृष्टिकोण है? क्या आप किसी भी सुधार का सुझाव दे सकते हैं?

पुनश्च:

शायद:

Task.Run(() => _eventHandlerMapping[eventType](@event)); 

बेहतर होगा?

+3

मैंने पूरी समीक्षा नहीं की, लेकिन मेरी पहली आलोचना "@event" नामक चर से बचने के लिए होगी। यहां तक ​​कि यह @ उपसर्ग के साथ पूरी तरह से कानूनी है, यह बदसूरत है। केवल डेटासेट (डीबी, डब्ल्यूएस ...) के खिलाफ जेनरेट कोड के लिए उदाहरण के लिए उपयोगी, "नियमित" कोड में नहीं। मुझे पता है, यह मुख्य विषय से संबंधित वास्तव में सहायक नहीं है, क्षमा करें। – AFract

+1

.NET 5 जैसी कोई चीज़ नहीं है। वर्तमान संस्करण .NET 4.5.2 है, और आगामी संस्करण (विजुअल स्टूडियो 2015) .NET 4.6 है। क्या आप .NET संस्करण संख्या और सी # संस्करणों को भ्रमित कर रहे हैं? –

+0

हां, यही मेरा मतलब था - क्षमा करें। – cs0815

उत्तर

1

आप एक ही EventOccured प्रतिनिधि है जो जहां सभी घटनाओं है कि में पाए जाते हैं के लिए सूचित किया चाहते हैं EventStore
सबसे पहले जिस पर घटनाओं निकाल दिया जाता है के अलावा किसी अन्य डिस्पैचर में EventOccured अंदर पूर्व कोड चलाने पर विचार।
दूसरा, FakeEventBase के कार्यान्वयन के साथ इसे abstract class पर बदलना संभव होगा और फिर इसे बढ़ाएं और प्रत्येक FakeEvent प्रकार के लिए व्यक्तिगत उदाहरण बनाएं। यह बहुत साफ समाधान होगा।
तीसरे, क्यूइंग के लिए कस्टम ThreadScheduler पर विचार करें और इन Handle कार्यों को चलाएं। http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx

संपादित करें:
मैं एक प्रसारक वर्ग जब आपरेशन पूरा कर लिया है जानता है और समाप्त घटना को जन्म देती है जो नीचे की तरह होगा।

public class EventBroadcaster 
{ 
    public event EventHandler SomeEventOccured; 

    public async void DoLongRunningOperationAndRaiseFinishedEvent() 
    { 
     var waitingTask = Task.Delay(TimeSpan.FromSeconds(2)); 

     await waitingTask.ContinueWith(t => RaiseSomeEventOccured(), CancellationToken.None, 
      TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Current); 
    } 

    private void RaiseSomeEventOccured() 
    { 
     EventHandler handler = SomeEventOccured; 
     if (handler != null) handler(this, EventArgs.Empty); 
    } 
} 

और फिर एक EventListener

public class EventListner 
{ 
    private readonly string _id; 

    public EventListner(string id) 
    { 
     _id = id; 
    } 

    public void ListenTo(EventBroadcaster broadcaster) 
    { 
     broadcaster.SomeEventOccured += OnSomeEventOccured; 
    } 

    private async void OnSomeEventOccured(object sender, EventArgs eventArgs) 
    { 
     var currentTime = DateTime.Now; 
     Console.WriteLine("EventListner {0} received at {1}", _id, 
      currentTime.ToString("dd-MM-yyyy HH:mm:ss.fffffff")); 

     //Not required just to show this does not affect other instances. 
     //await Task.Delay(TimeSpan.FromSeconds(5)); 
    } 
} 

तो यह

public static class Program 
{ 
    public static void Main(string[] args) 
    { 
     var broadcaster = new EventBroadcaster(); 

     var listners = new List<EventListner>(); 

     for (int i = 1; i < 10; i++) 
     { 
      var listner = new EventListner(i.ToString(CultureInfo.InvariantCulture)); 
      listner.ListenTo(broadcaster); 
      listners.Add(listner); 
     } 

     broadcaster.DoLongRunningOperationAndRaiseFinishedEvent(); 

     Console.WriteLine("Waiting for operation to complete"); 

     Console.ReadLine(); 

    } 
} 

परीक्षण इस उदाहरण हैंडलर प्रतिनिधियों के क्रम में एक के बाद एक के सक्रिय होने में लिए Program.cs होगा वे सब्सक्राइब किए गए थे।

अब की तरह नीचे नोट कुछ करने के लिए प्रसारणकर्ता में कोड को संशोधित: मैं कोडिंग में आसानी के लिए Action को EventHandler से विधि हस्ताक्षर बदल दिया है।

private void RaiseSomeEventOccured() 
    { 
     Action handler = SomeEventOccured; 
     if (handler != null) 
     { 
      var parallelOption = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount }; 
      Parallel.Invoke(parallelOption, Array.ConvertAll(handler.GetInvocationList(), ConvertToAction)); 
      handler(); 
     } 
    } 

    private Action ConvertToAction(Delegate del) 
    { 
     return (Action)del; 
    } 

अब आप कि घटनाओं यादृच्छिक क्रम में निकाल दिया जाता है देखेंगे। TPL और Parallel प्रोग्रामिंग आप यकीन है कि यह के लिए जाने से पहले एक लाभ है बनाने की जरूरत है साथ हमेशा:
मैं विकल्प का उपयोग 1.
नोट बेहतर प्रदर्शन मिला है।

+2

क्षमा करें आपका उत्तर कोई मतलब नहीं। क्यों कुछ EventSubscriber FakeEventBase लागू करना चाहिए? यह भी सुनिश्चित नहीं है कि मुझे एक साधारण परिदृश्य में थ्रेडशेड्यूलर बनाने वाले दर्जी को लागू क्यों करना चाहिए ... – cs0815

+2

यदि आप एकाधिक श्रोताओं को समानांतर में संसाधित किया जा सकता है तो आप समानांतर में ईवेंट को संभालना चाहते हैं। एक ट्रैफिक सिग्नल छवि, 'हरे रंग की रोशनी' के संकेत में कई कारें प्रतीक्षा कर रही हैं और फिर सभी कारें उनके सामने यातायात के आधार पर आगे बढ़ने का निर्णय लेती हैं; आपके मामले में 'EventOccured' को एक ही समय में सभी हैंडलर पर प्रसारित नहीं किया जाता है, बल्कि' EventOccured' उन्हें एक-एक करके भेजता है हालांकि असीमित रूप से आशा है कि आपको अंतर मिलेगा। –

+1

इसके अलावा मैंने एक अलग 'थ्रेडपूल' होने का सुझाव दिया क्योंकि मैं नहीं चाहता था कि आप मुख्य थ्रेडपूल में भी प्रसंस्करण करना चाहते हैं जो अन्य थ्रेडपूल के कार्यों में कतारबद्ध होने पर देरी हो सकती है। –

1

मुझे वास्तव में समांतर घटना ग्राहक बनाने में कोई बिंदु नहीं दिखता है (यदि मैं आपके इरादे को सही समझता हूं - समानांतर में ईवेंट हैंडलर चलाने में सक्षम होने के लिए, सामान्य ईवेंट के साथ एक के बाद नहीं)।

यह बहुत अधिक स्पष्ट अगर ईवेंट हैंडलर ही यह पता चलता है समानांतर में चलाने के लिए आशय व्यक्त करने के लिए है।

कुछ (बहुत आदिम) की तरह कुछ।

void SomeEventHandler(object sender, EventArgs e) 
{ 
    Task.Run(() => 
    { 
     ... // some code to run in parallel 
    }); 
} 

आप प्रबंधक की तरह बना सकते हैं (ईमानदारी से, मैं कोई सुराग नहीं कैसे सभी कोर पर कब्जा करने के लिए है, लेकिन मुझे नहीं लगता इस जटिल है, मैं सिर्फ यह करने के लिए की आवश्यकता नहीं है), लेकिन कृपया, सामान्य घटनाओं को रखें।

+1

के बिना सब्सक्राइब किए गए क्रम में घटनाओं को फायरिंग कर रहा था, मुझे लगता है कि अगर ईवेंट विशेष घटनाक्रम के लिए बहुत जल्दी आते हैं तो आपके सुझाव कोर का फायदा उठाएंगे। यह एक उच्च स्तर पर कोर का शोषण करने के विपरीत है (यानी कुछ EventSubscriber) जैसा कि मैंने प्रयास किया है। प्रश्न यह है कि अधिक स्केलेबल क्या है ... – cs0815

संबंधित मुद्दे