StackOverflow Error

StackOverflow Error

Vinay Patil
Hi,

I am trying to run a pipeline on Flink 1.8.1 and am getting the following exception:

java.lang.StackOverflowError
at java.lang.Exception.<init>(Exception.java:66)
at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51)
at java.lang.Class.getDeclaredMethod(Class.java:2130)
at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)

I have even tried running in legacy mode. The pipeline code is:

private void execute(String[] args) {
        ParameterTool pt = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setMaxParallelism(30);
        env.setParallelism(20);

        env.enableCheckpointing(5000);
        StateBackend backend = new FsStateBackend(pt.getRequired("checkpoint_path"), true);
        env.setStateBackend(backend);
       
        FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer =
                new FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME, new JsonNodeDeserializationSchema(),
                        dynamodbStreamsConsumerConfig);
       
        SingleOutputStreamOperator<ObjectNode> sourceStream = env
                .addSource(flinkDynamoDBStreamsConsumer)
                .name("Dynamo DB Streams");
       
        sourceStream
                .keyBy(new CdcKeySelector())
                .addSink(new FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092", "ums-dynamo-streams",
                        new JsonSerializationSchema()))
                .name("Kafka Sink");

        try {
            env.execute();
        } catch (Exception e) {
            System.out.println("Caught exception for pipeline" + e.getMessage());
            e.printStackTrace();
        }
    }

Regards,
Vinay Patil

Re: StackOverflow Error

Vinay Patil
Hi Ravi,

I tried with both the new and legacy modes; it works locally, but on the cluster I am getting this exception. I am passing Jackson's ObjectNode class, which should be serializable. What do you think?

On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <[hidden email]> wrote:
Hi Vinay,

Please make sure that all your custom code is serializable. You can run this using the new mode.

Thanks, 
Ravi 
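
As an illustration of the advice above, a key selector written as a top-level class avoids accidentally pulling a non-serializable enclosing class into its closure. The sketch below is hypothetical: the real CdcKeySelector is not shown in this thread, and the "id" field name is an assumption.

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.java.functions.KeySelector;

// Hypothetical sketch of CdcKeySelector; the key field name "id" is an assumption.
// The ObjectNode import should match whatever the deserialization schema actually
// produces (it may be Flink's shaded Jackson ObjectNode).
public class CdcKeySelector implements KeySelector<ObjectNode, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String getKey(ObjectNode record) {
        // Derive the partitioning key from the incoming JSON record.
        return record.get("id").asText();
    }
}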

Re: StackOverflow Error

Vinay Patil
Hi Ravi,

The uber jar was correct; setting the ClosureCleanerLevel to TOP_LEVEL resolved the issue. Thanks a lot.

Is there any disadvantage to explicitly setting this?


Regards,
Vinay Patil


On Sat, Jul 20, 2019 at 10:23 PM Ravi Bhushan Ratnakar <[hidden email]> wrote:
Hi Vinay,

ObjectNode seems OK, as it is used by the Flink-provided JsonNodeDeserializationSchema.

Please verify that you are using the "flink-connector-kinesis" Maven dependency at version 1.8.1 (matching your Flink 1.8.1 cluster) and that this dependency is packaged as part of your application uber/fat jar. If you are already doing this, please also try setting the closure cleaner level to "TOP_LEVEL" like below.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Clean only the top-level function objects instead of recursing into their fields.
env.getConfig().setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL);

Regards,
Ravi
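
As context for the open question above about disadvantages: ExecutionConfig.ClosureCleanerLevel has three settings, and the practical tradeoff of TOP_LEVEL is that fields nested inside a user function are no longer cleaned, so any non-serializable references held in those nested objects must be handled by the function itself. A minimal sketch of the options (class and variable names here are just for illustration):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClosureCleanerLevels {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ExecutionConfig config = env.getConfig();

        // RECURSIVE (the default): clean the user function and all of its fields recursively.
        config.setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.RECURSIVE);

        // TOP_LEVEL: clean only the top-level function object, without recursing into its fields.
        config.setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL);

        // NONE: disable the closure cleaner entirely; every function must already be serializable.
        config.setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.NONE);
    }
}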
