2015-11-16 4 views
7

कॉन्फ़िगर करें यह मेरा पहला समय है, इसलिए क्षमा करें अगर मैं ठीक पोस्ट नहीं करता हूं, और मेरी खराब अंग्रेजी के लिए खेद है।सिंक elasticsearch apache-flume

मैं अपाचे फ्ल्यूम और लोचदार खोज सिंक को कॉन्फ़िगर करने की कोशिश कर रहा हूं। सबकुछ ठीक है, ऐसा लगता है कि यह ठीक चलता है, लेकिन जब मैं एजेंट शुरू करता हूं तो 2 चेतावनियां होती हैं; निम्नलिखित लोगों:

2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } } - Exception follows. 
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V 
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143) 
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77) 
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48) 
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357) 
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46) 
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79) 
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies 

मेरे एजेंट विन्यास:

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 

# Describe the sink ES 
a1.sinks = k1 
a1.sinks.k1.type = elasticsearch 
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300 
a1.sinks.k1.indexName = items 
a1.sinks.k1.indexType = item 
a1.sinks.k1.clusterName = elasticsearch 
a1.sinks.k1.batchSize = 500 
a1.sinks.k1.ttl = 5d 
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer 
a1.sinks.k1.channel = c1 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

यह netcat शुरू होता है और सब ठीक है, लेकिन मैं शोध करे चेतावनियों के बारे में डर लग रहा है, मैं इसे नहीं समझते।

+0

क्या आप वाकई दिए गए कॉन्फ़िगरेशन को ठीक से चलाते हैं? पहले लॉग ट्रेस एक चेतावनी नहीं है, लेकिन एक त्रुटि कह 'ElasticSearchSink' कुछ समस्या है, सबसे शायद कुछ निर्भरता समस्या से संबंधित है (वहाँ एक विधि है कि नहीं मिला है)। – frb

+0

मैं विशेष संदेश को चेतावनी देने का पता लगाने के द्वारा दिए गए के बारे में नोटिस नहीं किया, लेकिन यह मेरी निदान की पुष्टि करता है: 'घटक SinkRunner: {नीति: [email protected] counterGroup: {नाम: अशक्त काउंटर: {} }}, बंद कर दिया, क्योंकि यह besuccessfully कारण लापता dependencies' – frb

उत्तर

1

लॉग में भाग लेते हुए, कुछ लापता निर्भरता के साथ एक समस्या है।

आप ElasticSearchSink प्रलेखन के लिए एक नजर है, तो आप निम्नलिखित दिखाई देगा:

elasticsearch और Lucene कोर अपने परिवेश के लिए आवश्यक जार अपाचे Flume की lib निर्देशिका में रखा जाना चाहिए स्थापना। Elasticsearch की आवश्यकता है कि क्लाइंट JAR का प्रमुख संस्करण सर्वर से मेल खाता है और दोनों JVM के समान मामूली संस्करण चला रहे हैं। यदि यह गलत है तो SerializationExceptions दिखाई देंगे। आवश्यक संस्करण का चयन करने के लिए पहले elasticsearch का संस्करण निर्धारित करें और JVM संस्करण लक्ष्य क्लस्टर चल रहा है। फिर एक elasticsearch क्लाइंट लाइब्रेरी का चयन करें जो प्रमुख संस्करण से मेल खाता है। एक 0.1 9.एक्स क्लाइंट 0.1 9.एक्स क्लस्टर से बात कर सकता है; 0.20.x 0.20.x और 0.90.x से बात कर सकते हैं 0.90.x से बात कर सकते हैं। एक बार elasticsearch संस्करण निर्धारित किया गया है, तो उपयोग करने के लिए सही ल्यूसीन कोर जेएआर संस्करण निर्धारित करने के लिए pom.xml फ़ाइल पढ़ें। लोचदार एजेंट जो ElasticSearchSink चला रहा है उसे भी JVM से मेल खाना चाहिए लक्ष्य समूह छोटे संस्करण में चल रहा है।

शायद आपने आवश्यक जावा जार नहीं रखा है, या संस्करण उचित नहीं है।

2

मुझे एक कारण मिला, ऐसा लगता है कि अपाचे फ्ल्यूम 1.6.0 और लोचदार खोज 2.0 सही संवाद नहीं कर सकता।

मुझे संशोधित एक तीसरे व्यक्ति से एक अच्छा सिंक मिला।

Here is the link

और यह मेरा अंतिम विन्यास है,

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 

# Describe the sink ES 
a1.sinks = k1 
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink 
a1.sinks.k1.hostNames = 127.0.0.1:9300 
a1.sinks.k1.indexName = items 
a1.sinks.k1.indexType = item 
a1.sinks.k1.clusterName = elasticsearch 
a1.sinks.k1.batchSize = 500 
a1.sinks.k1.ttl = 5d 
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer 
a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder 
a1.sinks.k1.channel = c1 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

यह मेरे लिए काम करता है।

उत्तर के लिए धन्यवाद।

पीएस हाँ, मुझे पुस्तकालयों को स्थानांतरित करना पड़ा।

1

केवल Flume/lib dir में 2 जार नीचे गया और यह काम किया, अन्य सभी Lucene जार जोड़ने की जरूरत नहीं है:

elasticsearch-1.7.1.jar

Lucene-core- 4.10.4।जार

आदेश Flume शुरू करने के लिए:

bin/flume-ng agent --conf conf --conf-file conf/flume-aggregator.conf --name agent2 -Dflume.root.logger=INFO,console 

ES में डेटा लोड करने के flume-env.sh

export JAVA_HOME=/usr/java/default 

export JAVA_OPTS="-Xms3072m -Xmx3072m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" 

FLUME_CLASSPATH="/usr/flume/flume1.6/apache-flume-1.6.0-bin/;/usr/flume/flume1.6/apache-flume-1.6.0-bin/lib" 

Flume एग्रीगेटर config करने के लिए नीचे जोड़ना सुनिश्चित करें: Flume-aggregator.conf

agent2.sources = source1 
agent2.sinks = sink1 
agent2.channels = channel1 

################################################ 
# Describe Source 
################################################ 

# Source Avro 
agent2.sources.source1.type = avro 
agent2.sources.source1.bind = 0.0.0.0 
agent2.sources.source1.port = 9997 

################################################ 
# Describe Interceptors 
################################################ 
# an example of nginx access log regex match 
# agent2.sources.source1.interceptors = interceptor1 
# agent2.sources.source1.interceptors.interceptor1.type = regex_extractor 
# 
# agent2.sources.source1.interceptors.interceptor1.regex = "^(\\S+) \\[(.*?)\\] \"(.*?)\" (\\S+) (\\S+)(\"(.*?)\" \"(.*?)\")?" 
# 
# # agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z\\.\\@\\-\\+_%]+) ([a-zA-Z\\.\\@\\-\\+_%]+) \\[(.*)\\] \\"(POST|GET) ([A-Za-z0-9\\$\\.\\+\\@#%_\\/\\-]*)\\??(.*) (.*)\\" ([a-zA-Z0-9\\.\\/\\s\-]*) (.*) ([0-9]+) ([0-9]+) ([0-9\\.]+) 
# # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13 
# 
# agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 
# agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip 
# agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = datetime 
# agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = method 
# agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = request 
# agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = response 
# agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = status 
# agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = bytes 
# agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = requesttime 
# 

################################################ 
# Describe Sink 
################################################ 

# Sink ElasticSearch 
# Elasticsearch lib ---> flume/lib 
# elasticsearch/config/elasticsearch.yml cluster.name clusterName. data/clustername data. 
agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink 
agent2.sinks.sink1.hostNames = 10.20.156.16:9300,10.20.176.20:9300 
agent2.sinks.sink1.indexName = pdi 
agent2.sinks.sink1.indexType = pdi_metrics 
agent2.sinks.sink1.clusterName = My-ES-CLUSTER 
agent2.sinks.sink1.batchSize = 1000 
agent2.sinks.sink1.ttl = 2 
#this serializer is crucial in order to use kibana 
agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer 



################################################ 
# Describe Channel 
################################################ 

# Channel Memory 
agent2.channels.channel1.type = memory 
agent2.channels.channel1.capacity = 10000000 
agent2.channels.channel1.transactionCapacity = 1000 

################################################ 
# Bind the source and sink to the channel 
################################################ 

agent2.sources.source1.channels = channel1 
agent2.sinks.sink1.channel = channel1 
+0

सटीक जार लिस्टिंग के लिए धन्यवाद करने के लिए शुरू नहीं कर सका। – user99999991