9

मैं डायनेमोडीबी धाराओं का उपयोग करके डायनेमोडीबी तालिका परिवर्तनों को कैप्चर करने का प्रयास कर रहा हूं और एडब्ल्यूएस प्रदान किया गया है जो जावा डायनेमोडीबी धाराओं Kinesis एडाप्टर प्रदान करता है। मैं एक स्कैला ऐप में एडब्ल्यूएस जावा एसडीके के साथ काम कर रहा हूं।एडब्लूएस जावा डायनेमो डीबी धाराओं का उपयोग कर प्रोटीन डाइनेमो डीबी धाराएं Kinesis एडाप्टर

मैंने AWS guide का पालन करके और code example प्रकाशित एडब्लूएस के माध्यम से जाकर शुरू किया। हालांकि मुझे अपने पर्यावरण में काम कर रहे अमेज़ॅन का अपना प्रकाशित कोड प्राप्त करने में समस्याएं आ रही हैं। मेरा मुद्दा KinesisClientLibConfiguration ऑब्जेक्ट के साथ है।

उदाहरण कोड में, KinesisClientLibConfiguration डायनेमो डीबी द्वारा प्रदान की गई धारा एआरएन के साथ कॉन्फ़िगर किया गया है।

lazy val streamArn = dynamoClient.describeTable(config.tableName) 
.getTable.getLatestStreamArn 

और फिर प्रदान की ARN साथ KinesisClientLibConfiguration बनाने:

new KinesisClientLibConfiguration("streams-adapter-demo", 
    streamArn, 
    streamsCredentials, 
    "streams-demo-worker") 

मैं पहली बार मेरी डायनमो मेज से वर्तमान ARN लगाने के आधार पर अपने स्काला अनुप्रयोग में एक समान तरीके का अनुसरण किया

lazy val kinesisConfig :KinesisClientLibConfiguration = 
new KinesisClientLibConfiguration(
    "testProcess", 
    streamArn, 
    defaultProviderChain, 
    "testWorker" 
).withMaxRecords(1000) 
    .withRegionName("eu-west-1") 
    .withMetricsLevel(MetricsLevel.NONE) 
    .withIdleTimeBetweenReadsInMillis(500) 
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) 

मैंने प्रदत्त स्ट्रीम एआरएन सत्यापित किया है और सब कुछ जो मैं एडब्ल्यूएस कंसोल में देखता हूं उससे मेल खाता है।

क्रम मैं अंत में एक अपवाद बताते हुए कि प्रदान की ARN मान्य स्ट्रीम नाम नहीं है हो रही है पर:

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call 
SEVERE: Caught exception while sync'ing Kinesis shards and leases 
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation  
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at 
'streamName' failed to satisfy constraint: Member must satisfy regular 
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code: 
400; Error Code: ValidationException; Request ID:) 

KinesisClientLibConfiguration इस पर उपलब्ध कराई गई मतलब है प्रलेखन को देखते हुए के रूप में दूसरा पैरामीटर रूप में सूचीबद्ध है एआरएन के किसी भी उल्लेख के बिना स्ट्रीमनाम

मुझे एआरएन से संबंधित KinesisClientLibConfiguration पर कुछ भी नहीं मिल रहा है। चूंकि मैं डायनेमोडीबी स्ट्रीम के साथ काम कर रहा हूं और एक किनेस स्ट्रीम नहीं हूं, इसलिए मुझे यह भी पता नहीं है कि मेरा स्ट्रीम नाम कैसे ढूंढें।

इस बिंदु पर मुझे यकीन है कि प्रकाशित एडब्ल्यूएस उदाहरण से मुझे क्या याद आ रही है, ऐसा लगता है कि वे केसीएल के पुराने संस्करण का उपयोग कर रहे हैं। मैं अमेज़ॅन-किनेसिस-क्लाइंट के संस्करण 1.7.0 का उपयोग कर रहा हूं।

उत्तर

3

समस्या वास्तव में मेरे KinesisClientLibConfiguration के बाहर होने के समाप्त हो गई।

मैं उसी कॉन्फ़िगरेशन का उपयोग करके और डायनेमोडीबी स्ट्रीम एडाप्टर लाइब्रेरी और डायनेमोडीबी और क्लाउडवॉच दोनों के लिए क्लाइंट एडाप्टर दोनों प्रदान करके इस समस्या को हल करने में सक्षम था।

मेरा कामकाजी समाधान अब ऐसा दिखता है।

Kinesis क्लाइंट कॉन्फ़िगरेशन को परिभाषित करना।

//Kinesis config for DynamoDB streams 
lazy val kinesisConfig :KinesisClientLibConfiguration = 
    new KinesisClientLibConfiguration(
     getClass.getName, //DynamoDB shard lease table name 
     streamArn, //pulled from the dynamo table at runtime 
     dynamoCredentials, //DefaultAWSCredentialsProviderChain 
     KeywordTrackingActor.NAME //Lease owner name 
    ).withMaxRecords(1000) //using AWS recommended value 
    .withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value 
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) 

एक धारा एडाप्टर और एक CloudWatch ग्राहक

val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials) 
streamAdapterClient.setRegion(region) 

val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials) 
cloudWatchClient.setRegion(region) 

को परिभाषित RecordProcessorFactory का एक उदाहरण बना है, यह एक वर्ग को लागू करता है कि KCL IRecordProcessorFactory और लौट आए IRecordProcessor प्रदान की परिभाषित करने के लिए आप पर निर्भर है।

val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName) 

और जिस भाग में मैं लापता था, यह सब आपके कार्यकर्ता को प्रदान करने की आवश्यकता है।

val worker :Worker = 
    new Worker.Builder() 
    .recordProcessorFactory(recordProcessorFactory) 
    .config(kinesisConfig) 
    .kinesisClient(streamAdapterClient) 
    .dynamoDBClient(dynamoClient) 
    .cloudWatchClient(cloudWatchClient) 
    .build() 

//this will start record processing 
streamExecutorService.submit(worker) 
0

वैकल्पिक रूप से, आप com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker जो आंतरिक रूप से AmazonDynamoDBStreamsAdapterClient का उपयोग करता है के बजाय com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker उपयोग कर सकते हैं।

अर्थात

lazy val kinesisConfig :KinesisClientLibConfiguration = 
new KinesisClientLibConfiguration(
    getClass.getName, //DynamoDB shard lease table name 
    streamArn, //pulled from the dynamo table at runtime 
    dynamoCredentials, //DefaultAWSCredentialsProviderChain 
    KeywordTrackingActor.NAME //Lease owner name 
).withMaxRecords(1000) //using AWS recommended value 
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value 
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) 

val worker = new com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker(recordProcessorFactory, kinesisConfig) 
0

बस जवाब देने के लिए क्या समस्या थी - आप ARN उपलब्ध कराने थे जब यह सिर्फ स्ट्रीम का नाम चाहता था।

0

मैंने हाल ही में इस परियोजना gfc-aws-kinesis पर एक पीआर किया था और अब आप एडाप्टर को पार करके और KinesisRecordAdapter कार्यान्वयन लिखकर इसका उपयोग कर सकते हैं।

उदाहरण मैं Scanamo उपयोग कर रहा हूँ hashmap

विन्यास में ग्राहक

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient = 
    new AmazonDynamoDBStreamsAdapterClient() 

पास यह बनाएं पार्स करने के लिए:

val streamConfig = KinesisStreamConsumerConfig[Option[A]](
    applicationName, 
    config.stream, //the full dynamodb stream arn 
    regionName = Some(config.region), 
    checkPointInterval = config.checkpointInterval, 
    initialPositionInStream = config.streamPosition, 
    dynamoDBKinesisAdapterClient = Some(streamAdapterClient) 
) 
KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed) 

के लिए उपयुक्त एक अंतर्निहित रिकॉर्ड पाठक बनाएं डायनामोडब घटनाओं को पढ़ना:

implicit val kinesisRecordReader 
    : KinesisRecordReader[Option[A]] = 
    new KinesisRecordReader[Option[A]] { 
    override def apply(record: Record): Option[A] = { 
     record match { 
     case recordAdapter: RecordAdapter => 
      val dynamoRecord: DynamoRecord = 
      recordAdapter.getInternalObject 
      dynamoRecord.getEventName match { 
      case "INSERT" => 
       ScanamoFree 
       .read[A](
        dynamoRecord.getDynamodb.getNewImage) 
       .toOption 
      case _ => None 
      } 
     case _ => None 
     } 
    } 
    } 
+0

आपको एक उदाहरण और एक संक्षिप्त स्पष्टीकरण जोड़कर अपना जवाब सुधारना चाहिए। शायद [यह] पढ़ना (https://stackoverflow.com/help/how-to-answer) आपको अपना उत्तर सुधारने में मदद करता है। – Markus

संबंधित मुद्दे