2016-02-04 11 views
5

मैं निर्धारित किया है syslog में दो लॉग मर्ज लॉग भेजने के लिए logstash के लिए निम्न फिल्टर के साथ,:Logstash: एक उत्पादन दस्तावेज़

output { 
    elasticsearch 
    { hosts => ["localhost:9200"] 
    document_id => "%{job_id}" 
} 

} 
filter { 
    grok { 
     overwrite => ["message"] 
    } 
    json { 
    source => "message" 
    } 
} 

अपने आवेदन में से एक का एक विशिष्ट संदेश एक प्रारंभिक राज्य और एक job_id होगा :

{"job_id": "xyz782", state:"failed", processing_time: 12.345} 
:

{"job_id": "xyz782", state: "processing", job_type: "something"} 

कुछ मिनट या तो बाद में, एक और लॉग एक ही log_id, एक अलग राज्य, और एक प्रसंस्करण समय होगा

ये फ़ील्ड ठीक से लोड हो जाते हैं, लेकिन दो दस्तावेज़ बनाए जाते हैं।

{"job_id": "xyz782", state: "failed", job_type: "something", processing_time: 12.345} 

आप में देख सकते हैं: के लिए केवल एक दस्तावेज़ प्रारंभिक लॉग, और बदले पहले एक अद्यतन करने के लिए दूसरा लॉग के लिए बनाया जा करने के लिए, अद्यतन दस्तावेज़ अर्थ निम्नलिखित क्षेत्रों के लिए होता है क्या मैं प्यार होता है मेरा लॉगस्टैश conf आउटपुट, मैं job_id को दस्तावेज़ आईडी के रूप में उपयोग करता हूं, हालांकि, दूसरा संदेश फ़ील्ड को पहले संदेश से बदलना प्रतीत होता है, लेकिन पहले संदेश में सभी फ़ील्ड मिटा देता है जो कि दूसरे में नहीं हैं, उदाहरण के लिए , पहले संदेश में मौजूद job_type फ़ील्ड अंतिम दस्तावेज़ में प्रकट नहीं होता है। इस तथ्य के साथ ऐसा करना पड़ सकता है कि जेसन दोनों बार एक ही क्षेत्र "संदेश" से आता है। लॉगस्टैश में एक दस्तावेज़ में दो लॉग संदेशों को विलय करने का कोई और तरीका है?

उत्तर

4

आप ऐसा करने के लिए aggregate फ़िल्टर का उपयोग कर सकते हैं। कुल फ़िल्टर एक सामान्य फ़ील्ड मान के आधार पर एक एकल ईवेंट में कई लॉग लाइनों को एकत्रित करने के लिए समर्थन प्रदान करता है। आपके मामले में, सामान्य क्षेत्र job_id फ़ील्ड होगा।

फिर हमें दूसरी घटना बनाम दूसरे ईवेंट को पहचानने के लिए एक और फ़ील्ड की आवश्यकता है जिसे समेकित किया जाना चाहिए। आपके मामले में, यह state फ़ील्ड होगा।

तो आप बस इस तरह, अपने मौजूदा Logstash विन्यास के लिए एक और फिल्टर जोड़ने की जरूरत:

filter { 
    ...your other filters 

    if [state] == "processing" { 
     aggregate { 
      task_id => "%{job_id}" 
     } 
    } else if [state] == "failed" { 
     aggregate { 
      task_id => "%{job_id}" 
      end_of_task => true 
      timeout => 120 
     } 
    } 
} 

आप को समायोजित करने के लिए स्वतंत्र हैं timeout (सेकंड में) कितनी देर तक अपने काम चला रहे हैं पर निर्भर करता है।

+0

ठीक है। आपके उत्तर के लिए धन्यवाद, मैंने कुल देखा लेकिन प्रारंभ में सोचा कि यह अधिक हो सकता है क्योंकि मैंने सोचा था कि मुझे प्रत्येक फ़ील्ड का मैपिंग करने की ज़रूरत है, जिसे मैं कुल में रखना चाहता हूं। तो मुझे लगता है कि मैं राज्य क्षेत्र को केवल असफल होने के बजाय उपस्थित होने की जांच करूँगा (क्योंकि हम अभी भी सफलता को जोड़ना चाहते हैं।) एक बार एकत्रीकरण करने के बाद, प्रारंभिक संदेश हटा दिया जाएगा, या फिर भी लोचदार हो जाएगा ? मुझे लगता है कि जानने के लिए केवल एक ही रास्ता है! एक बार फिर धन्यवाद। –

+1

प्रारंभिक संदेश इसे 'end_of_task' सत्य होने तक लोचदार में नहीं बनाता है, इसलिए निकालने के लिए कुछ भी नहीं है। – Val

+0

ठीक है, तो अगर प्रारंभिक संदेश से लोचदार में कुछ भी नहीं मिलता है, तो क्या उस job_id के लिए लोचदार से पूछने का कोई तरीका है और यह पता चल रहा है कि राज्य के असफल होने या सफलता के साथ दूसरे संदेश से पहले यह प्रसंस्करण हो रहा है? –

संबंधित मुद्दे