2017-10-29 14 views
9

मैं एक वर्ग है, जिसमें मैं एक नक्शा liveSocketsByDatacenter एक भी पृष्ठभूमि धागे से हर 30 सेकंड updateLiveSockets() विधि के अंदर पॉप्युलेट कर रहा हूँ और फिर मैं एक विधि getNextSocket() जिसके द्वारा बुलाया जाएगा है है को संशोधित करता है एक लाइव सॉकेट उपलब्ध कराने के लिए एकाधिक पाठक धागे जो इस जानकारी को प्राप्त करने के लिए एक ही मानचित्र का उपयोग करते हैं।साथ-साथ ही एक मानचित्र को पढ़ने के दौरान एक भी पृष्ठभूमि धागा नियमित रूप से यह

public class SocketManager { 
    private static final Random random = new Random(); 
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); 
    private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter = 
     new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>())); 
    private final ZContext ctx = new ZContext(); 

    // Lazy Loaded Singleton Pattern 
    private static class Holder { 
    private static final SocketManager instance = new SocketManager(); 
    } 

    public static SocketManager getInstance() { 
    return Holder.instance; 
    } 

    private SocketManager() { 
    connectToZMQSockets(); 
    scheduler.scheduleAtFixedRate(new Runnable() { 
     public void run() { 
     updateLiveSockets(); 
     } 
    }, 30, 30, TimeUnit.SECONDS); 
    } 

    // during startup, making a connection and populate once 
    private void connectToZMQSockets() { 
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; 
    // The map in which I put all the live sockets 
    Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>(); 
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) { 
     List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH); 
     updatedLiveSocketsByDatacenter.put(entry.getKey(), 
      Collections.unmodifiableList(addedColoSockets)); 
    } 
    // Update the map content 
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter)); 
    } 

    private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) { 
    List<SocketHolder> socketList = new ArrayList<>(); 
    for (String address : addresses) { 
     try { 
     Socket client = ctx.createSocket(socketType); 
     // Set random identity to make tracing easier 
     String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt()); 
     client.setIdentity(identity.getBytes(ZMQ.CHARSET)); 
     client.setTCPKeepAlive(1); 
     client.setSendTimeOut(7); 
     client.setLinger(0); 
     client.connect(address); 

     SocketHolder zmq = new SocketHolder(client, ctx, address, true); 
     socketList.add(zmq); 
     } catch (Exception ex) { 
     // log error 
     } 
    } 
    return socketList; 
    } 

    // this method will be called by multiple threads to get the next live socket 
    // is there any concurrency or thread safety issue or race condition here? 
    public Optional<SocketHolder> getNextSocket() { 
    // For the sake of consistency make sure to use the same map instance 
    // in the whole implementation of my method by getting my entries 
    // from the local variable instead of the member variable 
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
     this.liveSocketsByDatacenter.get(); 
    Optional<SocketHolder> liveSocket = Optional.absent(); 
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters(); 
    for (Datacenters dc : dcs) { 
     liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); 
     if (liveSocket.isPresent()) { 
     break; 
     } 
    } 
    return liveSocket; 
    } 

    // is there any concurrency or thread safety issue or race condition here? 
    private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) { 
    if (!CollectionUtils.isEmpty(endpoints)) { 
     // The list of live sockets 
     List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size()); 
     for (SocketHolder obj : endpoints) { 
     if (obj.isLive()) { 
      liveOnly.add(obj); 
     } 
     } 
     if (!liveOnly.isEmpty()) { 
     // The list is not empty so we shuffle it an return the first element 
     Collections.shuffle(liveOnly); 
     return Optional.of(liveOnly.get(0)); 
     } 
    } 
    return Optional.absent(); 
    } 

    // Added the modifier synchronized to prevent concurrent modification 
    // it is needed because to build the new map we first need to get the 
    // old one so both must be done atomically to prevent concistency issues 
    private synchronized void updateLiveSockets() { 
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; 

    // Initialize my new map with the current map content 
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
     new HashMap<>(this.liveSocketsByDatacenter.get()); 

    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) { 
     List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey()); 
     List<SocketHolder> liveUpdatedSockets = new ArrayList<>(); 
     for (SocketHolder liveSocket : liveSockets) { // LINE A 
     Socket socket = liveSocket.getSocket(); 
     String endpoint = liveSocket.getEndpoint(); 
     Map<byte[], byte[]> holder = populateMap(); 
     Message message = new Message(holder, Partition.COMMAND); 

     boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket); 
     boolean isLive = (status) ? true : false; 
     // is there any problem the way I am using `SocketHolder` class? 
     SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); 
     liveUpdatedSockets.add(zmq); 
     } 
     liveSocketsByDatacenter.put(entry.getKey(), 
      Collections.unmodifiableList(liveUpdatedSockets)); 
    } 
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter)); 
    } 
} 

के रूप में आप मेरी कक्षा में देख सकते हैं:

  • एक भी पृष्ठभूमि धागा जो हर 30 सेकंड चलाता है से, मैं updateLiveSockets() विधि में सभी को लाइव सॉकेट साथ liveSocketsByDatacenter नक्शा पॉप्युलेट।
  • और फिर एकाधिक धागे से, मैं getNextSocket() विधि को मुझे एक लाइव सॉकेट उपलब्ध कराने के लिए कॉल करता हूं जो आवश्यक जानकारी प्राप्त करने के लिए liveSocketsByDatacenter मानचित्र का उपयोग करता है।

मेरे पास कोई कोड बिना किसी समस्या के ठीक काम कर रहा है और यह देखना चाहता था कि यह लिखने के लिए कोई बेहतर या अधिक प्रभावी तरीका है या नहीं। मैं थ्रेड सुरक्षा मुद्दों या किसी भी दौड़ की स्थिति पर कोई राय प्राप्त करना चाहता था, लेकिन अब तक मैंने कोई भी नहीं देखा है लेकिन मैं गलत हो सकता हूं।

मैं अधिकतर updateLiveSockets() विधि और getLiveSocketX() विधि के बारे में चिंतित हूं। मैं liveSockets पर ListSocketHolder लाइन ए पर और फिर SocketHolder ऑब्जेक्ट बना रहा हूं और एक और नई सूची में जोड़ रहा हूं। क्या यह ठीक है?

नोट:SocketHolder एक अपरिवर्तनीय वर्ग है। और आप मेरे पास ZeroMQ सामानों को अनदेखा कर सकते हैं।

+3

यह मुझे लगता है कि 'liveSocketsByDatacenter' ** अपरिवर्तनीय है ** है। यह सब बहुत आसान बनाने का एक तरीका है। –

+2

प्लस आपका तर्क बहुत सारे स्थानों में बहुत गहराई से है। 'वैकल्पिक' का दुरुपयोग मेरी आँखें पानी बनाता है। मैं इसे पूरी तरह से छुटकारा पाउंगा - आप इसे कहीं भी सही तरीके से उपयोग नहीं करते हैं। संकेत: 'Optional.isPresent' को कॉल करना हमेशा एक बुरा विचार है। –

+0

मेरे पास कुछ विधियां हैं जिनमें कुछ भी गलत हो सकता है। सबसे पहले '30% सेकेंड सेकेंड में बैकग्राउंड थ्रेड द्वारा बुलाया गया' अपडेट लाइवेट्स 'है, दूसरा' रीडर थ्रेट 'नामक' getNextSocket 'विधि है जो आंतरिक रूप से 'getLiveSocket' विधि को कॉल करता है, इसलिए इन सभी तीन तरीकों को मेरा विश्वास है कि थ्रेड सुरक्षा मुद्दों के संदर्भ में सही होना चाहिए। क्या आपको लगता है कि वे सभी सही चीजें कर रहे हैं? मैं 'updateLiveSockets' विधि में अधिक डरता हूं। – john

उत्तर

8

आप निम्न सिंक्रनाइज़ेशन तकनीकों का उपयोग करते हैं।

  1. लाइव सॉकेट डेटा वाला नक्शा एक परमाणु संदर्भ के पीछे है, यह सुरक्षित रूप से मानचित्र को स्विच करने की अनुमति देता है।
  2. updateLiveSockets() विधि सिंक्रनाइज़ (इस पर निहित) है, यह नक्शा को दो धागे से एक साथ स्विच करने से रोक देगा।
  3. यदि getNextSocket() विधि के दौरान स्विच होता है तो आप मिश्रणों से बचने के लिए इसका उपयोग करते समय मानचित्र का स्थानीय संदर्भ बनाते हैं।

क्या यह धागा सुरक्षित है, जैसा कि अब है?

थ्रेड सुरक्षा हमेशा साझा करता है कि साझा परिवर्तनीय डेटा पर उचित सिंक्रनाइज़ेशन है या नहीं। इस मामले में साझा म्यूटेबल डेटा सॉकेटहोल्डर की सूची में डेटासेंटर का मानचित्र है।

तथ्य यह है कि नक्शा AtomicReference में है, और उपयोग के लिए स्थानीय प्रतिलिपि बनाना मानचित्र पर पर्याप्त सिंक्रनाइज़ेशन है। आपकी विधियां नक्शे का एक संस्करण लेती हैं और इसका उपयोग करती हैं, AtomicReference की प्रकृति के कारण स्विचिंग संस्करण थ्रेड सुरक्षित है। यह मानचित्र volatile मानचित्र के लिए सदस्य फ़ील्ड बनाने के साथ भी हासिल किया जा सकता है, जैसा कि आप करते हैं, संदर्भ को अपडेट करते हैं (आप उस पर कोई चेक-ए-एक्ट ऑपरेशंस नहीं करते हैं)।

scheduleAtFixedRate() के रूप में गारंटी देता है कि पारित कर दिया Runnable के साथ ही समय पर नहीं चल जाएगा, updateLiveSockets() पर synchronized की जरूरत नहीं है, तथापि, यह भी किसी भी असली नुकसान नहीं करता है।

तो हाँ, यह वर्ग थ्रेड सुरक्षित है, जैसा कि है।

हालांकि, यह पूरी तरह स्पष्ट नहीं है कि SocketHolder एक साथ कई धागे द्वारा उपयोग किया जा सकता है। जैसा कि है, यह वर्ग सिर्फ यादृच्छिक लाइव एक चुनकर SocketHolder एस के समवर्ती उपयोग को कम करने की कोशिश करता है (यद्यपि एक यादृच्छिक अनुक्रमणिका चुनने के लिए पूरे सरणी को घुमाने की आवश्यकता नहीं है)। यह वास्तव में समवर्ती उपयोग को रोकने के लिए कुछ भी नहीं करता है।

क्या इसे और अधिक कुशल बनाया जा सकता है?

मुझे विश्वास है कि यह कर सकता है।updateLiveSockets() विधि को देखते समय, ऐसा लगता है कि यह एक ही नक्शा बनाता है, सिवाय इसके कि SocketHolder के पास isLive ध्वज के लिए अलग-अलग मान हो सकते हैं। इससे मुझे निष्कर्ष निकाला जाता है कि, पूरे मानचित्र को बदलने के बजाय, मैं बस मानचित्र में प्रत्येक सूचियों को स्विच करना चाहता हूं। और किसी थ्रेड में एक मानचित्र में प्रविष्टियों को सुरक्षित तरीके से बदलने के लिए, मैं बस ConcurrentHashMap का उपयोग कर सकता हूं।

यदि मैं ConcurrentHashMap का उपयोग करता हूं, और मानचित्र को स्विच नहीं करता हूं, बल्कि, मानचित्र में मान, मैं AtomicReference से छुटकारा पा सकता हूं।

मैपिंग बदलने के लिए मैं बस नई सूची बना सकता हूं और इसे सीधे मानचित्र में डाल सकता हूं। यह अधिक कुशल है, क्योंकि मैं जल्द से जल्द डेटा प्रकाशित करता हूं, और मैं कम ऑब्जेक्ट्स बनाता हूं, जबकि मेरा सिंक्रनाइज़ेशन केवल तैयार किए गए घटकों पर बनाता है, जो पठनीयता को लाभ देता है।

यहाँ अपने निर्माण (, कुछ हिस्सों कि कम प्रासंगिक थे छोड़े गए संक्षिप्तता के लिए)

public class SocketManager { 
    private static final Random random = new Random(); 
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); 
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap 
    private final ZContext ctx = new ZContext(); 

    // ... 

    private SocketManager() { 
     connectToZMQSockets(); 
     scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS); 
    } 

    // during startup, making a connection and populate once 
    private void connectToZMQSockets() { 
     Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS; 
     for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) { 
     List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH); 
     liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map 
     } 
    } 

    // ...  

    // this method will be called by multiple threads to get the next live socket 
    // is there any concurrency or thread safety issue or race condition here? 
    public Optional<SocketHolder> getNextSocket() { 
     for (Datacenters dc : Datacenters.getOrderedDatacenters()) { 
     Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder> 
     if (liveSocket.isPresent()) { 
      return liveSocket; 
     } 
     } 
     return Optional.absent(); 
    } 

    // is there any concurrency or thread safety issue or race condition here? 
    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) { 
     if (!CollectionUtils.isEmpty(listOfEndPoints)) { 
     // The list of live sockets 
     List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size()); 
     for (SocketHolder obj : listOfEndPoints) { 
      if (obj.isLive()) { 
      liveOnly.add(obj); 
      } 
     } 
     if (!liveOnly.isEmpty()) { 
      // The list is not empty so we shuffle it an return the first element 
      return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one 
     } 
     } 
     return Optional.absent(); 
    } 

    // no need to make this synchronized 
    private void updateLiveSockets() { 
     Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS; 

     for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) { 
     List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey()); 
     List<SocketHolder> liveUpdatedSockets = new ArrayList<>(); 
     for (SocketHolder liveSocket : liveSockets) { // LINE A 
      Socket socket = liveSocket.getSocket(); 
      String endpoint = liveSocket.getEndpoint(); 
      Map<byte[], byte[]> holder = populateMap(); 
      Message message = new Message(holder, Partition.COMMAND); 

      boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket); 
      boolean isLive = (status) ? true : false; 

      SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); 
      liveUpdatedSockets.add(zmq); 
     } 
     liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner. 
     } 
    } 

} 
+0

आपके विस्तृत स्पष्टीकरण के लिए धन्यवाद। मैं एक भाग पर उलझन में हूं, क्या यह गारंटी देगा कि सभी पाठक धागे 'getLiveextSockets()' विधि में जैसे ही राज्य बदलते हैं, 'getNextSocket()' विधि से सॉकेट की एक ही स्थिति देखेंगे? – john

+0

हां, एक बार 'updateLiveSockets() 'ने मानचित्र में नई सूची डाली है, पाठक धागे से बाद में' get()' को नई सूची – bowmore

+0

दिखाई देगी ठीक है, इसे आज़माएं, इसे आज़माएं और देखें कि यह कैसे काम करता है। मुझे एक और [सवाल] मिला है (https://stackoverflow.com/questions/47107331/how-to-make-sure-that-i-am-not-sharing-same-socket-between-two-threads-at-a - समान) इस आधार पर केवल मैं और यह सुनिश्चित करने की कोशिश कर रहा हूं कि मैं दो धागे के बीच एक ही सॉकेट साझा नहीं करता हूं। मुझे समाधान मिला है लेकिन देखने की कोशिश कर रहे हैं कि यह यहां काम करेगा या कोई बेहतर तरीका भी है? शायद हम थ्रेडलोकल का उपयोग कर सकते हैं? लेकिन मुझे अभी तक यकीन नहीं है। – john

4

यदि SocketHolder और Datacenters, अपरिवर्तनीय हैं, तो आपके प्रोग्राम ठीक दिखते हैं। यद्यपि यहां कुछ मामूली प्रतिक्रिया दी गई है।

1. AtomicReference का उपयोग

AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter

इस सदस्य चर एक AtomicReference में लिपटे होने की जरूरत नहीं है। आप इसके साथ कोई परमाणु सीएएस ऑपरेशन नहीं कर रहे हैं। आप बस volative Map<Datacenters, List<SocketHolder>> घोषित कर सकते हैं, और इसे पढ़ते समय, बस इसके लिए एक स्थानीय संदर्भ बनाएं। यह नए मानचित्र के संदर्भ के परमाणु स्वैप की गारंटी के लिए पर्याप्त है।

2. सिंक्रनाइज़ विधि

private synchronized void updateLiveSockets()

इस विधि किसी एकल थ्रेड निष्पादक से कहा जाता है, तो कोई ज़रूरत नहीं यह सिंक्रनाइज़ किए जाने के लिए नहीं है।

3।कुछ सरलीकरण

  • इस वर्ग के अपने वर्तमान प्रयोग से, ऐसा लगता है कि तुम बाहर सॉकेट जो updateLiveSockets में जिंदा नहीं हैं फ़िल्टर कर सकते हैं लगता है, एक ग्राहक कॉल getNextSocket

  • आप बदल सकते हैं हर बार फिल्टर करने के लिए से बचने Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS Set<Datacenters> datacenters = Utils.SERVERS.keySet() द्वारा और चाबियों के साथ काम करें।

    4. जावा 8

यदि संभव हो, Java8 के वैकल्पिक के साथ एक साथ जावा 8. स्ट्रीम करने के लिए स्विच बॉयलरप्लेट कोड का एक बहुत कुछ को हटा दें और अपने कोड बहुत आसान को पढ़ने के लिए होगा।

+0

मैं आपके उत्तर के माध्यम से फिर से जा रहा था और मैं जानना चाहता था कि आप '3.1'' में जो उल्लेख करते हैं, उसे मैं कैसे प्राप्त कर सकता हूं। अगर मैं उन सॉकेट को फ़िल्टर करता हूं जो जीवित नहीं हैं तो मैं अपने 'updateLiveSockets' विधि में फिर से सभी सॉकेट कैसे पुन: सक्रिय करूं? जैसा कि आप मेरी 'updateLiveSockets' विधि में देख सकते हैं, मैं सभी सॉकेट को पुन: सक्रिय कर रहा हूं और उन्हें समझने के लिए पिंग कर रहा हूं कि वे लाइव हैं या नहीं? – john

संबंधित मुद्दे