Wikiedit QuickStart with Kinesis


Foster, Craig

Hi:

I am using the following WikiEdit example:

https://ci.apache.org/projects/flink/flink-docs-master/quickstart/run_example_quickstart.html

 

It works when printing the contents to a file or stdout.

But I wanted to modify it to write to Kinesis instead of Kafka, so in place of the Kafka part I put:

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("my-flink-stream");
kinesis.setDefaultPartition("0");

// Turn each Tuple2 result into its string form, e.g. "(user,42)", and write it to Kinesis
result
    .map(new MapFunction<Tuple2<String, Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(kinesis);

see.execute();

But I get the following error:

2016-08-31 17:05:41,541 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (2f7d339588fec18e0f2617439ee9be6d) switched from RUNNING to CANCELING
2016-08-31 17:05:41,542 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job 43a13707d92da260827f37968597c187 () changed to FAILING.
java.lang.Exception: Serialized representation of org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:803)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Searching Google hasn't turned up anything that seems to work. Is there somewhere else I should look for the root cause? I looked in the full log file, but it doesn't contain much more than this stack trace.


Re: Wikiedit QuickStart with Kinesis

Tzu-Li (Gordon) Tai
Hi Craig,

I’ve just run a simple test of this setup and it worked without problems.

What Flink version were you using (i.e., the archetype version used with the Flink Quickstart Maven Archetype)?
Also, from which branch / commit was the Kinesis connector built? Seeing that you’ve used the “AUTO” credentials provider option, I’m assuming it was built from the master branch and not a release branch (the “AUTO” option hasn’t been included in any release branch yet).

So I suspect the problem is a version conflict between the two. If so, you should build the Kinesis connector from the same release branch as the Flink version you’re using.
Could you check and see if the problem remains? Thanks!

Regards,
Gordon


On September 1, 2016 at 1:34:19 AM, Foster, Craig wrote:

Re: Wikiedit QuickStart with Kinesis

Foster, Craig

Thanks Gordon. I changed all my versions to match the version I built the Kinesis connector against, so you were right. That got me further: I can write to streams now. Now all I need to do is figure out how the data is being encoded in Kinesis. :)

One issue with the "AUTO" option: whatever credentials it finds don't seem to have PutRecords permission, even though the AWS IAM role I am using ostensibly has it. So I am back to putting credentials in code, which isn't exactly a best practice. I haven't figured that part out yet either.

From: "Tzu-Li (Gordon) Tai"
Date: Thursday, September 1, 2016 at 2:25 AM
Subject: Re: Wikiedit QuickStart with Kinesis

 


Re: Wikiedit QuickStart with Kinesis

Tzu-Li Tai
I’m afraid the “AUTO” option on the Kinesis producer is actually buggy: the KPL library used internally does not correctly pick up credentials from the default credential provider chain. I’ve just filed a JIRA for this: https://issues.apache.org/jira/browse/FLINK-4559.
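Independently of that bug, it can help to know which credentials a default-chain lookup would reach first. The sketch below is a hypothetical helper (not part of Flink, the KPL, or the AWS SDK). It assumes the lookup order of the AWS SDK for Java (v1) DefaultAWSCredentialsProviderChain: environment variables, then JVM system properties, then the shared profile file, then the EC2 instance or container role. It only inspects the first two sources, using the names AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY and aws.accessKeyId / aws.secretKey. If either pair is set, those keys would take precedence over the IAM role, which is one thing worth ruling out when the picked-up credentials lack PutRecords permission.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Flink, the KPL, or the AWS SDK.
// Assumes the AWS SDK for Java v1 default-chain order: environment variables,
// then JVM system properties, then the profile file, then the instance/container role.
final class CredentialSourceCheck {

    private CredentialSourceCheck() {}

    /** Returns a label for the first checked source that defines both an access key and a secret key. */
    static String firstSource(Map<String, String> env, Properties sysProps) {
        if (hasBoth(env.get("AWS_ACCESS_KEY_ID"), env.get("AWS_SECRET_ACCESS_KEY"))) {
            return "environment variables";
        }
        if (hasBoth(sysProps.getProperty("aws.accessKeyId"), sysProps.getProperty("aws.secretKey"))) {
            return "system properties";
        }
        // Later sources (~/.aws/credentials, instance/container role) are not inspected here.
        return "profile file or instance role (not checked)";
    }

    private static boolean hasBoth(String key, String secret) {
        return key != null && !key.isEmpty() && secret != null && !secret.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(firstSource(System.getenv(), System.getProperties()));
    }
}
```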

Regarding the data encoding: you’re using a SimpleStringSchema to serialize the tuple strings. SimpleStringSchema simply calls String.getBytes() to turn each string into a byte array, so the data is encoded with the platform’s default charset.
Internally, the SimpleStringSchema is wrapped in a KinesisSerializationSchema, which ultimately wraps the bytes in a ByteBuffer that is handed to the internal KPL for writing to the streams. You can also construct the FlinkKinesisProducer with a KinesisSerializationSchema directly.
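Because String.getBytes() with no argument uses the platform default charset, the bytes written to the stream depend on the JVM's configuration. The following is a minimal sketch of pinning the charset to UTF-8 instead. ElementSerializer is a hypothetical, dependency-free stand-in for the connector's serialization schema (one element in, one ByteBuffer out, as described above); with the real connector you would implement KinesisSerializationSchema, whose exact methods should be checked against the connector version you build.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a Kinesis serialization schema: one element in, one ByteBuffer out.
interface ElementSerializer<T> {
    ByteBuffer serialize(T element);
}

// Encodes strings as UTF-8 regardless of the JVM's default charset,
// unlike String.getBytes() called without an argument.
final class Utf8StringSerializer implements ElementSerializer<String> {

    @Override
    public ByteBuffer serialize(String element) {
        return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
    }

    // Reader-side counterpart: decode the buffer's remaining bytes as UTF-8
    // without moving the original buffer's position.
    static String decode(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = new Utf8StringSerializer().serialize("(Bamyers99,-170)");
        System.out.println(decode(buf)); // (Bamyers99,-170)
    }
}
```

Pinning the charset only makes the payload bytes predictable; it does not remove any framing the KPL adds when aggregating records.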

Also note that because the FlinkKinesisProducer uses the KPL, the data in the stream may contain "magic bytes" that the KPL adds when it aggregates multiple records into a single Kinesis record. If you’re using the AWS KCL (Kinesis Client Library) or the FlinkKinesisConsumer to read the data, decoding them shouldn’t be a problem (the FlinkKinesisConsumer uses a class from the KCL to de-aggregate records written by the KPL).
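For consumers other than the KCL or the FlinkKinesisConsumer, a first step is to tell whether a record carries KPL aggregation framing at all. This sketch assumes the aggregated-record format described in the KPL documentation: a 4-byte magic prefix 0xF3 0x89 0x9A 0xC2, a protobuf-encoded body, and a trailing 16-byte MD5 digest of that body. KplAggregation is a hypothetical name; the class only detects the framing and does not parse the protobuf body, which is the part the KCL's de-aggregation handles.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical helper: detects the (assumed) KPL aggregated-record framing,
// i.e. magic prefix + protobuf body + MD5 digest of the body. Does not decode the body.
final class KplAggregation {
    static final byte[] MAGIC = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2};
    private static final int DIGEST_LEN = 16;

    private KplAggregation() {}

    static boolean isAggregated(byte[] data) {
        if (data.length < MAGIC.length + DIGEST_LEN) {
            return false; // too short to hold prefix and digest
        }
        if (!Arrays.equals(Arrays.copyOfRange(data, 0, MAGIC.length), MAGIC)) {
            return false; // no magic prefix: treat as a plain, non-aggregated record
        }
        byte[] body = Arrays.copyOfRange(data, MAGIC.length, data.length - DIGEST_LEN);
        byte[] digest = Arrays.copyOfRange(data, data.length - DIGEST_LEN, data.length);
        return Arrays.equals(md5(body), digest);
    }

    static byte[] md5(byte[] bytes) {
        try {
            return MessageDigest.getInstance("MD5").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            // MD5 is required on every Java SE platform, so this should not happen.
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        byte[] plain = "(Wbm1058,-9)".getBytes(StandardCharsets.UTF_8);
        System.out.println(isAggregated(plain)); // false: too short and no magic prefix
    }
}
```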

Let me know if you bump into any other problems ;)

Regards,
Gordon

Re: Wikiedit QuickStart with Kinesis

Foster, Craig
Oh, in that case, maybe I should look into using the KCL. I'm just using boto and boto3, which are having different problems, but both are related to the encoding.
 
boto3 prints *something*:
 
(.96.129.59,-20)'(01:541:4305:C70:10B4:FA8C:3CF9:B9B0,0(Patrick Barlane,0(Nedrutland,12(GreenC bot,15(Bamyers99,-170(Mean as custard,661)ж?U????¨p?&"1w?
??
(RaphaelQS,-3(.44.211.32,50(JMichael22,1298)
                                           (Wbm1058,-9)Y?Z????/r(???
 
But boto just gives an exception:
 
<type 'exceptions.UnicodeDecodeError'>: 'utf8' codec can't decode bytes in position 0-2: invalid continuation byte
 
It does this even when getting a response object.
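The boto error is consistent with a strict UTF-8 decoder hitting an aggregated record, though that is an inference rather than a confirmed diagnosis. Assuming the record starts with the KPL aggregation magic bytes 0xF3 0x89 0x9A 0xC2: 0xF3 opens a four-byte UTF-8 sequence, 0x89 and 0x9A are valid continuation bytes, but 0xC2 is not, so the first three bytes are rejected, matching "position 0-2: invalid continuation byte". The hypothetical StrictUtf8 helper below reproduces this in Java by configuring a CharsetDecoder to report malformed input instead of replacing it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: strict UTF-8 decoding that fails on malformed input
// rather than silently substituting replacement characters.
final class StrictUtf8 {

    private StrictUtf8() {}

    /** Decodes bytes as UTF-8, returning null if the input is not valid UTF-8. */
    static String decodeOrNull(byte[] bytes) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        try {
            return decoder.decode(ByteBuffer.wrap(bytes)).toString();
        } catch (CharacterCodingException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        byte[] kplPrefix = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2, 0x0A};
        System.out.println(decodeOrNull(kplPrefix)); // prints null: malformed UTF-8
        System.out.println(decodeOrNull("(Wbm1058,-9)".getBytes(StandardCharsets.UTF_8))); // (Wbm1058,-9)
    }
}
```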
 
Thanks for your help! I'll try the KCL before changing my SerializationSchema.


On 9/1/16, 10:37 AM, "Tzu-Li Tai" wrote: