2017-02-17 5 views
13

हम इस तरह 500k के बारे में तत्वों के साथ एक XML फ़ाइल (xml-stream का प्रयोग करके) पढ़ते हैं और उन्हें सम्मिलित कर MongoDB करते हैं: जब मोंगोnode.js में डिस्कनेक्ट के दौरान MongoDB प्रविष्टियों को कैसे बफर करें?

collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { }); 

अब:

xml.on(`endElement: product`, writeDataToDb.bind(this, "product")); 

writeDataToDb(type, obj) में सम्मिलित इस तरह दिखता है कनेक्शन डिस्कनेक्ट हो जाता है, एक्सएमएल स्ट्रीम अभी भी पढ़ता है और कंसोल त्रुटि संदेशों के साथ बाढ़ आ जाता है (सम्मिलित नहीं हो सकता है, डिस्कनेक्ट किया जा सकता है, ईपीआईपीई टूटा हुआ है ...)।

docs में यह कहते हैं:

जब आप mongod प्रक्रिया को बंद कर दिया, ड्राइवर प्रोसेसिंग आपरेशन बंद हो जाता है और bufferMaxEntries जा रहा है -1 डिफ़ॉल्ट बफर अर्थ सभी कार्यों द्वारा की वजह से उन्हें बफरिंग रहता है।

यह बफर वास्तव में क्या करता है?

हम नोटिस जब हम डेटा सम्मिलित करें और मोंगो सर्वर बंद करते हैं, बातें बफ़र हो, तो हम मोंगो सर्वर वापस ऊपर लाने के लिए, देशी ड्राइवर सफलतापूर्वक पुन: कनेक्ट हो और नोड (ऑफ़लाइन beeing मोंगो दौरान डालने डेटा लेकिन बफ़र दस्तावेजों शुरू) फिर से डाला नहीं है।

तो मैं इस बफर और इसके उपयोग पर सवाल करता हूं।

लक्ष्य:

हम सबसे अच्छा तरीका है बफर में आवेषण रखने के लिए के लिए देख रहे हैं जब तक मोंगो वापस (wtimeout के अनुसार 15000milliseconds में) आता है और फिर बफ़र दस्तावेजों डालने या xml.pause(); और xml.resume() का इस्तेमाल करते हैं जो करते हैं हमने सफलता के बिना कोशिश की।

असल में हमें डेटा हानि या इंटरप्ट के बिना डिस्कनेक्ट को संभालने में थोड़ा सा मदद चाहिए।

+0

इस दोहराने नहीं कर सकते, दोनों डॉक्स और परीक्षण 'का उपयोग कर एक्सएमएल-stream' मोन एक बार बफ़र ऑब्जेक्ट सम्मिलित में उदाहरण सर्वर वापस आ गया है .. शायद आप अधिक कोड पोस्ट कर सकते हैं/अपने सेटअप के बारे में कुछ और जानकारी दे सकते हैं? – cviejo

+0

@cviejo मैं अपनी स्क्रिप्ट को साझा नहीं कर सकता क्योंकि वे कंपनी से संबंधित हैं, लेकिन क्या आप मुझे उस स्क्रिप्ट को भेजना चाहेंगे जिसे आपने दोहराने की कोशिश की थी? जिस्ट/पेस्टबिन ठीक रहेगा। – DanFromGermany

उत्तर

1

मैं विशेष रूप से मोंगोड ड्राइवर और प्रविष्टियों के इस बफर के बारे में नहीं जानता। शायद यह केवल विशिष्ट परिदृश्यों में डेटा रखता है।

तो मैं इस प्रश्न का उत्तर अधिक सामान्य दृष्टिकोण के साथ दूंगा जो किसी भी डेटाबेस के साथ काम कर सकता है।

संक्षेप में, आप दो समस्याएं हैं:

  1. आप असफल प्रयासों से ठीक नहीं कर रहे हैं
  2. एक्सएमएल धारा

प्रथम अंक को संभालने के लिए बहुत तेजी से डेटा भेजने के लिए, आप लागू करने की आवश्यकता एक पुनः प्रयास एल्गोरिदम जो यह सुनिश्चित करेगा कि छोड़ने से पहले कई प्रयास किए जाएंगे।

दूसरी समस्या को संभालने के लिए, आपको एक्सएमएल स्ट्रीम पर बैक प्रेशर लागू करने की आवश्यकता है। आप pause विधि, resume विधि और इनपुट बफर का उपयोग करके ऐसा कर सकते हैं।इसके साथ

var Promise = require('bluebird'); 
var fs = require('fs'); 
var Xml = require('xml-stream'); 

var fileStream = fs.createReadStream('myFile.xml'); 
var xml = new Xml(fileStream); 

// simple exponential retry algorithm based on promises 
function exponentialRetry(task, initialDelay, maxDelay, maxRetry) { 
    var delay = initialDelay; 
    var retry = 0; 
    var closure = function() { 
     return task().catch(function(error) { 
      retry++; 
      if (retry > maxRetry) { 
       throw error 
      } 
      var promise = Promise.delay(delay).then(closure); 
      delay = Math.min(delay * 2, maxDelay); 
      return promise; 
     }) 
    }; 
    return closure(); 
} 

var maxPressure = 100; 
var currentPressure = 0; 
var suspended = false; 
var stopped = false; 
var buffer = []; 

// handle back pressure by storing incoming tasks in the buffer 
// pause the xml stream as soon as we have enough tasks to work on 
// resume it when the buffer is empty 
function writeXmlDataWithBackPressure(product) { 
    // closure used to try to start a task 
    var tryStartTask = function() { 
     // if we have enough tasks running, pause the xml stream 
     if (!stopped && !suspended && currentPressure >= maxPressure) { 
      xml.pause(); 
      suspended = true; 
      console.log("stream paused"); 
     } 
     // if we have room to run tasks 
     if (currentPressure < maxPressure) { 
      // if we have a buffered task, start it 
      // if not, resume the xml stream 
      if (buffer.length > 0) { 
       buffer.shift()(); 
      } else if (!stopped) { 
       try { 
        xml.resume(); 
        suspended = false; 
        console.log("stream resumed"); 
       } catch (e) { 
        // the only way to know if you've reached the end of the stream 
        // xml.on('end') can be triggered BEFORE all handlers are called 
        // probably a bug of xml-stream 
        stopped = true; 
        console.log("stream end"); 
       } 
      } 
     } 
    }; 

    // push the task to the buffer 
    buffer.push(function() { 
     currentPressure++; 
     // use exponential retry to ensure we will try this operation 100 times before giving up 
     exponentialRetry(function() { 
      return writeDataToDb(product) 
     }, 100, 2000, 100).finally(function() { 
      currentPressure--; 
      // a task has just finished, let's try to run a new one 
      tryStartTask(); 
     }); 
    }); 

    // we've just buffered a task, let's try to run it 
    tryStartTask(); 
} 

// write the product to database here :) 
function writeDataToDb(product) { 
    // the following code is here to create random delays and random failures (just for testing) 
    var timeToWrite = Math.random() * 100; 
    var failure = Math.random() > 0.5; 
    return Promise.delay(timeToWrite).then(function() { 
     if (failure) { 
      throw new Error(); 
     } 
     return null; 
    }) 
} 

xml.on('endElement: product', writeXmlDataWithBackPressure); 

प्ले, समझने के लिए कि यह कैसे बर्ताव करता है कुछ console.log डाल दिया। मुझे आशा है कि यह आपको आपकी समस्या का समाधान करने में मदद करेगा :)

+0

यह मूल रूप से एक अच्छा कार्यान्वयन है, लेकिन मुझे उम्मीद है कि आंतरिक लेखन चिंता/मोंगो के लिखने वाले बफर का उपयोग करने में सक्षम होना चाहिए - कृपया [इस पृष्ठ] में एक नज़र डालें (https://mongodb.github.io/node-mongodb- देशी/ड्राइवर-लेख/anintroductionto1_4_and_2_6.html) और कीवर्ड 'bufferMaxEntries'। – DanFromGermany

2

insertOne() के साथ 500K तत्व डालने का एक बहुत बुरा विचार है। आपको इसके बजाय bulk operations का उपयोग करना चाहिए जो आपको एक ही अनुरोध में कई दस्तावेज़ डालने की अनुमति देता है। (यहाँ उदाहरण 10000 के लिए, तो यह 50 एकल अनुरोध किया जा सकता है) बफरिंग समस्या से बचने के लिए, आप मैन्युअल रूप से इसे संभाल कर सकते हैं:

    साथ
  1. अक्षम बफरिंग bufferMaxEntries: 0
  2. सेट फिर से कनेक्ट गुण: reconnectTries: 30, reconnectInterval: 1000
  3. एक थोक ऑपरेशन बनाएं और इसे 10000 आइटम
  4. के साथ फ़ीड करें xml रीडर को रोकें। 10000 आइटम डालने का प्रयास करें।

यहाँ एक नमूना स्क्रिप्ट है: अगर यह विफल रहता है, हर 3000ms (11000 त्रुटि कोड) पुन: प्रयास जब तक यह

  • सफल होने अगर थोक आपरेशन निष्पादन के दौरान बाधित है आप कुछ नकली आईडी मुद्दों का सामना कर सकते हैं, इसलिए उन्हें अनदेखा:

    var fs = require('fs') 
    var Xml = require('xml-stream') 
    
    var MongoClient = require('mongodb').MongoClient 
    var url = 'mongodb://localhost:27017/test' 
    
    MongoClient.connect(url, { 
        reconnectTries: 30, 
        reconnectInterval: 1000, 
        bufferMaxEntries: 0 
    }, function (err, db) { 
        if (err != null) { 
        console.log('connect error: ' + err) 
        } else { 
        var collection = db.collection('product') 
        var bulk = collection.initializeUnorderedBulkOp() 
        var totalSize = 500001 
        var size = 0 
    
        var fileStream = fs.createReadStream('data.xml') 
        var xml = new Xml(fileStream) 
        xml.on('endElement: product', function (product) { 
         bulk.insert(product) 
         size++ 
         // if we have enough product, save them using bulk insert 
         if (size % 10000 == 0) { 
         xml.pause() 
         bulk.execute(function (err, result) { 
          if (err == null) { 
          bulk = collection.initializeUnorderedBulkOp() 
          console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try') 
          xml.resume() 
          } else { 
          console.log('bulk insert failed: ' + err) 
          counter = 0 
          var retryInsert = setInterval(function() { 
           counter++ 
           bulk.execute(function (err, result) { 
           if (err == null) { 
            clearInterval(retryInsert) 
            bulk = collection.initializeUnorderedBulkOp() 
            console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries') 
            xml.resume() 
           } else if (err.code === 11000) { // ignore duplicate ID error 
            clearInterval(retryInsert) 
            bulk = collection.initializeUnorderedBulkOp() 
            console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries') 
            xml.resume() 
           } else { 
            console.log('failed after first try: ' + counter, 'error: ' + err) 
           } 
           }) 
          }, 3000) // retry every 3000ms until success 
          } 
         }) 
         } else if (size === totalSize) { 
         bulk.execute(function (err, result) { 
          if (err == null) { 
          db.close() 
          } else { 
          console.log('bulk insert failed: ' + err) 
          } 
         }) 
         } 
        }) 
        } 
    }) 
    

    नमूना लॉग उत्पादन:

    doc 0 : 10000 saved on first try 
    doc 10000 : 20000 saved on first try 
    doc 20000 : 30000 saved on first try 
    [...] 
    bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown 
    failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0 
    failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0 
    failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0 
    doc 130000 : 140000 saved after 4 tries 
    doc 140000 : 150000 saved on first try 
    [...] 
    
  • +0

    आपका उत्तर मोंगो लिखने वाले बफर के बारे में जानकारी प्रदान नहीं करता है और प्रतिकृति सेट या डिस्कनेक्ट में माटर परिवर्तन के दौरान भी सभी दस्तावेज़ों को सम्मिलित करने के बारे में कोई समाधान नहीं है। थोक सम्मिलन के बारे में जानकारी दिलचस्प है और मैं इसे देख लूंगा, धन्यवाद! – DanFromGermany

    +0

    @DanFromGermany हाँ, क्योंकि मेरे लिए ऐसा लगता है कि आप गलत समस्या को हल करने का प्रयास कर रहे हैं: वास्तविक समस्या यह है कि आपके ऐप्स डेटाबेस से डिस्कनेक्ट हो जाते हैं। डेटाबेस में कम कॉल के साथ, ऑटो-रीकनेक्ट करना आसान होगा, इसलिए – felix

    +0

    लिखने की कोई ज़रूरत नहीं है मेरे ऐप्स ** ** डेटाबेस से डिस्कनेक्ट नहीं होते हैं। मैं उन ऐप्स को लिखना चाहता हूं जो ** ** डिस्कनेक्ट * या * प्रतिकृति सेट में मास्टर-स्विच के मामले में सभी डेटा को फिर से कनेक्ट करने और लिखने के लिए कहते हैं। – DanFromGermany

    संबंधित मुद्दे