2015-07-11 8 views
7

क्या यह कई धागे के बीच एक ही एपोल एफडी (सॉकेट एफडी नहीं) साझा करना सुरक्षित है? और यदि हां, तो क्या प्रत्येक थ्रेड को अपनी खुद की घटनाओं को epoll_wait(2) पर पास करना होगा या क्या वे इसे साझा कर सकते हैं?क्या थ्रेड के बीच एक ही एपोल फ़ाइल डिस्क्रिप्टर साझा करना ठीक है?

void *thread_func(void *socket_fd) { 
     // create events_fd 
     // allocate memory for events array 
     // subscribe to events EPOLLIN and EPOLLET 
     // epoll_wait using own epoll_fd and events_array 
     // now all threads would have a separate epoll_fd with 
     // events populated on its own array 
    } 

    void main(void) { 
    // create and bind to socket 

    //create multiple threads and pass thread_func and socket_fd to 
    // all threads 
    } 

वहाँ कैसे सी में यह करने के लिए का एक अच्छा उदाहरण है:

उदाहरण

void *thread_func(void *thread_args) { 
     // extract socket_fd, epoll_fd, &event, &events_array from 
     //  thread_args 
     // epoll_wait() using epoll_fd and events_array received from main 
     // now all threads would be using same epoll_fd and events array 
    } 

    void main(void) { 
     // create and bind to socket 
     // create events_fd 
     // allocate memory for events array 
     // subscribe to events EPOLLIN and EPOLLET 
     // pack the socket_fd, epoll_fd, &events, &events_array into 
     // thread_args struct. 

     // create multiple threads and pass thread_func and 
     // same thread_args to all threads 
    } 

या यह बेहतर इस तरह यह करने के लिए है के लिए

? मैंने देखा उदाहरण उदाहरण main() में इवेंट लूप चलाते हैं और जब भी कोई ईवेंट पता चला है तो अनुरोध को संसाधित करने के लिए एक नया धागा उत्पन्न करें। मैं क्या करना चाहता हूं प्रोग्राम की शुरुआत में धागे की एक विशिष्ट संख्या बनाएं और प्रत्येक थ्रेड इवेंट लूप और प्रसंस्करण अनुरोध चला रहा हो।

उत्तर

13

क्या यह धागे के बीच एक ही एपोल एफडी (सॉकेट एफडी नहीं) साझा करना सुरक्षित है।

हाँ, यह सुरक्षित है - epoll(7) इंटरफ़ेस धागा सुरक्षित है - लेकिन आप सावधान ऐसा करते हुए होना चाहिए, आप कम से कम उपयोग करना चाहिए EPOLLET (धार ट्रिगर मोड, के रूप में डिफ़ॉल्ट करने के लिए विरोध के स्तर का ट्रिगर) अन्य धागे में नकली जागने से बचने के लिए। इसका कारण यह है कि प्रसंस्करण के लिए एक नई घटना उपलब्ध होने पर स्तर-ट्रिगर मोड प्रत्येक धागे को जगाएगा। चूंकि केवल एक धागा इसके साथ काम करेगा, इससे अनावश्यक रूप से अधिकांश धागे जाग जाएंगे।

तो साझा epfd प्रयोग किया जाता है प्रत्येक थ्रेड अपनी ही घटनाओं सरणी या एक साझा घटनाओं सरणी epoll_wait करने के लिए() पास करना होगा

हाँ, आप, प्रत्येक थ्रेड पर एक अलग घटनाओं सरणी की जरूरत है या अन्यथा आपको दौड़ की स्थिति होगी और बुरा चीजें हो सकती हैं। उदाहरण के लिए, आपके पास एक थ्रेड हो सकता है जो अभी भी epoll_wait(2) द्वारा लौटाई गई घटनाओं के माध्यम से पुनरावृत्ति कर रहा है और अनुरोधों को संसाधित करता है जब अचानक एक और थ्रेड epoll_wait(2) को उसी सरणी के साथ कॉल करता है और फिर घटनाएं उसी समय ओवरराइट हो जाती हैं जब अन्य धागा उन्हें पढ़ रहा है। अच्छा नही! आप बिल्कुल प्रत्येक धागे के लिए एक अलग सरणी की आवश्यकता है।

मान लें कि आपके पास प्रत्येक थ्रेड के लिए एक अलग सरणी है, या तो संभावना - एक ही एपोल एफडी पर प्रतीक्षा करना या प्रत्येक थ्रेड के लिए एक अलग एपॉल एफडी होना - समान रूप से अच्छी तरह से काम करेगा, लेकिन ध्यान दें कि अर्थशास्त्र अलग हैं। एक वैश्विक रूप से साझा एपोल एफडी के साथ, प्रत्येक धागा किसी भी क्लाइंट से अनुरोध के लिए इंतजार कर रहा है, क्योंकि ग्राहक सभी एक ही एपॉल एफडी में जोड़े जाते हैं। प्रत्येक थ्रेड के लिए एक अलग एपॉल एफडी के साथ, प्रत्येक थ्रेड अनिवार्य रूप से ग्राहकों के एक उप-समूह के लिए ज़िम्मेदार होता है (वे क्लाइंट जिन्हें उस थ्रेड द्वारा स्वीकार किया जाता था)।

यह आपके सिस्टम के लिए अप्रासंगिक हो सकता है, या इससे बड़ा अंतर हो सकता है। उदाहरण के लिए, ऐसा हो सकता है कि एक थ्रेड दुर्भाग्यपूर्ण है जो बिजली उपयोगकर्ताओं के समूह को प्राप्त करने के लिए पर्याप्त है जो भारी और लगातार अनुरोध करते हैं, जिससे उस थ्रेड को अधिक काम किया जाता है, जबकि कम आक्रामक ग्राहकों के साथ अन्य धागे लगभग निष्क्रिय होते हैं। क्या यह अनुचित नहीं होगा? दूसरी तरफ, हो सकता है कि आप केवल कुछ धागे उपयोगकर्ता के विशिष्ट वर्ग से निपटने के लिए चाहते हैं, और उस स्थिति में शायद प्रत्येक थ्रेड पर अलग-अलग एपॉल एफडीएस हो। हमेशा की तरह, आपको दोनों संभावनाओं पर विचार करने, व्यापार बंद करने का मूल्यांकन करने, अपनी विशिष्ट समस्या के बारे में सोचने और निर्णय लेने की आवश्यकता है।

नीचे वैश्विक स्तर पर साझा एपोल एफडी का उपयोग करके एक उदाहरण है।मैं मूल रूप से यह सब करने की योजना नहीं बना रहा था, लेकिन एक चीज ने दूसरे को जन्म दिया, और, ठीक है, यह मजेदार था और मुझे लगता है कि यह आपको शुरू करने में मदद कर सकता है। यह एक गूंज सर्वर है जो पोर्ट 3000 पर सुनता है और इसमें नए ग्राहकों को स्वीकार करने और अनुरोधों को पूरा करने के लिए एपोल का उपयोग करके 20 धागे का पूल होता है।

#include <stdio.h> 
#include <stdlib.h> 
#include <inttypes.h> 
#include <errno.h> 
#include <string.h> 
#include <pthread.h> 
#include <assert.h> 
#include <unistd.h> 
#include <sys/types.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <sys/epoll.h> 

#define SERVERPORT 3000 
#define SERVERBACKLOG 10 
#define THREADSNO 20 
#define EVENTS_BUFF_SZ 256 

static int serversock; 
static int epoll_fd; 
static pthread_t threads[THREADSNO]; 

int accept_new_client(void) { 

    int clientsock; 
    struct sockaddr_in addr; 
    socklen_t addrlen = sizeof(addr); 
    if ((clientsock = accept(serversock, (struct sockaddr *) &addr, &addrlen)) < 0) { 
     return -1; 
    } 

    char ip_buff[INET_ADDRSTRLEN+1]; 
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) { 
     close(clientsock); 
     return -1; 
    } 

    printf("*** [%p] Client connected from %s:%" PRIu16 "\n", (void *) pthread_self(), 
      ip_buff, ntohs(addr.sin_port)); 

    struct epoll_event epevent; 
    epevent.events = EPOLLIN | EPOLLET; 
    epevent.data.fd = clientsock; 

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) { 
     perror("epoll_ctl(2) failed attempting to add new client"); 
     close(clientsock); 
     return -1; 
    } 

    return 0; 
} 

int handle_request(int clientfd) { 
    char readbuff[512]; 
    struct sockaddr_in addr; 
    socklen_t addrlen = sizeof(addr); 
    ssize_t n; 

    if ((n = recv(clientfd, readbuff, sizeof(readbuff)-1, 0)) < 0) { 
     return -1; 
    } 

    if (n == 0) { 
     return 0; 
    } 

    readbuff[n] = '\0'; 

    if (getpeername(clientfd, (struct sockaddr *) &addr, &addrlen) < 0) { 
     return -1; 
    } 

    char ip_buff[INET_ADDRSTRLEN+1]; 
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) { 
     return -1; 
    } 

    printf("*** [%p] [%s:%" PRIu16 "] -> server: %s", (void *) pthread_self(), 
      ip_buff, ntohs(addr.sin_port), readbuff); 

    ssize_t sent; 
    if ((sent = send(clientfd, readbuff, n, 0)) < 0) { 
     return -1; 
    } 

    readbuff[sent] = '\0'; 

    printf("*** [%p] server -> [%s:%" PRIu16 "]: %s", (void *) pthread_self(), 
      ip_buff, ntohs(addr.sin_port), readbuff); 

    return 0; 
} 

void *worker_thr(void *args) { 
    struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ); 
    if (events == NULL) { 
     perror("malloc(3) failed when attempting to allocate events buffer"); 
     pthread_exit(NULL); 
    } 

    int events_cnt; 
    while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) { 
     int i; 
     for (i = 0; i < events_cnt; i++) { 
      assert(events[i].events & EPOLLIN); 

      if (events[i].data.fd == serversock) { 
       if (accept_new_client() == -1) { 
        fprintf(stderr, "Error accepting new client: %s\n", 
         strerror(errno)); 
       } 
      } else { 
       if (handle_request(events[i].data.fd) == -1) { 
        fprintf(stderr, "Error handling request: %s\n", 
         strerror(errno)); 
       } 
      } 
     } 
    } 

    if (events_cnt == 0) { 
     fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?"); 
    } else { 
     perror("epoll_wait(2) error"); 
    } 

    free(events); 

    return NULL; 
} 

int main(void) { 
    if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { 
     perror("socket(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    struct sockaddr_in serveraddr; 
    serveraddr.sin_family = AF_INET; 
    serveraddr.sin_port = htons(SERVERPORT); 
    serveraddr.sin_addr.s_addr = INADDR_ANY; 

    if (bind(serversock, (const struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) { 
     perror("bind(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    if (listen(serversock, SERVERBACKLOG) < 0) { 
     perror("listen(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    if ((epoll_fd = epoll_create(1)) < 0) { 
     perror("epoll_create(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    struct epoll_event epevent; 
    epevent.events = EPOLLIN | EPOLLET; 
    epevent.data.fd = serversock; 

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) { 
     perror("epoll_ctl(2) failed on main server socket"); 
     exit(EXIT_FAILURE); 
    } 

    int i; 
    for (i = 0; i < THREADSNO; i++) { 
     if (pthread_create(&threads[i], NULL, worker_thr, NULL) < 0) { 
      perror("pthread_create(3) failed"); 
      exit(EXIT_FAILURE); 
     } 
    } 

    /* main thread also contributes as worker thread */ 
    worker_thr(NULL); 

    return 0; 
} 

कुछ नोट:

  • main()int, नहीं void लौटना चाहिए (जैसा कि आप अपने उदाहरण में दिखाई देते हैं)
  • हमेशा त्रुटि वापसी कोड के साथ सौदा। उन्हें अनदेखा करना बहुत आम है और जब चीजें तोड़ती हैं तो यह जानना मुश्किल होता है कि क्या हुआ।
  • कोड मानता है कि 511 बाइट्स से कोई अनुरोध बड़ा नहीं है (जैसा कि handle_request() में बफर आकार द्वारा देखा गया है)। यदि अनुरोध अधिक से अधिक है, तो यह संभव है कि कुछ डेटा सॉकेट में बहुत लंबे समय तक छोड़ा जाए, क्योंकि epoll_wait(2) तब तक रिपोर्ट नहीं करेगा जब तक उस फ़ाइल डिस्क्रिप्टर पर कोई नई घटना न हो (क्योंकि हम EPOLLET का उपयोग कर रहे हैं)। सबसे बुरे मामले में, ग्राहक वास्तव में कभी भी कोई नया डेटा नहीं भेज सकता है, और हमेशा के लिए एक उत्तर का इंतजार कर सकता है।
  • प्रत्येक अनुरोध के लिए थ्रेड पहचानकर्ता मुद्रित करने वाला कोड मानता है कि pthread_t एक अपारदर्शी सूचक प्रकार है। दरअसल, pthread_t लिनक्स में एक सूचक प्रकार है, लेकिन यह अन्य प्लेटफार्मों में एक पूर्णांक प्रकार हो सकता है, इसलिए यह पोर्टेबल नहीं है। हालांकि, शायद यह कोई समस्या नहीं है, क्योंकि एपोल लिनक्स विशिष्ट है, इसलिए कोड पोर्टेबल नहीं है।
  • यह मानता है कि एक ही ग्राहक से कोई अन्य अनुरोध तब नहीं आता जब थ्रेड अभी भी उस ग्राहक से अनुरोध कर रहा हो। यदि इस दौरान कोई नया अनुरोध आता है और कोई अन्य धागा इसकी सेवा करना शुरू कर देता है, तो हमारे पास दौड़ की स्थिति होती है और क्लाइंट को उन्हें उसी क्रम में इको संदेश प्राप्त नहीं होंगे, हालांकि, उन्हें write(2) परमाणु है, इसलिए जवाब हो सकते हैं आदेश से बाहर, वे छेड़छाड़ नहीं करेंगे)।
+0

व्यापक उत्तर के लिए धन्यवाद। वह बहुत मददगार था। – MiJo

+0

@MiJo Glad मैं मदद कर सकता था। यह एक अच्छा सवाल था :) –

संबंधित मुद्दे