2013-07-18 12 views
6

मैं एक हैकेल नेटवर्क एप्लिकेशन पर काम कर रहा हूं और मैं मल्टीथ्रेडिंग को प्रबंधित करने के लिए अभिनेता पैटर्न का उपयोग करता हूं। एक चीज जो मैंने पार की थी, उदाहरण के लिए क्लाइंट सॉकेट/हैंडल का एक सेट कैसे स्टोर करना है। निश्चित रूप से सभी धागे के लिए सुलभ होना चाहिए और जब ग्राहक लॉग ऑन/ऑफ करते हैं तो बदल सकते हैं।हास्केल - अभिनेता आधारित उत्परिवर्तन

जब से मैं जरूरी दुनिया से आ रहा हूँ मैं ताला तंत्र के कुछ प्रकार के बारे में सोचा, लेकिन जब मैंने देखा कि यह कैसे बदसूरत मैं के बारे में "शुद्ध" अस्थिरता सोचा है, अच्छी तरह से वास्तव में यह एक तरह से शुद्ध है:

import Control.Concurrent 
import Control.Monad 
import Network 
import System.IO 
import Data.List 
import Data.Maybe 
import System.Environment 
import Control.Exception 


newStorage :: (Eq a, Show a) => IO (Chan (String, Maybe (Chan [a]), Maybe a)) 
newStorage = do 
    q <- newChan 
    forkIO $ storage [] q 
    return q 


newHandleStorage :: IO (Chan (String, Maybe (Chan [Handle]), Maybe Handle)) 
newHandleStorage = newStorage 


storage :: (Eq a, Show a) => [a] -> Chan (String, Maybe (Chan [a]), Maybe a) -> IO() 
storage s q = do 
    let loop = (`storage` q) 
    (req, reply, d) <- readChan q 
    print ("processing " ++ show(d)) 
    case req of 
    "add" -> loop ((fromJust d) : s) 
    "remove" -> loop (delete (fromJust d) s) 
    "get" -> do 
     writeChan (fromJust reply) s 
     loop s 


store s d = writeChan s ("add", Nothing, Just d) 
unstore s d = writeChan s ("remove", Nothing, Just d) 
request s = do 
    chan <- newChan 
    writeChan s ("get", Just chan, Nothing) 
    readChan chan 

बिंदु यह है कि एक धागा (अभिनेता) वस्तुओं की एक सूची का प्रबंधन कर रहा है और आने वाले अनुरोधों के अनुसार सूची को संशोधित करता है। चूंकि धागा वास्तव में सस्ता है, मैंने सोचा कि यह वास्तव में एक अच्छा कार्यात्मक विकल्प हो सकता है।

बेशक यह सिर्फ एक प्रोटोटाइप (अवधारणा का त्वरित गंदा सबूत) है। तो मेरे सवाल है:

  1. इस (अभिनेता दुनिया में) साझा परिवर्तनशील चर के प्रबंधन के लिए एक "अच्छा" रास्ता नहीं है?
  2. क्या इस पैटर्न के लिए पहले से ही एक पुस्तकालय है? (मैं पहले से ही खोज की, लेकिन मैं कुछ भी नहीं मिला)

सादर, क्रिस

+3

यदि आप अभिनेता मॉडल के विकल्पों का पता लगाने के इच्छुक हैं, तो मैं आपको हास्केल की [सॉफ्टवेयर लेनदेन संबंधी मेमोरी] (https://en.wikipedia.org/wiki/Software_transactional_memory) का प्रयास करने का सुझाव दूंगा। यह डेटाबेस लेनदेन के समान एक सुंदर तंत्र है। रियल वर्ल्ड हास्केल में [अध्याय 28] (http://book.realworldhaskell.org/read/software-transactional-memory.html) देखें। –

+0

तकनीकी रूप से एक महान विकल्प है लेकिन मैंने सुना है कि बड़ी संख्या में थ्रेड (प्रति क्लाइंट एक थ्रेड जो हैकेल में मानक है) के साथ एसटीएम का उपयोग करना और अपेक्षाकृत लंबे परिचालन (सूची से किसी आइटम को हटाना ओ (एन) है, निश्चित रूप से हैश सेट/मानचित्र यहां मदद कर सकते हैं) बड़ी संख्या में एसटीएम के प्रदर्शन को कम कर सकते हैं। और निश्चित रूप से एमवीआर चैनल को एसटीएम चैनल द्वारा प्रतिस्थापित किया जा सकता है जिसका अर्थ है कि दो तकनीकों का सर्वोत्तम उपयोग करना। संपादित करें: इस तरह की स्थिति में अभिनेता पैटर्न आम तौर पर वास्तव में अच्छा होता है, क्योंकि किसी आइटम को हटाने/जोड़ने के लिए ओ (1) (केवल एक संदेश भेजना) वास्तविक काम धागे में किया जाता है ... – Kr0e

+0

आप सही हैं। एसटीएम के साथ ऐसा हो सकता है कि कई बार लेन-देन फिर से शुरू हो जाते हैं, जिससे कम प्रदर्शन होता है। लेकिन यदि आपके सिंक्रनाइज़ किए गए ऑपरेशन लंबे समय तक लेते हैं, तो आप अभिनेताओं के साथ भी इसी तरह की समस्याएं प्राप्त कर सकते हैं - अगर इससे अधिक संदेश हैं, तो यह स्थिति वास्तविकता के पीछे रह जाएगी। तो संतुलित पेड़ ('मानचित्र'/'सेट') या' एसटी/आईओ 'आधारित हैश सेट का उपयोग निश्चित रूप से मदद करेगा। –

उत्तर

6

यहाँ stm और pipes-network का उपयोग कर एक त्वरित और गंदा उदाहरण है। यह एक साधारण सर्वर स्थापित करेगा जो ग्राहकों को कनेक्ट करने और बढ़ने या काउंटर को कम करने की अनुमति देता है। यह एक बहुत ही सरल स्टेटस बार प्रदर्शित करेगा जो सभी कनेक्टेड क्लाइंट की वर्तमान लम्बाई दिखाता है और डिस्कनेक्ट होने पर क्लाइंट की लम्बाई को बार से हटा देगा।

सबसे पहले मैं सर्वर के साथ शुरू हो जाएगा, और मैं उदारता से कोड टिप्पणी की है यह कैसे काम करता व्याख्या करने के लिए:

import Control.Concurrent.STM (STM, atomically) 
import Control.Concurrent.STM.TVar 
import qualified Data.HashMap.Strict as H 
import Data.Foldable (forM_) 

import Control.Concurrent (forkIO, threadDelay) 
import Control.Monad (unless) 
import Control.Monad.Trans.State.Strict 
import qualified Data.ByteString.Char8 as B 
import Control.Proxy 
import Control.Proxy.TCP 
import System.IO 

main = do 
    hSetBuffering stdout NoBuffering 

    {- These are the internal data structures. They should be an implementation 
     detail and you should never expose these references to the 
     "business logic" part of the application. -} 
    -- I use nRef to keep track of creating fresh Ints (which identify users) 
    nRef <- newTVarIO 0  :: IO (TVar Int) 
    {- hMap associates every user (i.e. Int) with a counter 

     Notice how I've "striped" the hash map by storing STM references to the 
     values instead of storing the values directly. This means that I only 
     actually write the hashmap when adding or removing users, which reduces 
     contention for the hash map. 

     Since each user gets their own unique STM reference for their counter, 
     modifying counters does not cause contention with other counters or 
     contention with the hash map. -} 
    hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int))) 

    {- The following code makes heavy use of Haskell's pure closures. Each 
     'let' binding closes over its current environment, which is safe since 
     Haskell is pure. -} 

    let {- 'getCounters' is the only server-facing command in our STM API. The 
      only permitted operation is retrieving the current set of user 
      counters. 

      'getCounters' closes over the 'hMap' reference currently in scope so 
      that the server never needs to be aware about our internal 
      implementation. -} 
     getCounters :: STM [Int] 
     getCounters = do 
      refs <- fmap H.elems (readTVar hMap) 
      mapM readTVar refs 

     {- 'init' is the only client-facing command in our STM API. It 
      initializes the client's entry in the hash map and returns two 
      commands: the first command is what the client calls to 'increment' 
      their counter and the second command is what the client calls to log 
      off and delete 
      'delete' command. 

      Notice that those two returned commands each close over the client's 
      unique STM reference so the client never needs to be aware of how 
      exactly 'init' is implemented under the hood. -} 
     init :: STM (STM(), STM()) 
     init = do 
      n <- readTVar nRef 
      writeTVar nRef $! n + 1 

      ref <- newTVar 0 
      modifyTVar' hMap (H.insert n ref) 

      let incrementRef :: STM() 
       incrementRef = do 
        mRef <- fmap (H.lookup n) (readTVar hMap) 
        forM_ mRef $ \ref -> modifyTVar' ref (+ 1) 

       deleteRef :: STM() 
       deleteRef = modifyTVar' hMap (H.delete n) 

      return (incrementRef, deleteRef) 

    {- Now for the actual program logic. Everything past this point only uses 
     the approved STM API (i.e. 'getCounters' and 'init'). If I wanted I 
     could factor the above approved STM API into a separate module to enforce 
     the encapsulation boundary, but I am lazy. -} 

    {- Fork a thread which polls the current state of the counters and displays 
     it to the console. There is a way to implement this without polling but 
     this gets the job done for now. 

     Most of what it is doing is just some simple tricks to reuse the same 
     console line instead of outputting a stream of lines. Otherwise it 
     would be just: 

     forkIO $ forever $ do 
      ns <- atomically getCounters 
      print ns 
    -} 
    forkIO $ (`evalStateT` 0) $ forever $ do 
     del <- get 
     lift $ do 
      putStr (replicate del '\b') 
      putStr (replicate del ' ') 
      putStr (replicate del '\b') 
     ns <- lift $ atomically getCounters 
     let str = show ns 
     lift $ putStr str 
     put $! length str 
     lift $ threadDelay 10000 

    {- Fork a thread for each incoming connection, which listens to the client's 
     commands and translates them into 'STM' actions -} 
    serve HostAny "8080" $ \(socket, _) -> do 
     (increment, delete) <- atomically init 

     {- Right now, just do the dumb thing and convert all keypresses into 
      increment commands, with the exception of the 'q' key, which will 
      quit -} 
     let handler :: (Proxy p) =>() -> Consumer p Char IO() 
      handler() = runIdentityP loop 
       where 
       loop = do 
        c <- request() 
        unless (c == 'q') $ do 
         lift $ atomically increment 
         loop 

     {- This uses my 'pipes' library. It basically is a high-level way to 
      say: 

      * Read binary packets from the socket no bigger than 4096 bytes 

      * Get the first character from each packet and discard the rest 

      * Handle the character using the above 'handler' function -} 
     runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler 

     {- The above pipeline finishes either when the socket closes or 
      'handler' stops looping because it received a 'q'. Either case means 
      that the client is done so we log them out using 'delete'. -} 
     atomically delete 

आगे ग्राहक, जो केवल एक कनेक्शन और आगे के रूप में सभी कुंजी दबाव खोलता है एकल पैकेट:

import Control.Monad 
import Control.Proxy 
import Control.Proxy.Safe 
import Control.Proxy.TCP.Safe 
import Data.ByteString.Char8 (pack) 
import System.IO 

main = do 
    hSetBuffering stdin NoBuffering 
    hSetEcho  stdin False 

    {- Again, this uses my 'pipes' library. It basically says: 

     * Read characters from the console using 'commands' 

     * Pack them into a binary format 

     * send them to a server running at 127.0.0.1:8080 

     This finishes looping when the user types a 'q' or the connection is 
     closed for whatever reason. 
    -} 
    runSafeIO $ runProxy $ runEitherK $ 
     try . commands 
    >-> mapD (\c -> pack [c]) 
    >-> connectWriteD Nothing "127.0.0.1" "8080" 

commands :: (Proxy p) =>() -> Producer p Char IO() 
commands() = runIdentityP loop 
    where 
    loop = do 
     c <- lift getChar 
     respond c 
     unless (c == 'q') loop 

यह बहुत सरल है: commandsChar रों है, जो तब ByteString रों में बदल जाती है और फिर सर्वर से पैकेट के रूप में भेजे जाते हैं की एक धारा उत्पन्न करता है।

आप सर्वर और कुछ ग्राहकों को चलाने के लिए और उन्हें एक ही प्रयास में प्रत्येक प्रकार है, तो अपने सर्वर को प्रदर्शित करेगा उत्पादन दिखा कितने कुंजी प्रत्येक ग्राहक द्वारा लिखा गया एक सूची:

[1,6,4] 

... और अगर कुछ ग्राहकों के डिस्कनेक्ट वे सूची से निकाल दिया जाएगा:

[1,4] 

ध्यान दें कि इन उदाहरणों में से pipes घटक आगामी pipes-4.0.0 रिलीज में बहुत सरल है, लेकिन मौजूदा pipes पारिस्थितिकी तंत्र अभी भी जी नौकरी के रूप में किया जाता है।

+0

विस्मयकारी समाधान, मैं निश्चित रूप से इसके बारे में सोचूंगा;) – Kr0e

+0

बस मेरी समझ के लिए: क्या एसटीएम शुद्ध माना जाता है? मुझे लगता है कि यह लॉकिंग तंत्र का उपयोग किये बिना उत्परिवर्तन के बारे में है, है ना? – Kr0e

+2

@ Kr0e दाएं। एसटीएम के बारे में सोचें, संगत, थ्रेड-सुरक्षित म्यूटेबल मेमोरी संदर्भ। –

3

सबसे पहले, मैं निश्चित रूप से आदेशों का प्रतिनिधित्व करने के लिए अपने विशिष्ट डेटा प्रकार का उपयोग करने की अनुशंसा करता हूं। (String, Maybe (Chan [a]), Maybe a) का उपयोग करते समय एक बग्गी क्लाइंट अज्ञात कमांड भेजकर या ("add", Nothing, Nothing) आदि भेजकर अपने अभिनेता को क्रैश कर सकता है।मैं की तरह

data Command a = Add a | Remove a | Get (Chan [a]) 

कुछ सुझाव देंगे तो फिर तुम एक तरह से बचाने में storage में आदेश पर पैटर्न मैच कर सकते हैं।

अभिनेताओं के पास उनके फायदे हैं, लेकिन मुझे लगता है कि उन्हें कुछ कमियां हैं। उदाहरण के लिए, किसी अभिनेता से जवाब प्राप्त करने के लिए उसे एक आदेश भेजना होगा और फिर उत्तर देने का इंतजार करना होगा। और ग्राहक पूरी तरह से यह सुनिश्चित नहीं कर सकता कि उसे एक उत्तर मिलता है और उत्तर कुछ विशिष्ट प्रकार का होगा - आप यह नहीं कह सकते कि मुझे इस विशेष आदेश के लिए केवल इस प्रकार के जवाब (और उनमें से कितने) चाहिए।

तो एक उदाहरण के रूप में मैं एक सरल, एसटीएम समाधान प्रदान करूंगा। हैश टेबल या एक (संतुलित पेड़) सेट का उपयोग करना बेहतर होगा, लेकिन Handle न तो Ord लागू करता है और न ही Hashable, हम इन डेटा संरचनाओं का उपयोग नहीं कर सकते हैं, इसलिए मैं सूचियों का उपयोग जारी रखूंगा।

module ThreadSet (
    TSet, add, remove, get 
) where 

import Control.Monad 
import Control.Monad.STM 
import Control.Concurrent.STM.TVar 
import Data.List (delete) 

newtype TSet a = TSet (TVar [a]) 

add :: (Eq a) => a -> TSet a -> STM() 
add x (TSet v) = readTVar v >>= writeTVar v . (x :) 

remove :: (Eq a) => a -> TSet a -> STM() 
remove x (TSet v) = readTVar v >>= writeTVar v . delete x 

get :: (Eq a) => TSet a -> STM [a] 
get (TSet v) = readTVar v 

इस मॉड्यूल मनमाना तत्वों की एक STM आधारित सेट लागू करता है। आपके पास ऐसे कई सेट हो सकते हैं और उन्हें एक ही STM लेन-देन में एक साथ उपयोग किया जा सकता है जो एक बार में सफल होता है या विफल रहता है। उदाहरण

-- | Ensures that there is exactly one element `x` in the set. 
add1 :: (Eq a) => a -> TSet a -> STM() 
add1 x v = remove x v >> add x v 

के लिए यह अभिनेताओं के साथ मुश्किल होगा, आप अभिनेता के लिए एक और आदेश के रूप में जोड़ना होगा, आप इसे मौजूदा कार्यों की रचना नहीं कर सकते हैं और अभी भी atomicity है।

अद्यतन: एक दिलचस्प article बता रहा है कि क्यों क्लोजर डिजाइनरों ने अभिनेताओं का उपयोग नहीं करना चुना। उदाहरण के लिए, कलाकारों का उपयोग करना, भले ही आपके पास कई पढ़े गए हों और एक उत्परिवर्तनीय संरचना के लिए केवल बहुत ही कम लिखते हों, वे सभी क्रमबद्ध हैं, जो प्रदर्शन को बहुत प्रभावित कर सकते हैं।

+0

खैर, serializing/deserializing बहुत लागत है, यह सच है। क्लाउड हास्केल में "सीरियलाइजेशन-ओवरहेड" समान होता है, वे इसे एक सुविधा कहते हैं। लेकिन हाल ही में उन्होंने एक असुरक्षित प्रेषण समारोह जोड़ा जो ser./deser के बिना संदेश पास करता है। जो तेजी से परिमाण का क्रम है। सैद्धांतिक रूप से संदेश गुजरना एक साधारण फंक्शन कॉल के रूप में सस्ता होना चाहिए ताकि अभिनेता-पैटर्न को एक वास्तविक विकल्प बनाया जा सके, जो निश्चित रूप से मामला नहीं है लेकिन एरलांग में है। मुझे लगता है कि एसटीएम वास्तव में एक महान विशेषता है, शायद दोनों तकनीकों का उपयोग करने का तरीका है, क्योंकि अभिनेता-पैटर्न की तुलना में एसटीएम वास्तव में निम्न स्तर है। – Kr0e

संबंधित मुद्दे