NPE while writing to s3://

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

NPE while writing to s3://

Sathi Chowdhury

I get the NPE from  the below code

I am running this from my mac in a local flink cluster.

 

 

RollingSink<String> s3Sink = new RollingSink<String>("s3://sc-sink1/");
s3Sink.setBucketer(
new DateTimeBucketer("yyyy-MM-dd--HHmm"));
s3Sink.setWriter(
new StringWriter<String>());
s3Sink.setBatchSize(
200);
s3Sink.setPendingPrefix(
"file-");
s3Sink.setPendingSuffix(
".txt");
outStream.addSink(s3Sink).setParallelism(1);

causes

 

java.lang.NullPointerException

            at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:463)

            at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:410)

            at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)

            at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)

            at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)

            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)

            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)

            at java.lang.Thread.run(Thread.java:745)

 

I even tried changing s3:// path to a local path, same issue

The below works and spits out the stream, and the stream has data.

outStream.writeAsText("/Users/schowdhury/flink/kinesisread"+System.currentTimeMillis());
 
am I missing something obvious?looks like it is trying to create a folder.
 

 

 

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============
Reply | Threaded
Open this post in threaded view
|

Re: NPE while writing to s3://

Till Rohrmann

Hi Sathi,

which version of Flink are you using? Since Flink 1.2 the RollingSink is deprecated. It is now recommend to use the BucketingSink. Maybe this problem is resolved with the newer sink.

Cheers,
Till


On Thu, Mar 2, 2017 at 9:44 AM, Sathi Chowdhury <[hidden email]> wrote:

I get the NPE from  the below code

I am running this from my mac in a local flink cluster.

 

 

RollingSink<String> s3Sink = new RollingSink<String>("s3://sc-sink1/");
s3Sink.setBucketer(
new DateTimeBucketer("yyyy-MM-dd--HHmm"));
s3Sink.setWriter(
new StringWriter<String>());
s3Sink.setBatchSize(
200);
s3Sink.setPendingPrefix(
"file-");
s3Sink.setPendingSuffix(
".txt");
outStream.addSink(s3Sink).setParallelism(1);

causes

 

java.lang.NullPointerException

            at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:463)

            at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:410)

            at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)

            at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)

            at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)

            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)

            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)

            at java.lang.Thread.run(Thread.java:745)

 

I even tried changing s3:// path to a local path, same issue

The below works and spits out the stream, and the stream has data.

outStream.writeAsText("/Users/schowdhury/flink/kinesisread"+System.currentTimeMillis());
 
am I missing something obvious?looks like it is trying to create a folder.
 

 

 

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============