2015-10-05 7 views
11

मैं एकाधिक लेखकों और पोस्टग्रेज़ डेटाबेस का उपयोग कर एक एकाधिक पाठकों के साथ एक विश्वसनीय कतार लागू करने की कोशिश कर रहा हूं। गायब पंक्तियों से बचने के लिए जब एक कतार पाठक एक टेबल स्कैन करता है तो प्रगति लेनदेन पढ़ने के बाद प्रतिबद्ध होता है।PostgreSQL - एक विश्वसनीय कतार लागू करना

हमारे पास एक "चेकपॉइंट" समय का उपयोग करके बैच में पंक्तियों का चयन करने वाला एक पाठक है, जहां प्रत्येक बैच को पिछले बैच में अंतिम टाइमस्टैम्प के बाद पंक्तियां मिलती हैं, और हम पंक्तियां गायब हैं। (कारण: टाइमस्टैम्प मान उस समय पर आधारित होता है जब INSERT होता है (00.00.00)। भारी भार पर, यदि लेन-देन में अधिक समय लगता है, तो इसे 10 सेकंड बाद (00.00.10) कहने के लिए डाला जाता है, पाठक इस पंक्ति को याद करेगा (पंक्ति 1) अगर यह उस 10 सेकंड के दौरान पढ़ता है और पंक्ति 1 की तुलना में बाद में (00.00.05) में एक आईएनएसईआरटी समय प्राप्त करता है। समस्या का पूरा विवरण इस ब्लॉग में लिखे गए एक जैसा है। http://blog.thefourthparty.com/stopping-time-in-postgresql/)

संदर्भ के लिए संबंधित पूर्व प्रश्न: Postgres LISTEN/NOTIFY - low latency, realtime?

अद्यतन: मैं कई पाठकों के लिए एक पाठक होने से सवाल अद्यतन किया था। ऑर्डर जिसमें पाठक पढ़ता है, मायने रखता है।

+1

http://stackoverflow.com/q/6507475/330315 और http://stackoverflow.com/q/22765054/330315 मदद कर सकता है। आप इसे भी देखना चाहेंगे: http://pgxn.org/dist/pg_message_queue/ –

+0

क्या यह महत्वपूर्ण है कि सभी पंक्तियों को * क्रमशः * संसाधित किया जाए? फिर ऑर्डर कैसे परिभाषित किया गया है * ठीक *? या आप बस गायब पंक्तियों से बचना चाहते हैं? फिर यहां प्रस्तुत किए गए समाधान को काम करना चाहिए: [पोस्टग्रेज़ अपडेट, सीमा 1] (http://dba.stackexchange.com/a/69497/3684) –

+0

क्या वास्तव में यह आवश्यक है कि आप postgresql के साथ ऐसा करें? यह ऐसी आवश्यकता है जो बहुत आसानी से पूर्ण हो सकती है redis – e4c5

उत्तर

3

एकाधिक पाठकों को ध्यान में रखते हुए, यह नियंत्रित करना आवश्यक है कि प्रत्येक पाठक को पहले से ही कौन से रिकॉर्ड प्राप्त हुए हों।

इसके अलावा, यह कहा गया है कि ऑर्डर एक पाठक को रिकॉर्ड भेजने की शर्त है। इसलिए, यदि कुछ और लेनदेन पहले के पहले किए गए थे, तो हमें पाठक को भेजे गए रिकॉर्ड के आदेश को बनाए रखने के लिए, "रोक" देना होगा और जब उसने प्रतिबद्ध किया था, तो रिकॉर्ड फिर से भेजना होगा।

जिसके अनुसार, कार्यान्वयन की जाँच करें:

-- lets create our queue table 
drop table if exists queue_records cascade; 
create table if not exists queue_records 
(
    cod serial primary key, 
    date_posted timestamp default timeofday()::timestamp, 
    message text 
); 


-- lets create a table to save "checkpoints" per reader_id 
drop table if exists queue_reader_checkpoint cascade; 
create table if not exists queue_reader_checkpoint 
(
    reader_id text primary key, 
    last_checkpoint numeric 
); 



CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text) 
RETURNS SETOF queue_records AS 
$BODY$ 
DECLARE 
    vLAST_CHECKPOINT numeric; 
    vCHECKPOINT_EXISTS integer; 
    vRECORD   queue_records%rowtype; 
BEGIN 

    -- let's get the last record sent to the reader 
    SELECT last_checkpoint 
    INTO vLAST_CHECKPOINT 
    FROM queue_reader_checkpoint 
    WHERE reader_id = pREADER_ID; 

    -- if vLAST_CHECKPOINT is null (this is the very first time of reader_id), 
    -- sets it to the last cod from queue. It means that reader will get records from now on. 
    if (vLAST_CHECKPOINT is null) then 
     -- sets the flag indicating the reader does not have any checkpoint recorded 
     vCHECKPOINT_EXISTS = 0; 
     -- gets the very last commited record 
     SELECT MAX(cod) 
     INTO vLAST_CHECKPOINT 
     FROM queue_records; 
    else 
     -- sets the flag indicating the reader already have a checkpoint recorded 
     vCHECKPOINT_EXISTS = 1; 
    end if; 

    -- now let's get the records from the queue one-by-one 
    FOR vRECORD IN 
      SELECT * 
      FROM queue_records 
      WHERE COD > vLAST_CHECKPOINT 
      ORDER BY COD 
    LOOP 

     -- if next record IS EQUALS to (vLAST_CHECKPOINT+1), the record is in the expected order 
     if (vRECORD.COD = (vLAST_CHECKPOINT+1)) then 

      -- let's save the last record read 
      vLAST_CHECKPOINT = vRECORD.COD; 

      -- and return it 
      RETURN NEXT vRECORD; 

     -- but, if it is not, then is out of order 
     else 
      -- the reason is some transaction did not commit yet, but there's another further transaction that alread did. 
      -- so we must stop sending records to the reader. And probably next time he calls, the transaction will have committed already; 
      exit; 
     end if; 
    END LOOP; 


    -- now we have to persist the last record read to be retrieved on next call 
    if (vCHECKPOINT_EXISTS = 0) then 
     INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint) values (pREADER_ID, vLAST_CHECKPOINT); 
    else   
     UPDATE queue_reader_checkpoint SET last_checkpoint = vLAST_CHECKPOINT where reader_id = pREADER_ID; 
    end if; 
end; 
$BODY$ LANGUAGE plpgsql VOLATILE; 
+0

मुझे कुछ "चेकपॉइंट" होना चाहिए। क्या आप सुझाव दे रहे हैं कि मैं इसके लिए "कोड" का उपयोग करता हूं? इसके अलावा आदेश पूरी तरह से गड़बड़ हो जाएगा। – Chandra

+0

और रिकॉर्ड पढ़ने के लिए एक शर्त आदेश है? – Christian

+0

मुझे लगता है कि @ चंद्र के उपयोग के मामले में, कई पाठक सभी एक ही टेबल को पढ़ते हैं लेकिन शायद अलग-अलग समय और अलग-अलग गति पर। मैं स्पष्ट नहीं हूं कि कैसे कोड अपने उपयोग के मामले में मदद करेगा। –

संबंधित मुद्दे