2015-09-08 9 views
7

मुझे समस्या है जब मैं कैसंड्रा से पढ़ने के लिए स्पार्क स्ट्रीमिंग का उपयोग करता हूं।स्पार्क स्ट्रीमिंग का उपयोग कर कैसंद्रा से पढ़ना

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

के रूप में ऊपर दिया गया लिंक मैं

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3) 

का उपयोग कैसेंड्रा से डेटा का चयन करने, लेकिन ऐसा लगता चिंगारी स्ट्रीमिंग का सिर्फ एक क्वेरी एक बार है कि, लेकिन मैं इसे का उपयोग क्वेरी करने के लिए जारी है चाहता हूँ एक अंतराल 10 सेनकंड्स।

मेरा कोड निम्नानुसार है, आपकी प्रतिक्रिया के लिए इच्छा है।

धन्यवाद!

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import com.datastax.spark.connector.streaming._ 
import org.apache.spark.rdd._ 
import scala.collection.mutable.Queue 


object SimpleApp { 
def main(args: Array[String]){ 
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1") 

    val ssc = new StreamingContext(conf, Seconds(10)) 

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

    //rdd.collect().foreach(println) 

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]() 


    val dstream = ssc.queueStream(rddQueue) 

    dstream.print() 

    ssc.start() 
    rdd.collect().foreach(println) 
    rddQueue += rdd 
    ssc.awaitTermination() 
} 

}

+0

आप का वर्णन कर सकता है कि आप क्या हासिल करना चाहते हैं? प्रत्येक अंतराल पर पूरी तालिका पढ़ें? स्ट्रीमिंग डेटा कहां से आ रहा है? – maasg

+0

@maasg मैं उस समय से संबंधित कुछ रिकॉर्ड पूछने के लिए प्रत्येक अंतराल (जैसे 10s) पर तालिका को पढ़ना चाहता हूं। इसका मतलब है कि मैं कैसंड्रा स्पार्क स्ट्रीमिंग का स्रोत बनना चाहता हूं। एक शब्द में, मैं डीस्ट्रीम के निर्माण पर अवरुद्ध हूं। क्या आप कुछ सुझाव और उदाहरण देना चाहते हैं? बहुत बहुत धन्यवाद! –

उत्तर

6

आप इनपुट के रूप में CassandraRDD के साथ एक ConstantInputDStream बना सकते हैं। ConstantInputDStream प्रत्येक स्ट्रीमिंग अंतराल पर एक ही आरडीडी प्रदान करेगा, और उस आरडीडी पर एक क्रिया निष्पादित करके आप आरडीडी वंश के भौतिककरण को ट्रिगर करेंगे, जिससे हर बार कैसंद्रा पर क्वेरी निष्पादित हो जाती है।

सुनिश्चित करें कि पूछताछ की जा रही क्वेरी क्वेरी समय बढ़ने से बचने के लिए असंबद्ध नहीं होती है और जिसके परिणामस्वरूप अस्थिर स्ट्रीमिंग प्रक्रिया होती है।

import org.apache.spark.streaming.dstream.ConstantInputDStream 

val ssc = new StreamingContext(conf, Seconds(10)) 

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

val dstream = new ConstantInputDStream(ssc, cassandraRDD) 

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output 
    println(rdd.collect.mkString("\n")) 
} 
ssc.start() 
ssc.awaitTermination() 
+3

क्या होगा यदि मैं केवल आरडीडी संसाधित होने के बाद से तालिका में सहेजा गया ** नया डेटा ** पढ़ना चाहता हूं? क्या यह संभव है? –

+2

पुराने डेटा को फिर से लाने के लिए एक तरीका है? यह एक अनंत पाश में रहता है। –

+0

@yurishkuro AFAIK जो वर्तमान में संभव नहीं है। – maasg

0

मैं एक ही मुद्दा था और InputDStream वर्ग का एक उपवर्ग बनाकर एक समाधान पाया:

कुछ इस तरह चाल (प्रारंभिक बिंदु के रूप में अपने कोड का उपयोग कर) करना चाहिए। start() और compute() विधियों को परिभाषित करना आवश्यक है।

start() तैयारी के लिए उपयोग किया जा सकता है। मुख्य तर्क compute() में रहता है। यह Option[RDD[T]] वापस आ जाएगा। कक्षा को लचीला बनाने के लिए, InputStreamQuery विशेषता परिभाषित की गई है।

trait InputStreamQuery[T] { 
    // where clause condition for partition key 
    def partitionCond : (String, Any) 
    // function to return next partition key 
    def nextValue(v:Any) : Option[Any] 
    // where clause condition for clustering key 
    def whereCond : (String, (T) => Any) 
    // batch size 
    def batchSize : Int 
} 

कैसेंड्रा तालिका keyspace.test के लिए, test_by_date जो विभाजन कुंजी date द्वारा तालिका reorganizes पैदा करते हैं। test तालिका के लिए

CREATE TABLE IF NOT exists keyspace.test 
(id timeuuid, date text, value text, primary key (id)) 

CREATE MATERIALIZED VIEW IF NOT exists keyspace.test_by_date AS 
SELECT * 
FROM keyspace.test 
WHERE id IS NOT NULL 
PRIMARY KEY (date, id) 
WITH CLUSTERING ORDER BY (id ASC); 

एक संभावित क्रियान्वयन किया जाएगा

class class Test(id:UUID, date:String, value:String) 

trait InputStreamQueryTest extends InputStreamQuery[Test] { 
    val dateFormat = "uuuu-MM-dd" 

    // set batch size as 10 records 
    override def batchSize: Int = 10 

    // partitioning key conditions, query string and initial value 
    override def partitionCond: (String, Any) = ("date = ?", "2017-10-01") 
    // clustering key condition, query string and function to get clustering key from the instance 
    override def whereCond: (String, Test => Any) = (" id > ?", m => m.id) 
    // return next value of clustering key. ex) '2017-10-02' for input value '2017-10-01' 
    override def nextValue(v: Any): Option[Any] = { 

    import java.time.format.DateTimeFormatter 

    val formatter = DateTimeFormatter.ofPattern(dateFormat) 
    val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1) 
    if (nextDate.isAfter(LocalDate.now())) None 
    else Some(nextDate.format(formatter)) 
    } 
} 

यह इस प्रकार के रूप में CassandraInputStream कक्षा में इस्तेमाल किया जा सकता।

class CassandraInputStream[T: ClassTag] 
(_ssc: StreamingContext, keyspace:String, table:String) 
(implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T]) 
extends InputDStream[T](_ssc) with InputStreamQuery[T] { 

var lastElm:Option[T] = None 
var partitionKey : Any = _ 

override def start(): Unit = { 

    // find a partition key which stores some records 
    def findStartValue(cql : String, value:Any): Any = { 
    val rdd = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1) 

    if (rdd.cassandraCount() > 0) value 
    else { 
     nextValue(value).map(findStartValue(cql, _)).getOrElse(value) 
    } 
    } 
    // get query string and initial value from partitionCond method 
    val (cql, value) = partitionCond 
    partitionKey = findStartValue(cql, value) 
} 

override def stop(): Unit = {} 

override def compute(validTime: Time): Option[RDD[T]] = { 
    val (cql, _) = partitionCond 
    val (wh, whKey) = whereCond 

    def fetchNext(patKey: Any) : Option[CassandraTableScanRDD[T]] = { 
    // query with partitioning condition 
    val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, patKey) 

    val rdd = lastElm.map{ x => 
     query.where(wh, whKey(x)).withAscOrder.limit(batchSize) 
    }.getOrElse(query.withAscOrder.limit(batchSize)) 

    if (rdd.cassandraCount() > 0) { 
     // store the last element of this RDD 
     lastElm = Some(rdd.collect.last) 
     Some(rdd) 
    } 
    else { 
     // find the next partition key which stores data 
     nextValue(patKey).flatMap{ k => 
     partitionKey = k 
     fetchNext(k)} 
    } 
    } 

    fetchNext(partitionKey) 
} 
} 

संयोजन सभी वर्गों,

val conf = new SparkConf().setAppName(appName).setMaster(master) 
val ssc = new StreamingContext(conf, Seconds(10)) 

val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest 

dstream.map(println).saveToCassandra(...) 

ssc.start() 
ssc.awaitTermination() 
संबंधित मुद्दे