मुझे प्रति सेकंड लगभग 250 संख्यात्मक मूल्यों को प्रति क्लाइंट स्टोर करना होगा, प्रति ग्राहक लगभग 900k संख्याएं। यह शायद एक पूर्ण दिन रिकॉर्डिंग (शायद दिन में 5-10 घंटे के बीच) नहीं होगा, लेकिन मैं क्लाइंट आईडी के आधार पर अपना डेटा विभाजित करूंगा और जिस दिन पढ़ा जा रहा है। अधिकतम पंक्ति लंबाई लगभग 22-23 एम पर आती है जो अभी भी प्रबंधनीय है। Neverteless, मेरी योजना इस तरह दिखता है:खराब सम्मिलन प्रदर्शन और डालने स्थिरता के साथ कैसंद्रा क्लस्टर
CREATE TABLE measurement (
clientid text,
date text,
event_time timestamp,
value int,
PRIMARY KEY ((clientid,date), event_time)
);
keyspace सिर्फ परीक्षण के लिए, 2 की एक प्रतिकृति कारक है, स्निच GossipingPropertyFileSnitch
और NetworkTopologyStrategy
है। मुझे पता है कि प्रतिकृति कारक 3 अधिक उत्पादन मानक है।
अगला, मैंने कंपनियों के सर्वर पर एक छोटा क्लस्टर बनाया, तीन सीडी धातु वर्चुअलाइज्ड मशीनों के साथ 2 सीपीयू एक्स 2 कोर और 16 जीबी रैम और बहुत सी जगह। मैं उनके साथ गीगाबिट लैन में हूं। नोडेटूल के आधार पर क्लस्टर परिचालित है।
Cluster cluster = Cluster.builder()
.addContactPoint("192.168.1.100")
.addContactPoint("192.168.1.102")
.build();
Session session = cluster.connect();
DateTime time = DateTime.now();
BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue(50, true);
try {
ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts
String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
PreparedStatement preparedStatement = session.prepare(insertQuery);
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also
//generating the entries
for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements
time = time.plus(4); //4ms between each entry
BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
batch.add(bound);
//The batch statement must have 65535 statements at most
if (batch.size() >= 65534) {
queryQueue.put(batch);
batch = new BatchStatement();
}
}
queryQueue.put(batch); //the last batch, perhaps shorter than 65535
//storing the data
System.out.println("Starting storing");
while (!queryQueue.isEmpty()) {
pool.execute(() -> {
try {
long threadId = Thread.currentThread().getId();
System.out.println("Started: " + threadId);
BatchStatement statement = queryQueue.take();
long start2 = System.currentTimeMillis();
session.execute(statement);
System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
} catch (Exception ex) {
System.out.println(ex.toString());
}
});
}
pool.shutdown();
pool.awaitTermination(120,TimeUnit.SECONDS);
} catch (Exception ex) {
System.out.println(ex.toString());
} finally {
session.close();
cluster.close();
}
मैं यहाँ पोस्ट पढ़ कर और अन्य ब्लॉग और वेबसाइटों पर कोड के साथ आया था:
यहाँ कोड मैं अपने सेटअप का परीक्षण करने का उपयोग कर रहा है। जैसा कि मैंने समझा कि क्लाइंट के लिए कई धागे का उपयोग करना महत्वपूर्ण है, इसलिए मैंने ऐसा किया है। मैंने एसिंक ऑपरेशंस का उपयोग करने का भी प्रयास किया।
नीचे पंक्ति परिणाम यह है, कोई फर्क नहीं पड़ता कि मैं किस दृष्टिकोण का उपयोग करता हूं, एक बैच 5-6 सेकंड में निष्पादित होता है, हालांकि इसमें 10 तक का समय लग सकता है। अगर मैं केवल एक बैच दर्ज करता हूं तो यह वही होता है (इसलिए, केवल ~ 65k कॉलम) या यदि मैं एक गूंगा सिंगल थ्रेड एप्लिकेशन का उपयोग करता हूं। ईमानदारी से, मुझे थोड़ा और उम्मीद थी। विशेष रूप से जब से मैं अपने लैपटॉप पर स्थानीय उदाहरण के साथ कम या ज्यादा समान प्रदर्शन करता हूं।
दूसरा, शायद अधिक महत्वपूर्ण मुद्दा, अपवाद हैं जिन्हें मैं अप्रत्याशित तरीके से सामना कर रहा हूं। इन दो:
com.datastax.driver.core.exceptions.WriteTimeoutException: कैसेंड्रा टाइमआउट स्थिरता एक ही (1 प्रतिकृति की जरूरत थी, लेकिन केवल 0 लिखने स्वीकार किया)
पर लिखने क्वेरी के दौरान और
com.datastax.driver.core.exceptions.NoHostAvailableException: सभी मेजबान (रों) क्वेरी के लिए करने की कोशिश की हो सका (करने की कोशिश की: /192.168.1.102:9042 (com.datastax.dri ver.core.TransportException: [/192.168.1.102:9042] कनेक्शन बंद कर दिया गया है), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] कनेक्शन में है बंद कर दिया गया), /192.168.1.101:9042 (com.datastax.driver.core.TransportException: [/192.168.1.101:9042] कनेक्शन बंद कर दिया गया है))
लब्बोलुआब में, मैं कर रहा हूँ कुछ गलत कर रहा है? क्या मुझे डेटा लोड करने या योजना को बदलने के तरीके को पुनर्गठित करना चाहिए। मैंने पंक्ति की लंबाई को कम करने की कोशिश की (इसलिए मेरे पास 12 घंटे की पंक्तियां हैं) लेकिन इससे कोई बड़ा अंतर नहीं आया।
============================== अद्यतन:
मैं अशिष्ट था और एक उदाहरण पेस्ट करना भूल गया प्रश्न के बाद इस्तेमाल किए गए कोड का उत्तर दिया गया था। यह काफी अच्छी तरह से काम करता है, हालांकि मैं कैरोस डीबी के साथ अपना शोध जारी रख रहा हूं और एस्ट्यानैक्स के साथ बाइनरी हस्तांतरण कर रहा हूं।ऐसा लगता है कि मैं उनके साथ सीक्यूएल पर बेहतर प्रदर्शन कर सकता हूं, हालांकि कैरोसडीबी में कुछ समस्याएं हो सकती हैं जब यह अधिभार में होती है (लेकिन मैं इस पर काम कर रहा हूं) और एस्टियनैक्स मेरे स्वाद के लिए उपयोग करने के लिए थोड़ा वर्बोज़ है। फिर भी, यहां कोड है, मैं शायद कहीं गलत हो गया हूँ।
सेमफोर स्लॉट संख्या का प्रदर्शन 5000 से ऊपर जाने पर प्रदर्शन पर कोई प्रभाव नहीं पड़ता है, यह लगभग स्थिर है।
String insertQuery = "insert into keyspace.measurement (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
PreparedStatement preparedStatement = session.prepare(insertQuery);
Semaphore semaphore = new Semaphore(15000);
System.out.println("Starting " + Thread.currentThread().getId());
DateTime time = DateTime.parse("2015-01-05T12:00:00");
//generating the entries
long start = System.currentTimeMillis();
for (int i = 0; i < 900000; i++) {
BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
semaphore.acquire();
ResultSetFuture resultSetFuture = session.executeAsync(statement);
Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {
semaphore.release();
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("Error: " + throwable.toString());
semaphore.release();
}
});
time = time.plus(4); //4ms between each entry
}
काफी भिन्न नहीं है। मुझे पूरा यकीन है कि मैं बैच का उपयोग करना चाहता हूं, क्योंकि मैंने पहले से ही अन्य परियोजनाओं में इसी तरह की चीजों पर काम किया है, और आम तौर पर बयान एक-दूसरे के धीमे थे। वैसे भी, यह तेजी से होने के लिए समझ में नहीं आता है। –
स्पोड सही है। कैसंद्रा में बैच प्रदर्शन अनुकूलन नहीं हैं। लॉग इन बैच का उपयोग केवल तभी किया जाना चाहिए जब परमाणुता की आवश्यकता होती है और परमाणु लेखन प्राप्त करने के लिए प्रदर्शन दंड होता है। यहां तक कि अनलॉक बैच भी सीधे एसिंक प्रश्नों से धीमे होते हैं, वे अनिवार्य रूप से अनावश्यक समन्वय को मजबूर करते हैं (जब तक कि आप कुंजी से बैच नहीं कर रहे हों और टोकन को जागरूक कर रहे हों - शायद आप यहां हैं)। मैं वैसे भी सीधे async लिखने की सिफारिश करते हैं। इस दृश्य को वापस करने के लिए यहां एक और आलेख दिया गया है: http: //lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/ – phact
आपके टाइमआउट के संबंध में, ऐसा होने पर यह होगा बहुत सी लिखने के साथ अपने सी * नोड्स overwhelm। एसिंक प्रश्नों के साथ ऐसा करना आसान है क्योंकि आपका प्रोग्राम लिखने के रूप में तेजी से लिख रहा है क्योंकि यह नॉनस्टॉप कर सकता है। अपने बैचों को हटाने के बाद (विशेष रूप से लॉगिंग) आपको एक सुधार देखना चाहिए, लेकिन अगर आपको एसएलए के परमिट की अनुमति है तो आपको अपने लेखन को थ्रॉटल करना होगा या अपने टाइमआउट्स को भी बढ़ाना होगा। – phact