2016-01-31 4 views
5

में एफ # इवेंट का उपयोग करना मैं एफ # में एसिंक वर्कफ़्लोज़ और एजेंटों के साथ बहुत कुछ कर रहा हूं, जबकि मैं घटनाओं में थोड़ा गहराई से जा रहा था, मैंने देखा कि इवेंट < _>() प्रकार थ्रेड- सुरक्षित। यहां मैं एक घटना को बढ़ाने की आम समस्या के बारे में बात नहीं कर रहा हूं। मैं वास्तव में एक घटना से सदस्यता लेने और निकालने/निपटाने के बारे में बात कर रहा हूं। परीक्षण के लिए मैं इस छोटे से कार्यक्रमबहु-थ्रेडेड कोड

let event = Event<int>() 
let sub = event.Publish 

[<EntryPoint>] 
let main argv = 
    let subscribe sub x = async { 
     let mutable disposables = [] 
     for i=0 to x do 
      let dis = Observable.subscribe (fun x -> printf "%d" x) sub 
      disposables <- dis :: disposables 
     for dis in disposables do 
      dis.Dispose() 
    } 

    Async.RunSynchronously(async{ 
     let! x = Async.StartChild (subscribe sub 1000) 
     let! y = Async.StartChild (subscribe sub 1000) 
     do! x 
     do! y 
     event.Trigger 1 
     do! Async.Sleep 2000 
    }) 
    0 

प्रोग्राम सरल है लिखा है, मैं एक घटना और एक समारोह है कि यह करने के लिए घटनाओं की एक विशेष राशि सदस्यता लेता है, और उसके बाद हर हैंडलर निपटाने पैदा करते हैं। मैं Async.StartChild के साथ उन फ़ंक्शन के दो उदाहरणों को उत्पन्न करने के लिए एक और async गणना का उपयोग करता हूं। दोनों कार्यों के समाप्त होने के बाद मैं यह देखने के लिए ट्रिगर करता हूं कि कुछ हैंडलर अभी भी बाकी हैं या नहीं।

लेकिन जब event.Trigger(1) परिणाम कहा जाता है अभी भी कुछ संचालकों घटना के लिए reigstered हैं कि है। चूंकि कुछ "1" कंसोल पर मुद्रित किए जाएंगे। इसका सामान्य अर्थ यह है कि सब्सक्राइब करना और/या निपटान थ्रेड-सुरक्षित नहीं है।

और है कि मैं उम्मीद नहीं की थी क्या है। यदि सब्सक्राइबिंग और डिस्पोजिंग थ्रेड-सुरक्षित नहीं है तो सामान्य रूप से ईवेंट कैसे सुरक्षित रूप से उपयोग किए जा सकते हैं? निश्चित घटनाओं को थ्रेड के बाहर भी इस्तेमाल किया जा सकता है, और एक ट्रिगर समानांतर या विभिन्न धागे में किसी भी फ़ंक्शन को नहीं बढ़ाता है। लेकिन यह मेरे लिए सामान्य है कि घटनाओं का उपयोग Async, एजेंट आधारित कोड या सामान्य रूप से थ्रेड के साथ किया जाता है। बैकराउंडवर्कर थ्रेड की जानकारी इकट्ठा करने के लिए उन्हें अक्सर संचार के रूप में उपयोग किया जाता है। Async.AwaitEvent के साथ किसी ईवेंट की सदस्यता लेना संभव है। यदि सब्सक्राइबिंग और डिस्पोजिंग थ्रेड-सुरक्षित नहीं है, तो ऐसे माहौल में ईवेंट का उपयोग करना कैसे संभव है? और किस उद्देश्य में Async.AwaitEvent है? यह मानते हुए कि एक Async वर्कफ़्लो थ्रेड को बस Async.AwaitEvent का उपयोग करने की उम्मीद करता है मूल रूप से "डिज़ाइन द्वारा टूटा हुआ" है यदि किसी ईवेंट को सब्सक्राइब करना/डिस्पोजे करना डिफ़ॉल्ट रूप से थ्रेड-सुरक्षित नहीं है।

सामान्य प्रश्न मैं का सामना करना पड़ रहा हूँ है। क्या यह सही है कि सदस्यता और निपटान थ्रेड-सुरक्षित नहीं है? मेरे उदाहरण से ऐसा लगता है, लेकिन शायद मुझे कुछ महत्वपूर्ण जानकारी याद आई। मैं वर्तमान में अपने डिजाइन में इवेंट का बहुत उपयोग करता हूं, मेरे पास आमतौर पर मेलबॉक्स प्रोसेसर होते हैं और अधिसूचना के लिए ईवेंट का उपयोग करते हैं। तो सवाल यह है। यदि घटनाएं थ्रेड-सुरक्षित नहीं हैं, तो वर्तमान में मैं जिस डिज़ाइन का उपयोग कर रहा हूं वह थ्रेड-सुरक्षित नहीं है। तो इस स्थिति के लिए एक फिक्स क्या है? एक नया नया धागा-सुरक्षित घटना कार्यान्वयन बनाना? क्या वे पहले से ही कुछ कार्यान्वयन मौजूद हैं जो इस समस्या का सामना करते हैं? या अत्यधिक थ्रेडेड वातावरण में सुरक्षित रूप से ईवेंट का उपयोग करने के लिए अन्य विकल्प हैं?

उत्तर

3

एफवाईआई; Event<int> के लिए कार्यान्वयन here पाया जा सकता है।

दिलचस्प बिट हो रहा है:

member e.AddHandler(d) = 
    x.multicast <- (System.Delegate.Combine(x.multicast, d) :?> Handler<'T>) 
member e.RemoveHandler(d) = 
    x.multicast <- (System.Delegate.Remove(x.multicast, d) :?> Handler<'T>) 

एक घटना की सदस्यता ली जा ईवेंट हैंडलर सदस्यता ले में पारित कर दिया साथ वर्तमान ईवेंट हैंडलर को जोड़ती है। यह संयुक्त घटना हैंडलर वर्तमान को बदल देता है।

एक समेकन परिप्रेक्ष्य की समस्या यह है कि यहां हमारे पास एक समवर्ती ग्राहक है, जिसमें समवर्ती ग्राहक वर्तमान घटना हैंडलर का उपयोग कर सकते हैं और "अंतिम" जो हैंडलर जीत वापस लिखते हैं (आखिरी मुश्किल है इन दिनों concurrency में अवधारणा लेकिन एनवीएम)।

Interlocked.CompareAndExchange का उपयोग कर सीएएस लूप पेश करने के लिए यहां क्या किया जा सकता है लेकिन यह गैर-समवर्ती उपयोगकर्ताओं को प्रभावित करने वाले प्रदर्शन ओवरहेड को जोड़ता है। यह कुछ ऐसा है जो कोई पीआर बंद कर सकता है और देख सकता है कि यह एफ # समुदाय द्वारा अनुकूल रूप से देखा गया है या नहीं। इसके बारे में क्या करने के बारे में अपने दूसरे प्रश्न के

WRT मैं सिर्फ कह सकते हैं मैं क्या करेंगे।मैं FSharpEvent का संस्करण बनाने के विकल्प के लिए जाऊंगा जो संरक्षित सदस्यता/सदस्यता समाप्त करने का समर्थन करता है। यदि आपकी कंपनी FOSS नीति इसे अनुमति देती है तो शायद इसे FSharpEvent का आधार दें। यदि यह सफल हो जाता है तो यह भविष्य में पीआर को एफ # कोर लिबरी बना सकता है।

मुझे आपकी आवश्यकताओं को नहीं पता है, लेकिन यह भी संभव है कि यदि आपको जो चाहिए वह कोरआउट (यानी असिंक) है और थ्रेड नहीं है तो प्रोग्राम को केवल 1 धागा का उपयोग करने के लिए फिर से लिखना संभव है और इस प्रकार आप इससे प्रभावित नहीं होंगे यह दौड़ की स्थिति।

+0

आपकी अंतिम वाक्य के लिए। Async.Start के साथ शुरू किया गया प्रत्येक async थ्रेड-पूल पर चलता है, लेकिन यह भी प्रयोग करते हैं! या करो! एसिंक को अन्य थ्रेड पर स्विच कर सकते हैं, उदाहरण के लिए बस "Async.Sleep" का उपयोग थ्रेड-पूल पर स्विच या Async.StartChild का उपयोग कर करता है। शीर्ष पर मैं मेलबॉक्स प्रोसेसर से ईवेंट ट्रिगर कर रहा हूं और वे थ्रेड-पूल पर हमेशा कुछ थ्रेड चलाते हैं। तो मैं नहीं देखता कि सबकुछ एक धागे पर कैसे चलाना है। लेकिन मैं यह व्यवहार भी नहीं चाहूंगा। मैं इस तरह के व्यवहार की उम्मीद कर रहा हूं क्योंकि मैं सब कुछ एक ही धागे पर चलाने के लिए नहीं चाहता हूं। –

+0

ऐसा लगता है कि घटनाओं पर थ्रेड सुरक्षा की कमी F # कोर लाइब्रेरी के साथ एक महत्वपूर्ण समस्या है। क्या आप इसके लिए कोई मुद्दा खोलने पर विचार करेंगे? –

+0

आप अपनी खुद की कोरआउटिन रोल कर सकते हैं या सुनिश्चित कर सकते हैं कि प्रत्येक 'चलो!' 'करो!' "मुख्य-थ्रेड" पर वापस पोस्ट करें। लेकिन जैसा कि आपने कहा था कि आप उस व्यवहार को वैसे भी नहीं चाहते हैं, इससे कोई फर्क नहीं पड़ता। – FuleSnabel

2

सबसे पहले, FuleSnable के उत्तर के लिए धन्यवाद। उसने मुझे सही दिशा की ओर इशारा किया। उन्होंने प्रदान की गई जानकारी के आधार पर मैंने ConcurrentEvent को स्वयं लागू किया। इस प्रकार Interlocked.CompareExchange का उपयोग अपने हैंडलर को जोड़ने/हटाने के लिए करता है, इसलिए यह लॉक-फ्री है और उम्मीद है कि इसे करने का सबसे तेज़ तरीका है। मैंने एफ # कंपाइलर से Event प्रकार की प्रतिलिपि बनाकर कार्यान्वयन शुरू किया। (मैं टिप्पणी भी छोड़ देता हूं)। वर्तमान कार्यान्वयन इस तरह दिखता है।

type ConcurrentEvent<'T> = 
    val mutable multicast : Handler<'T> 
    new() = { multicast = null } 

    member x.Trigger(arg:'T) = 
     match x.multicast with 
     | null ->() 
     | d -> d.Invoke(null,arg) |> ignore 
    member x.Publish = 
     // Note, we implement each interface explicitly: this works around a bug in the CLR 
     // implementation on CompactFramework 3.7, used on Windows Phone 7 
     { new obj() with 
      member x.ToString() = "<published event>" 
      interface IEvent<'T> 
      interface IDelegateEvent<Handler<'T>> with 
      member e.AddHandler(d) = 
       let mutable exchanged = false 
       while exchanged = false do 
        System.Threading.Thread.MemoryBarrier() 
        let dels = x.multicast 
        let newDels = System.Delegate.Combine(dels, d) :?> Handler<'T> 
        let result = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels) 
        if obj.ReferenceEquals(dels,result) then 
         exchanged <- true 
      member e.RemoveHandler(d) = 
       let mutable exchanged = false 
       while exchanged = false do 
        System.Threading.Thread.MemoryBarrier() 
        let dels = x.multicast 
        let newDels = System.Delegate.Remove(dels, d) :?> Handler<'T> 
        let result = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels) 
        if obj.ReferenceEquals(dels,result) then 
         exchanged <- true 
      interface System.IObservable<'T> with 
      member e.Subscribe(observer) = 
       let h = new Handler<_>(fun sender args -> observer.OnNext(args)) 
       (e :?> IEvent<_,_>).AddHandler(h) 
       { new System.IDisposable with 
        member x.Dispose() = (e :?> IEvent<_,_>).RemoveHandler(h) } } 

डिजाइन पर कुछ नोट:

  • मैं एक पुनरावर्ती पाश के साथ शुरू कर दिया। लेकिन ऐसा करने और संकलित कोड को देखते हुए यह एक अज्ञात वर्ग बनाता है और AddHandler या RemoveHandler को कॉल करने से इसका एक ऑब्जेक्ट बनाया जाता है। जबकि लूप के प्रत्यक्ष कार्यान्वयन के साथ जब भी कोई नया हैंडलर जोड़ा/निकाला जाता है तो यह किसी ऑब्जेक्ट की तत्कालता से बचाता है।
  • मैंने जेनेरिक हैश समानता से बचने के लिए स्पष्ट रूप से obj.ReferenceEquals का उपयोग किया।

कम से कम मेरे परीक्षणों में एक हैंडलर को जोड़ना/निकालना अब थ्रेड-सुरक्षित लगता है। ConcurrentEvent को आवश्यकतानुसार Event प्रकार के साथ बदला जा सकता है। अगर किसी के पास शुद्धता या प्रदर्शन के लिए अतिरिक्त सुधार हैं, तो उन टिप्पणियों का स्वागत है। अन्यथा मैं इस जवाब को समाधान के रूप में चिह्नित करूंगा।


की बेंचमार्क अगर लोगों पर उत्सुक हैं कितना धीमी ConcurrentEvent/के साथ 10,000 हैंडलर सदस्यता समाप्ति Event

let stopWatch() = System.Diagnostics.Stopwatch.StartNew() 

let event = Event<int>() 
let sub = event.Publish 

let cevent = ConcurrentEvent<int>() 
let csub = cevent.Publish 

let subscribe sub x = async { 
    let mutable disposables = [] 
    for i=0 to x do 
     let dis = Observable.subscribe (fun x -> printf "%d" x) sub 
     disposables <- dis :: disposables 
    for dis in disposables do 
     dis.Dispose() 
} 

let sw = stopWatch() 
Async.RunSynchronously(async{ 
    // Amount of tries 
    let tries = 10000 

    // benchmarking Event subscribe/unsubscribing 
    let sw = stopWatch() 
    let! x = Async.StartChild (subscribe sub tries) 
    let! y = Async.StartChild (subscribe sub tries) 
    do! x 
    do! y 
    sw.Stop() 
    printfn "Event: %O" sw.Elapsed 
    do! Async.Sleep 1000 
    event.Trigger 1 
    do! Async.Sleep 2000 

    // benchmarking ConcurrentEvent subscribe/unsubscribing 
    let sw = stopWatch() 
    let! x = Async.StartChild (subscribe csub tries) 
    let! y = Async.StartChild (subscribe csub tries) 
    do! x 
    do! y 
    sw.Stop() 
    printfn "\nConcurrentEvent: %O" sw.Elapsed 
    do! Async.Sleep 1000 
    cevent.Trigger 1 
    do! Async.Sleep 2000 
}) 

अपने सिस्टम सदस्यता लेने पर की तुलना में किया जाएगा गैर धागा सुरक्षित Event आसपास 1.4 seconds लेता है पूरा करना।

थ्रेड-सुरक्षित ConcurrentEvent को पूरा करने के लिए लगभग 1.8 seconds लेता है। तो मुझे लगता है कि ओवरहेड बहुत कम है।

+0

कुछ विचार करने के लिए: मुझे यकीन नहीं है कि 'तुलना एक्सचेंज' मेमोरी बाधा (या यह किस तरह का है) डाला गया है या आपको मैन्युअल मेमोरी बाधा करना है। – FuleSnabel

+0

लगता है 'तुलना एक्सचेंज' का मतलब पूर्ण बाधा है (http: // stackoverflow।com/प्रश्न/1581718/करता है-इंटरलॉक-compareexchange उपयोग एक स्मृति-बाधा)। AFAIK x86 पढ़ता है 'अधिग्रहण' बाधा का उपयोग करता है ताकि आपको यह संकेत देना चाहिए कि आप x86 पर अच्छे हैं लेकिन यदि आप चाहते हैं कि कोड एआरएम/पावरपीसी को लक्षित करे, तो आपको 'x.multicast' – FuleSnabel

+0

@FuleSnabel पढ़ने पर' इंटरलॉक 'कॉल डालने की आवश्यकता हो सकती है एक त्रुटि, क्योंकि मैंने 'x.multicast" का उपयोग 'प्रतिनिधिमंडल' और 'प्रतिनिधि' हटा दिया 'में किया था। संचालन को 'डील्स' चर पर बेहतर काम करना चाहिए। जहां तक ​​मुझे लगता है, तो मुझे 'x.multicast' की ताजगी सुनिश्चित करने के लिए 'x.multicast' से पढ़ने से पहले केवल एक मेमोरीबैरियर की आवश्यकता है। –