Hi, there,
I am using avro format and write data to S3, recently upgraded from Flink 1.3.2 to 1.5 and noticed the following errors as below: I am using RocksDB and checkpointDataUri is an S3 location. My writer looks like something below. val writer = new AvroKeyValueSinkWriter[String, R](properties).duplicate() 2018-07-24 17:50:44,012 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (4/4) (28f918a31d273e176409de3d4cb46c3c) switched from RUNNING to FAILED. java.lang.IllegalStateException: Writer has already been opened at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69) at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:151) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:561) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446) at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) 2018-07-24 17:50:44,015 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 28 of job cc73a9db44814dc3d5a5ce497c8b0389 because: Writer has already been opened 2018-07-24 17:50:44,016 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Enrollment Log Member and Chapter (cc73a9db44814dc3d5a5ce497c8b0389) switched from state RUNNING to FAILING. java.lang.IllegalStateException: Writer has already been opened at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69) at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:151) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:561) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446) at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) Any help would be greatly appreciated. Thanks! Regards, Chengzhi |
Free forum by Nabble | Edit this page |