2016-01-19 6 views
6

मान लेते हैं कि Oracle स्कीमा निम्न तालिकाओं और स्तंभ होते हैं:जटिल नेस्टेड elasticsearch दस्तावेज़ का अपडेट logstash और JDBC का उपयोग कर

 

    Country 
     country_id; (Primary Key) 
     country_name; 

    Department 
     department_id; (Primary Key) 
     department_name; 
     country_id; (Foreign key to Country:country_id) 

    Employee 
     employee_id; (Primary Key) 
     employee_name; 
     department_id; (Foreign key to Department:department_id) 

और मैं अपने Elasticsearch दस्तावेज़ जहां मूल तत्व एक देश है और यह सभी विभागों में शामिल है उस देश में जो बदले में सभी विभागों को संबंधित विभागों में शामिल करता है।

तो दस्तावेज़ संरचना इस तरह दिखता है:

 

    { 
     "mappings": { 
     "country": { 
      "properties": { 
      "country_id": { "type": "string"}, 
      "country_name": { "type": "string"},   
      "department": { 
       "type": "nested", 
       "properties": { 
       "department_id": { "type": "string"}, 
       "department_name": { "type": "string"}, 
       "employee": { 
        "type": "nested", 
        "properties": { 
        "employee_id": { "type": "string"}, 
        "employee_name": { "type": "string"} 
        } 
       } 
       } 
      } 
      } 
     } 
     } 
    }   

मैं प्रत्येक मेज पर चल रहे अलग इनपुट JDBC प्रश्न हैं करने में सक्षम होना चाहते हैं और वे/अद्यतन बनाने/elasticsearch दस्तावेज़ में डेटा हटा देना चाहिए जब भी आधार तालिका में डेटा जोड़ा/अपडेट/हटा दिया गया है।

यह एक उदाहरण समस्या है और वास्तविक सारणी और डेटा संरचना अधिक जटिल हैं। तो मैं इस तक सीमित समाधान की तलाश नहीं कर रहा हूं।

क्या यह हासिल करने का कोई तरीका है?

धन्यवाद।

+0

मेरा अनुमान है कि आप पहले से ही इस का हल हो सकता है, तथापि, का उपयोग आप का उपयोग नहीं कर सकता है दस्तावेज़ संरचना प्रारूप (देश, विभाग, कर्मचारी) में आवश्यक डेटा को गठबंधन करने के लिए ओरेकल व्यू और इसे एक जेडीबीसी क्वेरी के रूप में, इस तरह आप लोचदार खोज दस्तावेज़ आईडी को निम्नतम अद्वितीय स्तर (कर्मचारी_आईडी) के रूप में बनाने में सक्षम होंगे। मामला) और वहां परिवर्तनों का प्रबंधन? –

उत्तर

0

स्तर एक के लिए, aggregate filter का उपयोग कर सीधे आगे बढ़ें। संदर्भ के लिए आपको उनके बीच एक सामान्य आईडी होना चाहिए।

filter {  

    aggregate { 
    task_id => "%{id}" 

    code => "  
     map['id'] = event.get('id') 
     map['department'] ||= [] 
     map['department'] << event.to_hash.each do |key,value| { key => value } end  
    " 
    push_previous_map_as_event => true 
    timeout => 150000 
    timeout_tags => ['aggregated']  
    } 

    if "aggregated" not in [tags] { 
    drop {} 
    } 
} 

महत्वपूर्ण: उत्पादन कार्रवाई को अद्यतन किया जाना चाहिए

output { 
     elasticsearch { 
      action => "update" 
      ... 
      } 
     } 

एक तरह से करने के लिए स्तर 2 का समाधान पहले से अनुक्रमित दस्तावेज़ क्वेरी और नेस्टेड रिकॉर्ड के साथ यह अद्यतन करने के लिए है । फिर से aggregate filter का उपयोग कर; दस्तावेज़ के लिए एक सामान्य आईडी होना चाहिए ताकि आप सही दस्तावेज़ में देख और सम्मिलित कर सकें।

filter {  
    #get the document from elastic based on id and store it in 'emp' 
    elasticsearch { 
      hosts => ["${ELASTICSEARCH_HOST}/${INDEX_NAME}/${INDEX_TYPE}"] 
      query => "id:%{id}" 
      fields => { "employee" => "emp" } 
     } 



    aggregate { 
    task_id => "%{id}" 
    code => "  
       map['id'] = event.get('id') 
       map['employee'] = [] 
       employeeArr = [] 
       temp_emp = {} 

       event.to_hash.each do |key,value|      
        temp_emp[key] = value 
       end  

       #push the objects into an array 
       employeeArr.push(temp_emp) 

       empArr = event.get('emp')     

       for emp in empArr 
        emp['employee'] = employeeArr      
        map['employee'].push(emp) 
       end 
    " 
    push_previous_map_as_event => true 
    timeout => 150000 
    timeout_tags => ['aggregated'] 

    } 

    if "aggregated" not in [tags] { 
    drop {} 
    } 

} 

output { 

elasticsearch { 
     action => "update" #important 
     ... 
     } 
} 

इसके अलावा, गहरे लाल रंग का कोड डिबग करने के लिए, नीचे उत्पादन में

output{ 
    stdout { codec => dots } 
} 
संबंधित मुद्दे