पर प्रतीक्षा कर रहा है, मुझे एक बहु-थ्रेडेड सर्वर के साथ कोई समस्या है, जिसे मैं अकादमिक अभ्यास के रूप में बना रहा हूं, विशेष रूप से सुंदरता से बंद करने के लिए कनेक्शन प्राप्त करने के साथ।अवरुद्ध रूप से एक थ्रेड को समाप्त करना जो अवरुद्ध कतार
प्रत्येक कनेक्शन सत्र सत्र द्वारा प्रबंधित किया जाता है। यह वर्ग कनेक्शन के लिए 2 थ्रेड बनाए रखता है, डाउनस्ट्रीम थ्रेड और अपस्ट्रीम थ्रेड।
क्लाइंट सॉकेट पर अपस्ट्रीम थ्रेड ब्लॉक और सभी आने वाली तारों को संदेशों में एन्कोड करने के लिए किसी अन्य परत तक पहुंचने के लिए पास किया जाता है। डाउनस्ट्रीम थ्रेड ब्लॉक ब्लॉकिंगक्यूयू पर जिसमें क्लाइंट के लिए संदेश डाले जाते हैं। कतार पर एक संदेश होने पर, डाउनस्ट्रीम थ्रेड कतार से संदेश लेता है, इसे एक स्ट्रिंग में बदल देता है और इसे क्लाइंट को भेजता है। अंतिम प्रणाली में, एक एप्लिकेशन लेयर आने वाले संदेशों पर कार्य करेगी और उचित क्लाइंट को भेजने के लिए सर्वर पर आउटगोइंग संदेशों को दबाएगी, लेकिन अभी के लिए मेरे पास एक साधारण एप्लिकेशन है जो इसे वापस करने से पहले आने वाले संदेश पर दूसरे के लिए सोता है एक टाइमस्टैम्प के साथ एक आउटगोइंग संदेश के रूप में जोड़ा गया।
क्लाइंट डिस्कनेक्ट होने पर मुझे जो समस्या हो रही है वह पूरी तरह से बंद करने के लिए पूरी चीज प्राप्त कर रही है। जिस प्रथम समस्या का मैं सामना कर रहा हूं वह एक सामान्य डिस्कनेक्ट है, जहां क्लाइंट सर्वर को यह बताता है कि यह QUIT कमांड के साथ कनेक्शन समाप्त कर रहा है। मूल छद्म कोड है:
while (!quitting) {
inputString = socket.readLine() // blocks
if (inputString != "QUIT") {
// forward the message upstream
server.acceptMessage (inputString);
} else {
// Do cleanup
quitting = true;
socket.close();
}
}
अपस्ट्रीम थ्रेड का मुख्य पाश इनपुट स्ट्रिंग को देखता है। यदि यह क्विट है तो धागा यह कहने के लिए एक ध्वज सेट करता है कि क्लाइंट ने संचार समाप्त कर दिया है और लूप से बाहर निकला है। इससे अपस्ट्रीम थ्रेड अच्छी तरह से बंद हो जाता है।
डाउनस्ट्रीम थ्रेड का मुख्य पाश ब्लॉकिंगक्यूयू में संदेशों के लिए इंतजार कर रहा है जब तक कि कनेक्शन बंद होने वाला ध्वज सेट न हो। जब यह होता है, तो डाउनस्ट्रीम थ्रेड को भी समाप्त करना होता है। हालांकि, ऐसा नहीं है, यह बस इंतज़ार कर बैठता है। इसके psuedocode इस तरह दिखता है:
while (!quitting) {
outputMessage = messageQueue.take(); // blocks
sendMessageToClient (outputMessage);
}
जब मैं इस परीक्षण किया है, मैंने देखा है कि जब ग्राहक छोड़ दिया, नदी के ऊपर धागा शट डाउन है, लेकिन नीचे की ओर धागा नहीं किया।
कुछ सिर खरोंच के बाद, मुझे एहसास हुआ कि डाउनस्ट्रीम थ्रेड अभी भी अवरोधक्यूयू पर अवरुद्ध कर रहा है जो आने वाले संदेश की प्रतीक्षा कर रहा है जो कभी नहीं आएगा। अपस्ट्रीम थ्रेड क्विट संदेश को श्रृंखला के आगे आगे नहीं बढ़ाता है।
मैं डाउनस्ट्रीम थ्रेड को कैसे बंद कर सकता हूं? दिमाग में उभरने वाला पहला विचार ले() कॉल पर टाइमआउट सेट कर रहा था। हालांकि मैं इस विचार पर बहुत उत्सुक नहीं हूं, क्योंकि जो भी मूल्य आप चुनते हैं, वह पूरी तरह संतोषजनक नहीं है। या तो यह बहुत लंबा है और एक ज़ोंबी धागा बंद होने से पहले लंबे समय तक वहां बैठता है, या यह बहुत छोटा है और कनेक्शन जो कुछ मिनटों के लिए निष्क्रिय हैं लेकिन अभी भी मान्य हैं। मैंने श्रृंखला को QUIT संदेश भेजने के बारे में सोचा था, लेकिन इसके लिए सर्वर पर पूर्ण राउंड ट्रिप करने की आवश्यकता है, फिर एप्लिकेशन, फिर सर्वर पर फिर से और अंत में सत्र में वापस आना चाहिए। यह या तो एक सुरुचिपूर्ण समाधान की तरह प्रतीत नहीं होता है।
मैंने थ्रेड.स्टॉप() के लिए प्रलेखन को देखा लेकिन यह स्पष्ट रूप से बहिष्कृत है क्योंकि यह किसी भी तरह से ठीक से काम नहीं करता है, ऐसा लगता है कि यह वास्तव में एक विकल्प नहीं है। एक और विचार जो मैंने डाउनस्ट्रीम थ्रेड में किसी भी तरह से ट्रिगर करने के लिए अपवाद को मजबूर किया था और इसे अंततः ब्लॉक में साफ करने दिया था, लेकिन यह मुझे एक भयानक और कट्टर विचार के रूप में मारता है।
मुझे लगता है कि दोनों थ्रेड अपने आप को बंद करने में सक्षम होना चाहिए, लेकिन मुझे यह भी संदेह है कि अगर एक धागा समाप्त होता है तो इसे अन्य धागे को दूसरे के लिए ध्वज सेट करने की अपेक्षा अधिक सक्रिय तरीके से समाप्त करना होगा जांच करने के लिए धागा।चूंकि मैं जावा के साथ अभी भी बहुत अनुभवी नहीं हूं, इसलिए मैं इस बिंदु पर विचारों से बाहर हूं। अगर किसी के पास कोई सलाह है, तो इसकी सराहना की जाएगी।
पूर्णता के लिए, मैंने नीचे सत्र वर्ग के लिए असली कोड शामिल किया है, हालांकि मुझे लगता है कि ऊपर के छद्म कोड स्निपेट समस्या के प्रासंगिक हिस्सों को कवर करते हैं। पूर्ण वर्ग लगभग 250 लाइनें है।
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.logging.*;
/**
* Session class
*
* A session manages the individual connection between a client and the server.
* It accepts input from the client and sends output to the client over the
* provided socket.
*
*/
public class Session {
private Socket clientSocket = null;
private Server server = null;
private Integer sessionId = 0;
private DownstreamThread downstream = null;
private UpstreamThread upstream = null;
private boolean sessionEnding = false;
/**
* This thread handles waiting for messages from the server and sending
* them to the client
*/
private class DownstreamThread implements Runnable {
private BlockingQueue<DownstreamMessage> incomingMessages = null;
private OutputStreamWriter streamWriter = null;
private Session outer = null;
@Override
public void run() {
DownstreamMessage message;
Thread.currentThread().setName ("DownstreamThread_" + outer.getId());
try {
// Send connect message
this.sendMessageToClient ("Hello, you are client " + outer.getId());
while (!outer.sessionEnding) {
message = this.incomingMessages.take();
this.sendMessageToClient (message.getPayload());
}
// Send disconnect message
this.sendMessageToClient ("Goodbye, client " + getId());
} catch (InterruptedException | IOException ex) {
Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex);
} finally {
this.terminate();
}
}
/**
* Add a message to the downstream queue
*
* @param message
* @return
* @throws InterruptedException
*/
public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException {
if (!outer.sessionEnding) {
this.incomingMessages.put (message);
}
return this;
}
/**
* Send the given message to the client
*
* @param message
* @throws IOException
*/
private DownstreamThread sendMessageToClient (CharSequence message) throws IOException {
OutputStreamWriter osw;
// Output to client
if (null != (osw = this.getStreamWriter())) {
osw.write ((String) message);
osw.write ("\r\n");
osw.flush();
}
return this;
}
/**
* Perform session cleanup
*
* @return
*/
private DownstreamThread terminate() {
try {
this.streamWriter.close();
} catch (IOException ex) {
Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex);
}
this.streamWriter = null;
return this;
}
/**
* Get an output stream writer, initialize it if it's not active
*
* @return A configured OutputStreamWriter object
* @throws IOException
*/
private OutputStreamWriter getStreamWriter() throws IOException {
if ((null == this.streamWriter)
&& (!outer.sessionEnding)) {
BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream());
this.streamWriter = new OutputStreamWriter (os, "UTF8");
}
return this.streamWriter;
}
/**
*
* @param outer
*/
public DownstreamThread (Session outer) {
this.outer = outer;
this.incomingMessages = new LinkedBlockingQueue();
System.out.println ("Class " + this.getClass() + " created");
}
}
/**
* This thread handles waiting for client input and sending it upstream
*/
private class UpstreamThread implements Runnable {
private Session outer = null;
@Override
public void run() {
StringBuffer inputBuffer = new StringBuffer();
BufferedReader inReader;
Thread.currentThread().setName ("UpstreamThread_" + outer.getId());
try {
inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream(), "UTF8"));
while (!outer.sessionEnding) {
// Read whatever was in the input buffer
inputBuffer.delete (0, inputBuffer.length());
inputBuffer.append (inReader.readLine());
System.out.println ("Input message was: " + inputBuffer);
if (!inputBuffer.toString().equals ("QUIT")) {
// Forward the message up the chain to the Server
outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString()));
} else {
// End the session
outer.sessionEnding = true;
}
}
} catch (IOException | InterruptedException e) {
Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e);
} finally {
outer.terminate();
outer.server.deleteSession (outer.getId());
}
}
/**
* Class constructor
*
* The Core Java volume 1 book said that a constructor such as this
* should be implicitly created, but that doesn't seem to be the case!
*
* @param outer
*/
public UpstreamThread (Session outer) {
this.outer = outer;
System.out.println ("Class " + this.getClass() + " created");
}
}
/**
* Start the session threads
*/
public void run() //throws InterruptedException
{
Thread upThread = new Thread (this.upstream);
Thread downThread = new Thread (this.downstream);
upThread.start();
downThread.start();
}
/**
* Accept a message to send to the client
*
* @param message
* @return
* @throws InterruptedException
*/
public Session acceptMessage (DownstreamMessage message) throws InterruptedException {
this.downstream.acceptMessage (message);
return this;
}
/**
* Accept a message to send to the client
*
* @param message
* @return
* @throws InterruptedException
*/
public Session acceptMessage (String message) throws InterruptedException {
return this.acceptMessage (new DownstreamMessage (this.getId(), message));
}
/**
* Terminate the client connection
*/
private void terminate() {
try {
this.clientSocket.close();
} catch (IOException e) {
Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e);
}
}
/**
* Get this Session's ID
*
* @return The ID of this session
*/
public Integer getId() {
return this.sessionId;
}
/**
* Session constructor
*
* @param owner The Server object that owns this session
* @param sessionId The unique ID this session will be given
* @throws IOException
*/
public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException {
this.server = owner;
this.clientSocket = clientSocket;
this.sessionId = sessionId;
this.upstream = new UpstreamThread (this);
this.downstream = new DownstreamThread (this);
System.out.println ("Class " + this.getClass() + " created");
System.out.println ("Session ID is " + this.sessionId);
}
}
मुझे लगता है कि के बारे में सोच किया था, लेकिन एक kludge का एक सा सामान्य शट डाउन के हिस्से के रूप में एक अपवाद फेंक करने के लिए कुछ के लिए मजबूर नहीं कर रहा है? – GordonM
नहीं, इस तरह यह काम करने के लिए डिज़ाइन किया गया है। अन्यथा, बाधित किया जा सकता है कि हर विधि को एक विशेष बाधित मूल्य वापस करना होगा। अधिक जानकारी के लिए http://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html देखें। – Brigham
मुझे जावा के बारे में बहुत कुछ पता नहीं है, लेकिन जब आप 'Thread.interrupt' को कॉल करते हैं तो क्या होगा और हम 'टेक' विधि को अवरुद्ध करने में नहीं हैं? मुझे लगता है कि व्यवधान नहीं उठाया जाएगा और मुझे लगता है कि हमें लगता है कि 'थ्रेड.इंटर इंटरप्टेड' की जांच करनी है। – Zuljin