मेरे पास स्कैला में एक स्पार्क एप्लिकेशन है जो हर 10 सेकंड में काफ्का से रिकॉर्ड प्राप्त करता है और उन्हें फाइल के रूप में सहेजता है। यह एसबीटी प्रोजेक्ट है और मैं अपना ऐप sbt run
कमांड के साथ चलाता हूं। जब तक मैं टॉमकैट पर अपना ऐप नहीं लगाता तब तक सब ठीक काम करता है। मैं this plugin के साथ WAR फ़ाइल उत्पन्न करने में कामयाब रहा लेकिन ऐसा लगता है कि मेरा ऐप टॉमकैट पर तैनात होने पर कुछ भी नहीं करता है। अजीब बात यह है कि एप्लिकेशन पूरी तरह से ठीक है जब sbt run
आदेश के माध्यम से भाग गया काम करता हैस्पार्क स्ट्रीमिंग एप्लिकेशन एसबीटी रन का उपयोग करके ठीक काम क्यों करता है लेकिन टॉमकैट (वेब एप्लिकेशन के रूप में) पर नहीं है?
object SparkConsumer {
def main (args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("mytopic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value)).print
val arr = new ArrayBuffer[String]();
val lines = stream.map(record => (record.key, record.value));
stream.foreachRDD { rdd =>
if (rdd.count() > 0) {
val date = System.currentTimeMillis()
rdd.saveAsTextFile ("/tmp/sparkout/mytopic/" + date.toString)
rdd.foreach { record => println("t=" + record.topic + " m=" + record.toString()) }
}
println("Stream had " + rdd.count() + " messages")
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
println(o)
}
}
stream.saveAsTextFiles("/tmp/output")
ssc.start()
ssc.awaitTermination()
}
}
:
यह मेरा कोड है। यह कफका से रिकॉर्ड्स को ठीक से पढ़ता है और वांछित निर्देशिका में फ़ाइलों के रूप में सहेजता है। मुझे नहीं पता कि क्या हो रहा है। मैंने log4j
के साथ लॉगिंग सक्षम करने का प्रयास किया लेकिन यह टॉमकैट पर कुछ भी लॉग भी नहीं करता है। मैं एक उत्तर की तलाश में हूं लेकिन समाधान नहीं मिला है।
मेरे स्काला स्पार्क एप्लिकेशन (जो एसबीटी परियोजना है) काफ्का से रिकॉर्ड पढ़ सकते हैं और उन्हें बचाने के हर 10 सेकंड फ़ाइलों के रूप में करना चाहिए सारांश में। यह काम करता है जब sbt run
कमांड के माध्यम से चलाया जाता है लेकिन यह टॉमकैट पर तैनात नहीं होता है।
अतिरिक्त जानकारी:
- स्काला 2.12
- बिलाव 7
- एसबीटी 0.13.15
- अधिक
क्यू के लिए पूछना: क्या है मुसीबत?
मैं अनुमान लगाता हूं कि क्लासपाथ में पुस्तकालय गायब हो सकते हैं। क्या आपने रिमोट एप्लिकेशन डिबगिंग किया है? – Serhiy