2015-06-15 4 views
6

मैं Flume में HTTPSource उपयोग कर रहा हूँ HDFS में JSON सम्मिलित करने के लिए कैसे:इस प्रकार का उपयोग कर <code>json</code> प्रारूप में <em>पोस्ट</em> घटनाओं प्राप्त करने के लिए सही ढंग से Flume

{"username":"xyz","password":"123"} 

मेरा प्रश्न है: मैं के स्रोत को संशोधित करने के है

[{ 
    "headers" : { 
      "timestamp" : "434324343", 
      "host" : "random_host.example.com" 
      }, 
    "body" : "{"username":"xyz","password":"123"}" 
}] 

यह यह करने के लिए सबसे अच्छा तरीका है: घटनाओं (मैं एक है कि Flume को JSON भेज रहा है मतलब) तो JSON, निम्न स्वरूप है? या मैं इसे हर जगह संशोधित कर सकता हूं?

flume agent के लिए मेरे conf फ़ाइल है:

## Componentes 
SomeAgent.sources = SomeHTTP 
SomeAgent.channels = MemChannel 
SomeAgent.sinks = SomeHDFS 

## Fuente e Interceptores 
SomeAgent.sources.SomeHTTP.type = http 
SomeAgent.sources.SomeHTTP.port = 5140 
SomeAgent.sources.SomeHTTP.handler = org.apache.flume.source.http.JSONHandler 
SomeAgent.sources.SomeHTTP.channels = MemChannel 
SomeAgent.sources.SomeHTTP.interceptors = i1 i2 

## Interceptores 
SomeAgent.sources.SomeHTTP.interceptors.i1.type = timestamp 
SomeAgent.sources.SomeHTTP.interceptors.i2.type = host 
SomeAgent.sources.SomeHTTP.interceptors.i2.hostHeader = hostname 

## Canal 
SomeAgent.channels.MemChannel.type = memory 
SomeAgent.channels.MemChannel.capacity = 10000 
SomeAgent.channels.MemChannel.transactionCapacity = 1000 

## Sumidero 
SomeAgent.sinks.SomeHDFS.type = hdfs 
SomeAgent.sinks.SomeHDFS.channel = MemChannel 
SomeAgent.sinks.SomeHDFS.hdfs.path = /raw/logs/%Y-%m-%d 
SomeAgent.sinks.SomeHDFS.hdfs.fileType = DataStream 
SomeAgent.sinks.SomeHDFS.hdfs.filePrefix = SomeLogs- 
SomeAgent.sinks.SomeHDFS.hdfs.writeFormat = Text 
SomeAgent.sinks.SomeHDFS.hdfs.batchSize = 100 
SomeAgent.sinks.SomeHDFS.hdfs.rollSize = 0 
SomeAgent.sinks.SomeHDFS.hdfs.rollCount = 10000 
SomeAgent.sinks.SomeHDFS.hdfs.rollInterval = 600 
SomeAgent.sinks.SomeHDFS.hdfs.useLocalTimeStamp = true 

रनिंग cat की hadoop fs

$ hadoop fs -ls -R /raw/logs/somes 
drwxr-xr-x - flume-agent supergroup   0 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16 
-rw-r--r-- 3 flume-agent supergroup  3814 2015-06-16 12:33 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434471803369 
-rw-r--r-- 3 flume-agent supergroup  3719 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434472404774 


$ hadoop fs -cat /raw/logs/somes/2015-06-16/SomeLogs.1434471803369 | head 




$ 

(आप सही तरीके से लग रहे हैं, खाली लाइनें)

तो अब मैं फ़ाइल को देखो (उदाहरण के लिए HUE के बाइनरी व्यू का उपयोग करके):

0000000: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................ 
0000010: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................ 
0000020: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................ 

उत्तर

4

यदि मैं अच्छी तरह से समझ गया हूं, तो आप डेटा और शीर्षलेख दोनों को क्रमबद्ध करना चाहते हैं। उस स्थिति में, आपको डेटा स्रोत को संशोधित करने की आवश्यकता नहीं है, लेकिन कुछ मानक फ़्ल्यूम तत्वों का उपयोग करें और एचडीएफएस के लिए अपना कस्टम सीरियलाइज़र बनाएं।

फ़्लूम प्राप्त करने के लिए पहला कदम वांछित JSON संरचना, यानी हेडर + बॉडी बनाता है। Flume, यह तुम्हारे लिए क्या सिर्फ अपने HTTPSource पर इस तरह से JSONHandler उपयोग करते हैं, करने में सक्षम है:

a1.sources = r1 
a1.sources.r1.hnadler = org.apache.flume.source.http.JSONHandler 

वास्तव में, यह JSON हैंडलर कॉन्फ़िगर करने के लिए है, क्योंकि यह HTTPSource के लिए डिफ़ॉल्ट से एक है आवश्यक नहीं है।

फिर वांछित शीर्षलेख जोड़ने के लिए Timestamp Interceptor और Host Interceptor दोनों का उपयोग करें। केवल चाल Flume एजेंट क्रम में इस प्रक्रिया की तुलना में एक ही मशीन में चलाना चाहिए है रोक मेजबान इस एक से एक ही है:

a1.sources.r1.interceptors = i1 i2 
a1.sources.r1.interceptors.i1.type = timestamp 
a1.sources.r1.interceptors.i2.type = host 
a1.sources.r1.interceptors.i2.hostHeader = hostname 

इस बिंदु पर, आप इच्छित घटना होगा। फिर भी, एचडीएफएस के लिए मानक धारावाहिक केवल शरीर को बचाते हैं, हेडर नहीं। इस प्रकार एक कस्टम धारावाहिक बनाएं जो org.apache.flume.serialization.EventSerializer लागू करता है। यह रूप में कॉन्फ़िगर किया गया है:

a1.sinks = k1 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.serializer = my_custom_serializer 

HTH

+0

मुझे यह लिंक मिला है (http://grokbase.com/t/flume/user/128nspvnfg/can-hdfssink-write-headers-as-well) कह रहा है कि HDFSSink के लिए cuatom serializers केवल ' फ़ाइल टाइप = संपीड़ित स्ट्रीम 'या' डेटास्ट्रीम '। मुझे नहीं पता कि यह वर्तमान में 'अनुक्रमफाइल' के लिए तय है या नहीं। – frb

+0

आपकी प्रतिक्रिया के लिए धन्यवाद धन्यवाद, मैंने अभी कॉन्फ़िगरेशन फ़ाइल चिपकाई है, लेकिन जब मैं देखता हूं ('हैडोप एफएस-कैट/कच्चे/लॉग/2015-06-15/कुछलॉग -1414410388430' का उपयोग करके) मुझे कुछ भी नहीं दिखाई देता है (ए बहुत सी खाली गुच्छा लाइनें, जो मुझे संदेह है बाइनरी में हैं) क्या आप त्रुटि देख सकते हैं? – nanounanue

+0

मैंने इस सवाल में बाइनरी के रूप में आउटपुट जोड़ा ... कुछ भी लॉग इन नहीं कर रहा है: (' – nanounanue

3

जवाब @frb द्वारा पोस्ट की सही था, केवल बात को अनदेखा कर रहा है कि JSON जनरेटर body हिस्सा भेजना होगा (मैं मानता चाहिए/शिकायत करते हैं कि docs, सहीjson पोस्टिंग के रास्ते

[body:"{'username':'xyz','password':'123'}"] 

नहीं है कृपया उस बिंदु में स्पष्ट) नहीं हैं, इसलिए ते कि डेटा के json अब एक स्ट्रिंग है।

इस परिवर्तन के साथ, json अब hdfs में दिखाई दे रहा है।

+0

अच्छा बिंदु! क्या आपको अंत में संशोधित करना पड़ा JSON को स्ट्रिंग के रूप में भेजने के लिए इवेंट स्रोत? या आपने प्राप्त डेटा को स्ट्रिंग करने के लिए JSONHandler के किसी भी तरीके से संशोधित किया? – frb

+0

मैंने स्रोत को संशोधित किया। परिवर्तन जेसन को स्ट्रिंग कर रहा था और कुंजी 'बॉडी' – nanounanue

1

Flume HTTPSource डिफ़ॉल्ट JSONHandler JSON प्रतिनिधित्व [{ headers: ..., body: ... }] समाप्ति बिंदु को प्रस्तुत किया जाना में पूरी तरह से गठन Flume घटनाओं की एक सूची उम्मीद उपयोग करते हुए; एक एजेंट एंडपॉइंट बनाने के लिए जो {"username":"xyz", "password":"123"} जैसी एक नंगे एप्लिकेशन-स्तरीय संरचना को स्वीकार कर सकता है, आप हैंडलर को वैकल्पिक कक्षा के साथ ओवरराइड कर सकते हैं जो HTTPSourceHandler लागू करता है; JSONHandler स्रोत देखें - इसमें बहुत कुछ नहीं है।

public List<Event> getEvents(HttpServletRequest request) throws ... 

एक कस्टम JSONHandler आप भी इस तरह के स्रोत आईपी, उपयोगकर्ता-एजेंट आदि के रूप में घटना HTTP अनुरोध के आधार पर, के लिए हेडर जोड़ सकता है में (एक इंटरसेप्टर इस के लिए संदर्भ के लिए नहीं होगा)। आप इस बिंदु पर एप्लिकेशन द्वारा आपूर्ति किए गए JSON को मान्य करना चाहते हैं (हालांकि डिफ़ॉल्ट हैंडलर नहीं करता है)।

हालांकि जैसा कि आप मिल गया है, तो आप सिर्फ [{body: ...}] हिस्सा पारित कर सकते हैं, इस तरह के एक कस्टम हैंडलर भी उपयोगी यदि आप करना चाहते हो सकता है एक जनरेटर घटना के लिए हेडर इंजेक्शन लगाने को रोकने के।

संबंधित मुद्दे