Hi,
I am trying to run a pipeline on Flink 1.8.1 and am getting the following exception:

    java.lang.StackOverflowError
        at java.lang.Exception.<init>(Exception.java:66)
        at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
        at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51)
        at java.lang.Class.getDeclaredMethod(Class.java:2130)
        at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
        ... (this frame repeats until the stack is exhausted)

I have even tried running in legacy mode. The pipeline code is:

    private void execute(String[] args) {
        ParameterTool pt = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setMaxParallelism(30);
        env.setParallelism(20);
        env.enableCheckpointing(5000);

        StateBackend backend = new FsStateBackend(pt.getRequired("checkpoint_path"), true);
        env.setStateBackend(backend);

        FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer =
                new FlinkDynamoDBStreamsConsumer<>(
                        DYNAMODB_STREAM_NAME,
                        new JsonNodeDeserializationSchema(),
                        dynamodbStreamsConsumerConfig);

        SingleOutputStreamOperator<ObjectNode> sourceStream = env
                .addSource(flinkDynamoDBStreamsConsumer)
                .name("Dynamo DB Streams");

        sourceStream
                .keyBy(new CdcKeySelector())
                .addSink(new FlinkKafkaProducer<>(
                        "dev-broker.hotstar.npe:9092",
                        "ums-dynamo-streams",
                        new JsonSerializationSchema()))
                .name("Kafka Sink");

        try {
            env.execute();
        } catch (Exception e) {
            System.out.println("Caught exception for pipeline: " + e.getMessage());
            e.printStackTrace();
        }
    }

Regards,
Vinay Patil
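For context, the repeating ClosureCleaner.clean frame at ClosureCleaner.java:115 comes from the recursive closure cleaner that Flink 1.8 enables by default: it reflectively walks every field of each user function and recurses into the values it finds. The sketch below is illustrative only (invented names, not Flink's actual code), but it shows why a sufficiently deep or cyclic object graph in a captured function ends in a StackOverflowError:

    import java.lang.reflect.Field;
    import java.lang.reflect.Modifier;

    // Illustrative sketch -- NOT Flink's code. Mimics the recursive descent
    // visible in the trace: clean() inspects every field of an object and
    // recurses into each value it holds.
    public final class RecursiveCleanerSketch {
        static void clean(Object obj) throws IllegalAccessException {
            if (obj == null) {
                return;
            }
            for (Field field : obj.getClass().getDeclaredFields()) {
                if (Modifier.isStatic(field.getModifiers()) || field.getType().isPrimitive()) {
                    continue;
                }
                field.setAccessible(true);
                // Without a visited set, a cyclic reference chain recurses
                // until the thread stack is exhausted.
                clean(field.get(obj));
            }
        }
    }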
Hi Ravi,

Tried with both new and legacy mode; it works locally, but on the cluster I am getting this exception. I am passing Jackson's ObjectNode class, which should be serializable. What do you think?

On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <[hidden email]> wrote:
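One quick way to test that serializability assumption locally (a minimal sketch; the helper name is invented) is to push the captured object through plain Java serialization and see whether it throws:

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;

    public final class SerializabilityCheck {
        // Throws java.io.NotSerializableException if 'obj', or anything it
        // transitively references, cannot be Java-serialized.
        public static void assertSerializable(Object obj) throws Exception {
            try (ObjectOutputStream out =
                         new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(obj);
            }
        }
    }

For example, assertSerializable(new CdcKeySelector()) would surface a serialization problem before the job is ever submitted to the cluster.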
Hi Ravi,

The uber jar was correct; setting the ClosureCleanerLevel to TOP_LEVEL resolved the issue. Thanks a lot. Is there any disadvantage to setting this explicitly?

Regards,
Vinay Patil

On Sat, Jul 20, 2019 at 10:23 PM Ravi Bhushan Ratnakar <[hidden email]> wrote:
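For readers who land on this thread later: the setting referred to above lives on ExecutionConfig. Applied inside the execute() method from the first message, it would presumably look like this (Flink 1.8 API, where the levels are NONE, TOP_LEVEL, and RECURSIVE):

    // Right after the environment is created. TOP_LEVEL cleans only the
    // function object itself and skips the recursive descent into its
    // fields, which is the part of the trace that overflows the stack.
    env.getConfig().setClosureCleanerLevel(
            org.apache.flink.api.common.ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL);

As for the disadvantage: as far as I can tell, with TOP_LEVEL Flink no longer cleans the nested fields of a function, so a non-serializable outer-class reference that the recursive cleaner would have nulled out can instead fail serialization at job-submission time.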