2013-05-01 7 views
5

पर प्रतीक्षा कर रहा है, मुझे एक बहु-थ्रेडेड सर्वर के साथ कोई समस्या है, जिसे मैं अकादमिक अभ्यास के रूप में बना रहा हूं, विशेष रूप से सुंदरता से बंद करने के लिए कनेक्शन प्राप्त करने के साथ।अवरुद्ध रूप से एक थ्रेड को समाप्त करना जो अवरुद्ध कतार

प्रत्येक कनेक्शन सत्र सत्र द्वारा प्रबंधित किया जाता है। यह वर्ग कनेक्शन के लिए 2 थ्रेड बनाए रखता है, डाउनस्ट्रीम थ्रेड और अपस्ट्रीम थ्रेड।

क्लाइंट सॉकेट पर अपस्ट्रीम थ्रेड ब्लॉक और सभी आने वाली तारों को संदेशों में एन्कोड करने के लिए किसी अन्य परत तक पहुंचने के लिए पास किया जाता है। डाउनस्ट्रीम थ्रेड ब्लॉक ब्लॉकिंगक्यूयू पर जिसमें क्लाइंट के लिए संदेश डाले जाते हैं। कतार पर एक संदेश होने पर, डाउनस्ट्रीम थ्रेड कतार से संदेश लेता है, इसे एक स्ट्रिंग में बदल देता है और इसे क्लाइंट को भेजता है। अंतिम प्रणाली में, एक एप्लिकेशन लेयर आने वाले संदेशों पर कार्य करेगी और उचित क्लाइंट को भेजने के लिए सर्वर पर आउटगोइंग संदेशों को दबाएगी, लेकिन अभी के लिए मेरे पास एक साधारण एप्लिकेशन है जो इसे वापस करने से पहले आने वाले संदेश पर दूसरे के लिए सोता है एक टाइमस्टैम्प के साथ एक आउटगोइंग संदेश के रूप में जोड़ा गया।

क्लाइंट डिस्कनेक्ट होने पर मुझे जो समस्या हो रही है वह पूरी तरह से बंद करने के लिए पूरी चीज प्राप्त कर रही है। जिस प्रथम समस्या का मैं सामना कर रहा हूं वह एक सामान्य डिस्कनेक्ट है, जहां क्लाइंट सर्वर को यह बताता है कि यह QUIT कमांड के साथ कनेक्शन समाप्त कर रहा है। मूल छद्म कोड है:

while (!quitting) { 
    inputString = socket.readLine() // blocks 
    if (inputString != "QUIT") { 
     // forward the message upstream 
     server.acceptMessage (inputString); 
    } else { 
     // Do cleanup 
     quitting = true; 
     socket.close(); 
    } 
} 

अपस्ट्रीम थ्रेड का मुख्य पाश इनपुट स्ट्रिंग को देखता है। यदि यह क्विट है तो धागा यह कहने के लिए एक ध्वज सेट करता है कि क्लाइंट ने संचार समाप्त कर दिया है और लूप से बाहर निकला है। इससे अपस्ट्रीम थ्रेड अच्छी तरह से बंद हो जाता है।

डाउनस्ट्रीम थ्रेड का मुख्य पाश ब्लॉकिंगक्यूयू में संदेशों के लिए इंतजार कर रहा है जब तक कि कनेक्शन बंद होने वाला ध्वज सेट न हो। जब यह होता है, तो डाउनस्ट्रीम थ्रेड को भी समाप्त करना होता है। हालांकि, ऐसा नहीं है, यह बस इंतज़ार कर बैठता है। इसके psuedocode इस तरह दिखता है:

while (!quitting) { 
    outputMessage = messageQueue.take(); // blocks 
    sendMessageToClient (outputMessage); 
} 

जब मैं इस परीक्षण किया है, मैंने देखा है कि जब ग्राहक छोड़ दिया, नदी के ऊपर धागा शट डाउन है, लेकिन नीचे की ओर धागा नहीं किया।

कुछ सिर खरोंच के बाद, मुझे एहसास हुआ कि डाउनस्ट्रीम थ्रेड अभी भी अवरोधक्यूयू पर अवरुद्ध कर रहा है जो आने वाले संदेश की प्रतीक्षा कर रहा है जो कभी नहीं आएगा। अपस्ट्रीम थ्रेड क्विट संदेश को श्रृंखला के आगे आगे नहीं बढ़ाता है।

मैं डाउनस्ट्रीम थ्रेड को कैसे बंद कर सकता हूं? दिमाग में उभरने वाला पहला विचार ले() कॉल पर टाइमआउट सेट कर रहा था। हालांकि मैं इस विचार पर बहुत उत्सुक नहीं हूं, क्योंकि जो भी मूल्य आप चुनते हैं, वह पूरी तरह संतोषजनक नहीं है। या तो यह बहुत लंबा है और एक ज़ोंबी धागा बंद होने से पहले लंबे समय तक वहां बैठता है, या यह बहुत छोटा है और कनेक्शन जो कुछ मिनटों के लिए निष्क्रिय हैं लेकिन अभी भी मान्य हैं। मैंने श्रृंखला को QUIT संदेश भेजने के बारे में सोचा था, लेकिन इसके लिए सर्वर पर पूर्ण राउंड ट्रिप करने की आवश्यकता है, फिर एप्लिकेशन, फिर सर्वर पर फिर से और अंत में सत्र में वापस आना चाहिए। यह या तो एक सुरुचिपूर्ण समाधान की तरह प्रतीत नहीं होता है।

मैंने थ्रेड.स्टॉप() के लिए प्रलेखन को देखा लेकिन यह स्पष्ट रूप से बहिष्कृत है क्योंकि यह किसी भी तरह से ठीक से काम नहीं करता है, ऐसा लगता है कि यह वास्तव में एक विकल्प नहीं है। एक और विचार जो मैंने डाउनस्ट्रीम थ्रेड में किसी भी तरह से ट्रिगर करने के लिए अपवाद को मजबूर किया था और इसे अंततः ब्लॉक में साफ करने दिया था, लेकिन यह मुझे एक भयानक और कट्टर विचार के रूप में मारता है।

मुझे लगता है कि दोनों थ्रेड अपने आप को बंद करने में सक्षम होना चाहिए, लेकिन मुझे यह भी संदेह है कि अगर एक धागा समाप्त होता है तो इसे अन्य धागे को दूसरे के लिए ध्वज सेट करने की अपेक्षा अधिक सक्रिय तरीके से समाप्त करना होगा जांच करने के लिए धागा।चूंकि मैं जावा के साथ अभी भी बहुत अनुभवी नहीं हूं, इसलिए मैं इस बिंदु पर विचारों से बाहर हूं। अगर किसी के पास कोई सलाह है, तो इसकी सराहना की जाएगी।

पूर्णता के लिए, मैंने नीचे सत्र वर्ग के लिए असली कोड शामिल किया है, हालांकि मुझे लगता है कि ऊपर के छद्म कोड स्निपेट समस्या के प्रासंगिक हिस्सों को कवर करते हैं। पूर्ण वर्ग लगभग 250 लाइनें है।

import java.io.*; 
import java.net.*; 
import java.util.concurrent.*; 
import java.util.logging.*; 

/** 
* Session class 
* 
* A session manages the individual connection between a client and the server. 
* It accepts input from the client and sends output to the client over the 
* provided socket. 
* 
*/ 
public class Session { 
    private Socket    clientSocket = null; 
    private Server    server   = null; 
    private Integer    sessionId  = 0; 
    private DownstreamThread downstream  = null; 
    private UpstreamThread  upstream  = null; 
    private boolean    sessionEnding = false; 

    /** 
    * This thread handles waiting for messages from the server and sending 
    * them to the client 
    */ 
    private class DownstreamThread implements Runnable { 
     private BlockingQueue<DownstreamMessage> incomingMessages = null; 
     private OutputStreamWriter     streamWriter  = null; 
     private Session        outer    = null; 

     @Override 
     public void run() { 
      DownstreamMessage message; 
      Thread.currentThread().setName ("DownstreamThread_" + outer.getId()); 

      try { 
       // Send connect message 
       this.sendMessageToClient ("Hello, you are client " + outer.getId()); 

       while (!outer.sessionEnding) { 
        message = this.incomingMessages.take(); 
        this.sendMessageToClient (message.getPayload()); 
       } 

       // Send disconnect message 
       this.sendMessageToClient ("Goodbye, client " + getId()); 

      } catch (InterruptedException | IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } finally { 
       this.terminate(); 
      } 
     } 

     /** 
     * Add a message to the downstream queue 
     * 
     * @param message 
     * @return 
     * @throws InterruptedException 
     */ 
     public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException { 
      if (!outer.sessionEnding) { 
       this.incomingMessages.put (message); 
      } 

      return this; 
     } 

     /** 
     * Send the given message to the client 
     * 
     * @param message 
     * @throws IOException 
     */ 
     private DownstreamThread sendMessageToClient (CharSequence message) throws IOException { 
      OutputStreamWriter osw; 
      // Output to client 
      if (null != (osw = this.getStreamWriter())) { 
       osw.write ((String) message); 
       osw.write ("\r\n"); 
       osw.flush(); 
      } 

      return this; 
     } 

     /** 
     * Perform session cleanup 
     * 
     * @return 
     */ 
     private DownstreamThread terminate() { 
      try { 
       this.streamWriter.close(); 
      } catch (IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } 
      this.streamWriter = null; 

      return this; 
     } 

     /** 
     * Get an output stream writer, initialize it if it's not active 
     * 
     * @return A configured OutputStreamWriter object 
     * @throws IOException 
     */ 
     private OutputStreamWriter getStreamWriter() throws IOException { 
      if ((null == this.streamWriter) 
      && (!outer.sessionEnding)) { 
       BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream()); 
       this.streamWriter  = new OutputStreamWriter (os, "UTF8"); 
      } 

      return this.streamWriter; 
     } 

     /** 
     * 
     * @param outer 
     */ 
     public DownstreamThread (Session outer) { 
      this.outer    = outer; 
      this.incomingMessages = new LinkedBlockingQueue(); 
      System.out.println ("Class " + this.getClass() + " created"); 
     } 
    } 

    /** 
    * This thread handles waiting for client input and sending it upstream 
    */ 
    private class UpstreamThread implements Runnable { 
     private Session outer = null; 

     @Override 
     public void run() { 
      StringBuffer inputBuffer = new StringBuffer(); 
      BufferedReader inReader; 

      Thread.currentThread().setName ("UpstreamThread_" + outer.getId()); 

      try { 
       inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream(), "UTF8")); 

       while (!outer.sessionEnding) { 
        // Read whatever was in the input buffer 
        inputBuffer.delete (0, inputBuffer.length()); 
        inputBuffer.append (inReader.readLine()); 
        System.out.println ("Input message was: " + inputBuffer); 

        if (!inputBuffer.toString().equals ("QUIT")) { 
         // Forward the message up the chain to the Server 
         outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString())); 
        } else { 
         // End the session 
         outer.sessionEnding = true; 
        } 
       } 

      } catch (IOException | InterruptedException e) { 
       Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
      } finally { 
       outer.terminate(); 
       outer.server.deleteSession (outer.getId()); 
      } 
     } 

     /** 
     * Class constructor 
     * 
     * The Core Java volume 1 book said that a constructor such as this 
     * should be implicitly created, but that doesn't seem to be the case! 
     * 
     * @param outer 
     */ 
     public UpstreamThread (Session outer) { 
      this.outer = outer; 
      System.out.println ("Class " + this.getClass() + " created"); 
     } 
    } 

    /** 
    * Start the session threads 
    */ 
    public void run() //throws InterruptedException 
    { 
     Thread upThread  = new Thread (this.upstream); 
     Thread downThread = new Thread (this.downstream); 

     upThread.start(); 
     downThread.start(); 
    } 

    /** 
    * Accept a message to send to the client 
    * 
    * @param message 
    * @return 
    * @throws InterruptedException 
    */ 
    public Session acceptMessage (DownstreamMessage message) throws InterruptedException { 
     this.downstream.acceptMessage (message); 
     return this; 
    } 

    /** 
    * Accept a message to send to the client 
    * 
    * @param message 
    * @return 
    * @throws InterruptedException 
    */ 
    public Session acceptMessage (String message) throws InterruptedException { 
     return this.acceptMessage (new DownstreamMessage (this.getId(), message)); 
    } 

    /** 
    * Terminate the client connection 
    */ 
    private void terminate() { 
     try { 
      this.clientSocket.close(); 
     } catch (IOException e) { 
      Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
     } 
    } 

    /** 
    * Get this Session's ID 
    * 
    * @return The ID of this session 
    */ 
    public Integer getId() { 
     return this.sessionId; 
    } 

    /** 
    * Session constructor 
    * 
    * @param owner The Server object that owns this session 
    * @param sessionId The unique ID this session will be given 
    * @throws IOException 
    */ 
    public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException { 

     this.server   = owner; 
     this.clientSocket = clientSocket; 
     this.sessionId  = sessionId; 
     this.upstream  = new UpstreamThread (this); 
     this.downstream  = new DownstreamThread (this); 

     System.out.println ("Class " + this.getClass() + " created"); 
     System.out.println ("Session ID is " + this.sessionId); 
    } 
} 

उत्तर

3
इसके बजाय Thread.stop उपयोग Thread.interrupt बुलाने की

। इससे take विधि InterruptedException फेंकने का तरीका बन जाएगी जिसका उपयोग आप यह जानकर कर सकते हैं कि आपको बंद करना चाहिए।

+1

मुझे लगता है कि के बारे में सोच किया था, लेकिन एक kludge का एक सा सामान्य शट डाउन के हिस्से के रूप में एक अपवाद फेंक करने के लिए कुछ के लिए मजबूर नहीं कर रहा है? – GordonM

+0

नहीं, इस तरह यह काम करने के लिए डिज़ाइन किया गया है। अन्यथा, बाधित किया जा सकता है कि हर विधि को एक विशेष बाधित मूल्य वापस करना होगा। अधिक जानकारी के लिए http://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html देखें। – Brigham

+0

मुझे जावा के बारे में बहुत कुछ पता नहीं है, लेकिन जब आप 'Thread.interrupt' को कॉल करते हैं तो क्या होगा और हम 'टेक' विधि को अवरुद्ध करने में नहीं हैं? मुझे लगता है कि व्यवधान नहीं उठाया जाएगा और मुझे लगता है कि हमें लगता है कि 'थ्रेड.इंटर इंटरप्टेड' की जांच करनी है। – Zuljin

0

क्या आप "QUIT" प्रकट होने पर outer.sessionEnding को सही करने के बजाय "नकली" छोड़ने का संदेश बना सकते हैं। कतार में यह नकली संदेश डालने से डाउनस्ट्रीम थ्रेड जाग जाएगा और आप इसे समाप्त कर सकते हैं। उस स्थिति में आप इस सत्र को समाप्त करने वाले चर को भी समाप्त कर सकते हैं।

छद्म कोड में यह ऐसा दिखाई दे सकता:

while (true) { 
    outputMessage = messageQueue.take(); // blocks 
    if (QUIT == outputMessage) 
     break 
    sendMessageToClient (outputMessage); 
} 
संबंधित मुद्दे