मैं एक ही मुद्दा था और InputDStream वर्ग का एक उपवर्ग बनाकर एक समाधान पाया:
कुछ इस तरह चाल (प्रारंभिक बिंदु के रूप में अपने कोड का उपयोग कर) करना चाहिए। start()
और compute()
विधियों को परिभाषित करना आवश्यक है।
start()
तैयारी के लिए उपयोग किया जा सकता है। मुख्य तर्क compute()
में रहता है। यह Option[RDD[T]]
वापस आ जाएगा। कक्षा को लचीला बनाने के लिए, InputStreamQuery
विशेषता परिभाषित की गई है।
trait InputStreamQuery[T] {
// where clause condition for partition key
def partitionCond : (String, Any)
// function to return next partition key
def nextValue(v:Any) : Option[Any]
// where clause condition for clustering key
def whereCond : (String, (T) => Any)
// batch size
def batchSize : Int
}
कैसेंड्रा तालिका keyspace.test
के लिए, test_by_date
जो विभाजन कुंजी date
द्वारा तालिका reorganizes पैदा करते हैं। test
तालिका के लिए
CREATE TABLE IF NOT exists keyspace.test
(id timeuuid, date text, value text, primary key (id))
CREATE MATERIALIZED VIEW IF NOT exists keyspace.test_by_date AS
SELECT *
FROM keyspace.test
WHERE id IS NOT NULL
PRIMARY KEY (date, id)
WITH CLUSTERING ORDER BY (id ASC);
एक संभावित क्रियान्वयन किया जाएगा
class class Test(id:UUID, date:String, value:String)
trait InputStreamQueryTest extends InputStreamQuery[Test] {
val dateFormat = "uuuu-MM-dd"
// set batch size as 10 records
override def batchSize: Int = 10
// partitioning key conditions, query string and initial value
override def partitionCond: (String, Any) = ("date = ?", "2017-10-01")
// clustering key condition, query string and function to get clustering key from the instance
override def whereCond: (String, Test => Any) = (" id > ?", m => m.id)
// return next value of clustering key. ex) '2017-10-02' for input value '2017-10-01'
override def nextValue(v: Any): Option[Any] = {
import java.time.format.DateTimeFormatter
val formatter = DateTimeFormatter.ofPattern(dateFormat)
val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1)
if (nextDate.isAfter(LocalDate.now())) None
else Some(nextDate.format(formatter))
}
}
यह इस प्रकार के रूप में CassandraInputStream
कक्षा में इस्तेमाल किया जा सकता।
class CassandraInputStream[T: ClassTag]
(_ssc: StreamingContext, keyspace:String, table:String)
(implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T])
extends InputDStream[T](_ssc) with InputStreamQuery[T] {
var lastElm:Option[T] = None
var partitionKey : Any = _
override def start(): Unit = {
// find a partition key which stores some records
def findStartValue(cql : String, value:Any): Any = {
val rdd = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1)
if (rdd.cassandraCount() > 0) value
else {
nextValue(value).map(findStartValue(cql, _)).getOrElse(value)
}
}
// get query string and initial value from partitionCond method
val (cql, value) = partitionCond
partitionKey = findStartValue(cql, value)
}
override def stop(): Unit = {}
override def compute(validTime: Time): Option[RDD[T]] = {
val (cql, _) = partitionCond
val (wh, whKey) = whereCond
def fetchNext(patKey: Any) : Option[CassandraTableScanRDD[T]] = {
// query with partitioning condition
val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, patKey)
val rdd = lastElm.map{ x =>
query.where(wh, whKey(x)).withAscOrder.limit(batchSize)
}.getOrElse(query.withAscOrder.limit(batchSize))
if (rdd.cassandraCount() > 0) {
// store the last element of this RDD
lastElm = Some(rdd.collect.last)
Some(rdd)
}
else {
// find the next partition key which stores data
nextValue(patKey).flatMap{ k =>
partitionKey = k
fetchNext(k)}
}
}
fetchNext(partitionKey)
}
}
संयोजन सभी वर्गों,
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(10))
val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest
dstream.map(println).saveToCassandra(...)
ssc.start()
ssc.awaitTermination()
आप का वर्णन कर सकता है कि आप क्या हासिल करना चाहते हैं? प्रत्येक अंतराल पर पूरी तालिका पढ़ें? स्ट्रीमिंग डेटा कहां से आ रहा है? – maasg
@maasg मैं उस समय से संबंधित कुछ रिकॉर्ड पूछने के लिए प्रत्येक अंतराल (जैसे 10s) पर तालिका को पढ़ना चाहता हूं। इसका मतलब है कि मैं कैसंड्रा स्पार्क स्ट्रीमिंग का स्रोत बनना चाहता हूं। एक शब्द में, मैं डीस्ट्रीम के निर्माण पर अवरुद्ध हूं। क्या आप कुछ सुझाव और उदाहरण देना चाहते हैं? बहुत बहुत धन्यवाद! –