flink no class found error

flink no class found error

Janardhan Reddy
Hi,

We are getting the following errors when submitting Flink jobs to the cluster.

1. Caused by: java.lang.NoSuchMethodError: com.google.common.io.Resources.asCharSource

2. This is for an entirely different job:
Caused by: java.lang.UnsupportedClassVersionError: com/olacabs/fabric/common/Metadata : Unsupported major.minor version 52.0

But when we run Flink locally, neither job shows an error.





Re: flink no class found error

rmetzger0
Hi Janardhan,

#1 Is the exception thrown from your user code, or from Flink?

#2 You compiled the code with Java 8 (class file version 52.0), but you are trying to run it with an older JVM.
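
The 52.0 in the message is the class-file major version, which corresponds to Java 8. One way to check both sides of the mismatch on a cluster node is sketched below, using only the JDK. The class name ClassVersionCheck is made up for illustration. It prints the version of the JVM that actually runs the code, which may differ from the Java installation found on the shell's PATH.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

/** Hypothetical helper: compares a class file's version with the running JVM. */
class ClassVersionCheck {

    /** Returns the major version stored in the first 8 bytes of a class file. */
    static int majorVersion(byte[] header) {
        ByteBuffer buf = ByteBuffer.wrap(header); // big-endian by default, like class files
        if (header.length < 8 || buf.getInt() != 0xCAFEBABE) {
            throw new IllegalArgumentException("Not a class file header");
        }
        buf.getShort();                 // skip the minor version
        return buf.getShort() & 0xFFFF; // major version, e.g. 52 for Java 8
    }

    public static void main(String[] args) throws IOException {
        byte[] header = new byte[8];
        // Read this class's own bytes; point this at any other class to inspect it instead.
        try (InputStream in = ClassVersionCheck.class.getResourceAsStream("ClassVersionCheck.class")) {
            if (in == null) {
                throw new IOException("Class file not found on the classpath");
            }
            new DataInputStream(in).readFully(header);
        }
        System.out.println("Running JVM version:      " + System.getProperty("java.version"));
        System.out.println("Class-file major version: " + majorVersion(header));
    }
}
```

If the major version printed for the job's classes is 52 and the running JVM reports 1.7 or older, the UnsupportedClassVersionError above is expected.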


Re: flink no class found error

Janardhan Reddy
#1 is thrown from user code.

We use Hadoop 2.7, which uses Guava 11.0.2, but our application uses 18.0. I think Hadoop's Guava is getting picked up instead of ours.
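
To confirm which Guava copy is actually picked up at runtime, one option is to ask the JVM where a class was loaded from. The sketch below uses only the JDK; WhichJar and its method names are hypothetical. It would need to run inside the job on the cluster (for example, by logging the result from the user function) to reflect the cluster's classpath rather than the local one.

```java
import java.security.CodeSource;

/** Hypothetical helper: reports the jar or directory a class was loaded from. */
class WhichJar {

    /** Returns the code source location of a class, or a note if the JVM does not expose one. */
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        return source == null ? "(bootstrap or unknown)" : String.valueOf(source.getLocation());
    }

    /** Looks the class up by name without initializing it. */
    static String locationOf(String className) {
        try {
            return locationOf(Class.forName(className, false, WhichJar.class.getClassLoader()));
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Defaults to the Guava class named in the NoSuchMethodError.
        String name = args.length > 0 ? args[0] : "com.google.common.io.Resources";
        System.out.println(name + " -> " + locationOf(name));
    }
}
```

If the printed location is a Hadoop or YARN library directory rather than the job jar, the older Guava is shadowing the one the application was compiled against.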


Re: flink no class found error

rmetzger0
Can you check whether the jar you are submitting to the cluster contains a different Guava version than the one you use at compile time?

Also, Guava might already be on your classpath, for example on some YARN setups.

The last resort for resolving these issues is to use the maven-shade-plugin and relocate the Guava version you need into your own namespace.
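
A relocation of this kind might look roughly like the following pom.xml fragment. This is a sketch under assumptions: the plugin version and the target prefix myapp.shaded are placeholders, and the fragment belongs inside the build/plugins section of the job's pom. Relocation rewrites the package references in every class bundled into the shaded jar, so it should also cover a third-party library that uses Guava internally, as long as that library and Guava are both packaged into the jar.

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <!-- placeholder version; use the one that fits your build -->
  <version>2.4.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- move Guava's packages into a private namespace -->
            <pattern>com.google.common</pattern>
            <!-- hypothetical prefix; pick one unique to your application -->
            <shadedPattern>myapp.shaded.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After building, listing the jar contents should show Guava classes under the relocated path instead of com/google/common.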

flink - Working with State example

Ramanan, Buvana (Nokia - US)

Hello,

 

I am using the code snippet from https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html, in particular the ‘open’ function, in my code:

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }

When I run it, I get the following error:

Caused by: java.lang.RuntimeException: Error while getting state
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
        at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
        at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
        ... 8 more

Where do I define the key & value serializer for state?

Thanks,
Buvana


Re: flink no class found error

Janardhan Reddy
In reply to this post by rmetzger0
Can you please explain a bit more about the last option? We are using YARN, so Guava might be on some classpath.


Re: flink no class found error

Janardhan Reddy
I have cross-checked that all our YARN nodes have Java 1.8 installed, but we are still getting the error: Unsupported major.minor version 52.0


Re: flink no class found error

Janardhan Reddy
We don't use Guava directly; we use another library which uses Guava internally. How do we use the shade plugin in this case?


Re: flink - Working with State example

Kostas Kloudas
In reply to this post by Ramanan, Buvana (Nokia - US)
Hello Buvana,

Can you share a few more details about your operator and how you are using it?
For example, are you using keyBy before your custom operator?

Thanks a lot,
Kostas


RE: flink - Working with State example

Ramanan, Buvana (Nokia - US)

Hi Kostas,

 

Here is my code. All I am trying to compute is (x[t] - x[t-1]), where x[t] is the current value of the incoming sample and x[t-1] is the previous value. I store the current value in the state store (‘prev_tuple’) so that I can use it in the next cycle. As you can see, I am not using keyBy. I am simply printing out the resulting tuple.

It appears from the error message that I have to set the key serializer (and possibly the value serializer) for the state store. I am not sure how to do that…

Thanks for your interest in helping,

Regards,
Buvana

 

public class stateful {
    private static String INPUT_KAFKA_TOPIC = null;
    private static int TIME_WINDOW = 0;

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
                    + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
        }

        INPUT_KAFKA_TOPIC = args[0];
        TIME_WINDOW = Integer.parseInt(args[1]);

        Properties properties = null;

        properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));

        DataStreamSource<String> stream = env
                .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));

        // maps the data into Flink tuples
        DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());

        // write the result to the console or in a Kafka topic
        streamTuples.print();

        env.execute("plus one");

    }

    public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
        private transient ValueState<Tuple2<String, Double>> prev_tuple;

        @Override
        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
            try {
                Double value = Double.parseDouble(incString);
                System.out.println("value = " + value);
                Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
                System.out.println(prev_stored_tp);

                Double value2 = value - prev_stored_tp.f1;
                prev_stored_tp.f1 = value;
                prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
                prev_tuple.update(prev_stored_tp);

                Tuple2<String, Double> tp = new Tuple2<String, Double>();
                tp.setField(INPUT_KAFKA_TOPIC, 0);
                tp.setField(value2, 1);
                out.collect(tp);

            } catch (NumberFormatException e) {
                System.out.println("Could not convert to Float" + incString);
                System.err.println("Could not convert to Float" + incString);
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<String, Double>> descriptor =
                    new ValueStateDescriptor<>(
                            "previous input value", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
                            Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
            prev_tuple = getRuntimeContext().getState(descriptor);
        }
    }
}

 


Re: flink - Working with State example

Ufuk Celebi
This only works for keyed streams, so you have to use keyBy().

If you do not want to key the stream, you can use the Checkpointed interface instead
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
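
The idea behind the non-keyed alternative can be sketched in plain Java: the previous sample lives in an ordinary instance field, and the function hands that field out when a checkpoint is taken and gets it back on recovery. The code below is only a model of that pattern and does not use the Flink API; DifferenceOperator, snapshot and restore are hypothetical names.

```java
/** Plain-Java model of non-keyed state held in an instance field. Not Flink code. */
class DifferenceOperator {

    // One value per operator instance, not per key.
    private double previous = 0.0;

    /** Returns x[t] - x[t-1] and remembers x[t] for the next call. */
    double next(double value) {
        double difference = value - previous;
        previous = value;
        return difference;
    }

    /** What a checkpoint would capture. */
    Double snapshot() {
        return previous;
    }

    /** Puts a captured value back, as would happen after a failure. */
    void restore(Double state) {
        previous = state;
    }

    public static void main(String[] args) {
        DifferenceOperator operator = new DifferenceOperator();
        System.out.println(operator.next(3.0)); // first sample is compared with 0.0
        Double checkpoint = operator.snapshot();

        // Simulate a restart: a fresh instance continues from the checkpoint.
        DifferenceOperator recovered = new DifferenceOperator();
        recovered.restore(checkpoint);
        System.out.println(recovered.next(4.5));
    }
}
```

With parallelism greater than one, each parallel instance would keep its own previous value, so the difference is only meaningful per instance.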


Re: flink - Working with State example

Kostas Kloudas
Exactly as Ufuk suggested, if you are not grouping your stream by key,
you should use the Checkpointed interface.

The reason I asked whether you are using keyBy() is that keyBy() is what
implicitly sets the key serializer and scopes your (keyed) state to a specific key.

If there is no keying, then keyed state cannot be used and the Checkpointed interface
should be used instead.
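
What "scoped to a specific key" means can be illustrated with a small plain-Java model: conceptually there is one state slot per key, so the runtime must know the current key before it can look the state up. This is a sketch for intuition only, not Flink code; KeyedDifference is a made-up name, and the 0.0 default mirrors the default value used in the descriptor above.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of keyed state: one "previous value" slot per key. Not Flink code. */
class KeyedDifference {

    private final Map<String, Double> previousByKey = new HashMap<>();

    /** Returns x[t] - x[t-1] for the given key; a key's first sample is compared with 0.0. */
    double next(String key, double value) {
        double previous = previousByKey.getOrDefault(key, 0.0);
        previousByKey.put(key, value);
        return value - previous;
    }

    public static void main(String[] args) {
        KeyedDifference differences = new KeyedDifference();
        System.out.println(differences.next("topicA", 5.0)); // 5.0
        System.out.println(differences.next("topicB", 2.0)); // 2.0, topicB has its own slot
        System.out.println(differences.next("topicA", 7.5)); // 2.5, uses topicA's previous value
    }
}
```

Without a key there is no slot to select, which is what the "State key serializer has not been configured" error is pointing at.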

Let us know if you need anything else.

Kostas

> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <[hidden email]> wrote:
>
> This only works for keyed streams, you have to use keyBy().
>
> You can use the Checkpointed interface instead
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>
> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
> <[hidden email]> wrote:
>> Hi Kostas,
>>
>>
>>
>> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t]
>> is the current value of the incoming sample and x[t-1] is the previous value
>> of the incoming sample. I store the current value in state store
>> (‘prev_tuple’) so that I can use it for computation in next cycle. As you
>> may observe, I am not using keyBy. I am simply printing out the resultant
>> tuple.
>>
>>
>>
>> It appears from the error message that I have to set the key serializer (and
>> possibly value serializer) for the state store. I am not sure how to do
>> that…
>>
>>
>>
>> Thanks for your interest in helping,
>>
>>
>>
>>
>>
>> Regards,
>>
>> Buvana
>>
>>
>>
>> public class stateful {
>>    private static String INPUT_KAFKA_TOPIC = null;
>>    private static int TIME_WINDOW = 0;
>>
>>    public static void main(String[] args) throws Exception {
>>
>>        if (args.length < 2) {
>>            throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>                    + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>        }
>>
>>        INPUT_KAFKA_TOPIC = args[0];
>>        TIME_WINDOW = Integer.parseInt(args[1]);
>>
>>        Properties properties = null;
>>
>>        properties = new Properties();
>>        properties.setProperty("bootstrap.servers", "localhost:9092");
>>        properties.setProperty("zookeeper.connect", "localhost:2181");
>>        properties.setProperty("group.id", "test");
>>
>>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>        //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>
>>        DataStreamSource<String> stream = env
>>                .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>
>>        // maps the data into Flink tuples
>>        DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>
>>        // write the result to the console or in a Kafka topic
>>        streamTuples.print();
>>
>>        env.execute("plus one");
>>    }
>>
>>    public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>        private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>
>>        @Override
>>        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>            try {
>>                Double value = Double.parseDouble(incString);
>>                System.out.println("value = " + value);
>>                Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>                System.out.println(prev_stored_tp);
>>
>>                Double value2 = value - prev_stored_tp.f1;
>>                prev_stored_tp.f1 = value;
>>                prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>                prev_tuple.update(prev_stored_tp);
>>
>>                Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>                tp.setField(INPUT_KAFKA_TOPIC, 0);
>>                tp.setField(value2, 1);
>>                out.collect(tp);
>>
>>            } catch (NumberFormatException e) {
>>                System.out.println("Could not convert to Float" + incString);
>>                System.err.println("Could not convert to Float" + incString);
>>            }
>>        }
>>
>>        @Override
>>        public void open(Configuration config) {
>>            ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>                    new ValueStateDescriptor<>(
>>                            "previous input value", // the state name
>>                            TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>                            Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>            prev_tuple = getRuntimeContext().getState(descriptor);
>>        }
>>    }
>> }
>>
>>
>>
>> From: Kostas Kloudas [mailto:[hidden email]]
>> Sent: Thursday, August 11, 2016 5:45 AM
>> To: [hidden email]
>> Subject: Re: flink - Working with State example
>>
>>
>>
>> Hello Buvana,
>>
>>
>>
>> Can you share a bit more details on your operator and how you are using it?
>>
>> For example, are you using keyBy before using you custom operator?
>>
>>
>>
>> Thanks a lot,
>>
>> Kostas
>>
>>
>>
>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>> <[hidden email]> wrote:
>>
>>
>>
>> Hello,
>>
>>
>>
>> I am utilizing the code snippet in:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
>> and particularly ‘open’ function in my code:
>>
>>    @Override
>>    public void open(Configuration config) {
>>        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>                new ValueStateDescriptor<>(
>>                        "average", // the state name
>>                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>        sum = getRuntimeContext().getState(descriptor);
>>    }
>>
>>
>>
>> When I run, I get the following error:
>>
>> Caused by: java.lang.RuntimeException: Error while getting state
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>     ... 8 more
>>
>>
>>
>> Where do I define the key & value serializer for state?
>>
>>
>>
>> Thanks,
>>
>> Buvana
>>
>>


RE: flink - Working with State example

Ramanan, Buvana (Nokia - US)
Thank you Kostas & Ufuk. I get the following compilation error when I use the Checkpointed interface. The code and the error message are pasted below.

Is the Serializable type supposed to be java.io.Serializable, or does it come from somewhere else?

Thanks again,
Buvana

================================================================================================================
Code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.functions.RichFlatMapFunction;

import java.io.Serializable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Created by buvana on 8/9/16.
 */
public class stateful {
    private static String INPUT_KAFKA_TOPIC = null;
---
--- skipping the main as it’s the same as before except for class name change -------------
---
        public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
            implements Checkpointed<Double> {

        private Double prev_tuple = null;

        @Override
        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
            try {
                Double value = Double.parseDouble(incString);
                System.out.println("value = " + value);
                System.out.println(prev_tuple);

                Double value2 = value - prev_tuple;
                prev_tuple = value;

                Tuple2<String, Double> tp = new Tuple2<String, Double>();
                tp.setField(INPUT_KAFKA_TOPIC, 0);
                tp.setField(value2, 1);
                out.collect(tp);
            } catch (NumberFormatException e) {
                System.out.println("Could not convert to Float" + incString);
                System.err.println("Could not convert to Float" + incString);
            }
        }
        @Override
        public void open(Configuration config) {
            if (prev_tuple == null) {
                // only recreate if null
                // restoreState will be called before open()
                // so this will already set the sum to the restored value
                prev_tuple = new Double("0.0");
            }
        }

        @Override
        public Serializable snapshotState(
                long checkpointId,
                long checkpointTimestamp) throws Exception {
            return prev_tuple;
        }


        @Override
        public void restoreState(Double state) {
            prev_tuple = state;
        }
    }
}
===============================================================================================================
ERROR message while building:

$ mvn clean package
[INFO] Scanning for projects...
[INFO]                                                                        
[INFO] ------------------------------------------------------------------------
[INFO] Building Flink Quickstart Job 0.1
[INFO] ------------------------------------------------------------------------
[WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
[INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
[INFO]
[INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
  return type java.io.Serializable is not compatible with java.lang.Double
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
[INFO] 3 errors
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.171s
[INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
[INFO] Final Memory: 26M/660M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] return type java.io.Serializable is not compatible with java.lang.Double
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
================================================================================================================

-----Original Message-----
From: Kostas Kloudas [mailto:[hidden email]]
Sent: Thursday, August 11, 2016 10:34 AM
To: [hidden email]
Subject: Re: flink - Working with State example

Exactly as Ufuk suggested: if you are not grouping your stream by key, you should use the Checkpointed interface.

The reason I asked earlier whether you are using keyBy() is that keyBy() is what implicitly sets the key serializer and scopes your (keyed) state to a specific key.

If there is no keying, keyed state cannot be used, and the Checkpointed interface should be used instead.
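
To make "scoped to a specific key" concrete, here is a minimal plain-Java analogy. It is not Flink API: the class KeyedDeltaSketch and its HashMap are hypothetical stand-ins for what a keyed ValueState gives you after keyBy(), namely one "previous value" slot per key, so that x[t] - x[t-1] is computed independently for each key.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java analogy of keyed state (NOT Flink API): one "previous value"
 * slot per key, used to compute x[t] - x[t-1] independently for each key.
 */
public class KeyedDeltaSketch {

    // Hypothetical stand-in for a keyed ValueState<Double>: key -> last seen value.
    private final Map<String, Double> previousByKey = new HashMap<>();

    /** Returns value minus the previous value for this key; a key's first sample is compared against 0.0. */
    public double delta(String key, double value) {
        double previous = previousByKey.getOrDefault(key, 0.0);
        previousByKey.put(key, value);
        return value - previous;
    }

    public static void main(String[] args) {
        KeyedDeltaSketch sketch = new KeyedDeltaSketch();
        System.out.println(sketch.delta("topicA", 5.0)); // first sample for topicA
        System.out.println(sketch.delta("topicB", 2.0)); // topicB has its own slot
        System.out.println(sketch.delta("topicA", 7.5)); // compared against topicA's 5.0
    }
}
```

Without a key there is no way to decide which slot a record belongs to, which is roughly why the runtime complains that no key serializer has been configured.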

Let us know if you need anything else.

Kostas

> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <[hidden email]> wrote:
>
> This only works for keyed streams, so you would have to call keyBy() first.
>
> If you do not want to key the stream, you can use the Checkpointed interface instead
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>


Re: flink - Working with State example

Kostas Kloudas
Hi Buvana,

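At first glance, your snapshotState() should return a Double: MapStateful implements Checkpointed<Double>, so the declared return type has to be Double rather than Serializable.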
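
A minimal sketch of that fix, kept free of Flink dependencies so it compiles on its own. The nested Checkpointed interface below is a hypothetical stand-in that is assumed to mirror the shape of org.apache.flink.streaming.api.checkpoint.Checkpointed; the class name PreviousValue and the update() helper are made up for illustration.

```java
import java.io.Serializable;

public class SnapshotReturnTypeSketch {

    /** Hypothetical stand-in assumed to mirror the shape of Flink's Checkpointed<T>; not the real interface. */
    interface Checkpointed<T extends Serializable> {
        T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
        void restoreState(T state) throws Exception;
    }

    /** With T bound to Double, snapshotState must be declared to return Double. */
    static class PreviousValue implements Checkpointed<Double> {
        private Double previous = 0.0;

        // Illustrative helper: records the latest sample.
        void update(double value) {
            previous = value;
        }

        @Override
        public Double snapshotState(long checkpointId, long checkpointTimestamp) {
            // Declaring "Serializable" here instead of "Double" no longer matches T and fails to compile.
            return previous;
        }

        @Override
        public void restoreState(Double state) {
            previous = state;
        }
    }

    public static void main(String[] args) throws Exception {
        PreviousValue original = new PreviousValue();
        original.update(42.5);
        Double snapshot = original.snapshotState(1L, 0L);

        // Simulate recovery: a fresh instance receives the snapshot.
        PreviousValue restored = new PreviousValue();
        restored.restoreState(snapshot);
        System.out.println(restored.snapshotState(2L, 0L));
    }
}
```

In the posted MapStateful class, the equivalent change would be to declare `public Double snapshotState(long checkpointId, long checkpointTimestamp)`; the java.io.Serializable import itself is fine, it is only the declared return type that must match the type parameter.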

Kostas

> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <[hidden email]> wrote:
>
> Thank you Kostas & Ufuk. I get into the following compilation error when I use checkpointed interface. Pasting the code & message as follows:
>
> Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?
>
> Thanks again,
> Buvana
>
> ================================================================================================================
> Code:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>
> import java.io.Serializable;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
>
> import java.util.Properties;
>
> /**
> * Created by buvana on 8/9/16.
> */
> public class stateful {
>    private static String INPUT_KAFKA_TOPIC = null;
> ---
> --- skipping the main as it’s the same as before except for class name change -------------
> ---
> public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
>            implements Checkpointed<Double> {
>
>        private Double prev_tuple = null;
>
>        @Override
>        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
>            try {
>                Double value = Double.parseDouble(incString);
>                System.out.println("value = " + value);
>                System.out.println(prev_tuple);
>
>                Double value2 = value - prev_tuple;
>                prev_tuple = value;
>
>                Tuple2<String, Double> tp = new Tuple2<String, Double>();
>                tp.setField(INPUT_KAFKA_TOPIC, 0);
>                tp.setField(value2, 1);
>                out.collect(tp);
>            } catch (NumberFormatException e) {
>                System.out.println("Could not convert to Float" + incString);
>                System.err.println("Could not convert to Float" + incString);
>            }
>        }
>        @Override
>        public void open(Configuration config) {
>            if (prev_tuple == null) {
>                // only recreate if null
>                // restoreState will be called before open()
>                // so this will already set the sum to the restored value
>                prev_tuple = new Double("0.0");
>            }
>        }
>
>        @Override
>        public Serializable snapshotState(
>                long checkpointId,
>                long checkpointTimestamp) throws Exception {
>            return prev_tuple;
>        }
>
>
>        @Override
>        public void restoreState(Double state) {
>            prev_tuple = state;
>        }
>    }
> }
> ===============================================================================================================
> ERROR message while building:
>
> $ mvn clean package
> [INFO] Scanning for projects...
> [INFO]                                                                        
> [INFO] ------------------------------------------------------------------------
> [INFO] Building Flink Quickstart Job 0.1
> [INFO] ------------------------------------------------------------------------
> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
> [INFO]
> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
> [INFO]
> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] Copying 1 resource
> [INFO]
> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
> [INFO] -------------------------------------------------------------
> [ERROR] COMPILATION ERROR :
> [INFO] -------------------------------------------------------------
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>  return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [INFO] 3 errors
> [INFO] -------------------------------------------------------------
> [INFO] ------------------------------------------------------------------------
> [INFO] BUILD FAILURE
> [INFO] ------------------------------------------------------------------------
> [INFO] Total time: 2.171s
> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
> [INFO] Final Memory: 26M/660M
> [INFO] ------------------------------------------------------------------------
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [ERROR] -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions, please read the following articles:
> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> ================================================================================================================
>
> -----Original Message-----
> From: Kostas Kloudas [mailto:[hidden email]]
> Sent: Thursday, August 11, 2016 10:34 AM
> To: [hidden email]
> Subject: Re: flink - Working with State example
>
> Exactly as Ufuk suggested, if you are not grouping your stream by key, you should use the checkpointed interface.
>
> The reason I asked before if you are using the keyBy() is because this is the one that implicitly sets the keySerializer and scopes your (keyed) state to a specific key.
>
> If there is no keying, then keyed state cannot be used and the Checkpointed interface should be used instead.
>
> Let us know if you need anything else.
>
> Kostas
>
>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <[hidden email]> wrote:
>>
>> This only works for keyed streams, you have to use keyBy().
>>
>> You can use the Checkpointed interface instead
>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>>
>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
>> <[hidden email]> wrote:
>>> Hi Kostas,
>>>
>>>
>>>
>>> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where
>>> x[t] is the current value of the incoming sample and x[t-1] is the
>>> previous value of the incoming sample. I store the current value in
>>> state store
>>> (‘prev_tuple’) so that I can use it for computation in next cycle. As
>>> you may observe, I am not using keyBy. I am simply printing out the
>>> resultant tuple.
>>>
>>>
>>>
>>> It appears from the error message that I have to set the key
>>> serializer (and possibly value serializer) for the state store. I am
>>> not sure how to do that…
>>>
>>>
>>>
>>> Thanks for your interest in helping,
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>>
>>> Buvana
>>>
>>>
>>>
>>> public class stateful {
>>>
>>>   private static String INPUT_KAFKA_TOPIC = null;
>>>
>>>   private static int TIME_WINDOW = 0;
>>>
>>>   public static void main(String[] args) throws Exception {
>>>
>>>       if (args.length < 2) {
>>>           throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>>                   + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>>       }
>>>
>>>       INPUT_KAFKA_TOPIC = args[0];
>>>       TIME_WINDOW = Integer.parseInt(args[1]);
>>>
>>>       Properties properties = null;
>>>
>>>       properties = new Properties();
>>>       properties.setProperty("bootstrap.servers", "localhost:9092");
>>>       properties.setProperty("zookeeper.connect", "localhost:2181");
>>>       properties.setProperty("group.id", "test");
>>>
>>>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>       //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>>
>>>       DataStreamSource<String> stream = env
>>>               .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>>
>>>       // maps the data into Flink tuples
>>>       DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>>
>>>       // write the result to the console or in a Kafka topic
>>>       streamTuples.print();
>>>
>>>       env.execute("plus one");
>>>   }
>>>
>>>   public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>>       private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>>
>>>       @Override
>>>       public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>>           try {
>>>               Double value = Double.parseDouble(incString);
>>>               System.out.println("value = " + value);
>>>               Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>>               System.out.println(prev_stored_tp);
>>>
>>>               Double value2 = value - prev_stored_tp.f1;
>>>               prev_stored_tp.f1 = value;
>>>               prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>>               prev_tuple.update(prev_stored_tp);
>>>
>>>               Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>>               tp.setField(INPUT_KAFKA_TOPIC, 0);
>>>               tp.setField(value2, 1);
>>>               out.collect(tp);
>>>
>>>           } catch (NumberFormatException e) {
>>>               System.out.println("Could not convert to Float" + incString);
>>>               System.err.println("Could not convert to Float" + incString);
>>>           }
>>>       }
>>>
>>>       @Override
>>>       public void open(Configuration config) {
>>>           ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>>                   new ValueStateDescriptor<>(
>>>                           "previous input value", // the state name
>>>                           TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>>                           Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>>           prev_tuple = getRuntimeContext().getState(descriptor);
>>>       }
>>>   }
>>> }
>>>
>>>
>>>
>>> From: Kostas Kloudas [mailto:[hidden email]]
>>> Sent: Thursday, August 11, 2016 5:45 AM
>>> To: [hidden email]
>>> Subject: Re: flink - Working with State example
>>>
>>>
>>>
>>> Hello Buvana,
>>>
>>>
>>>
>>> Can you share a few more details on your operator and how you are using it?
>>>
>>> For example, are you using keyBy before using your custom operator?
>>>
>>>
>>>
>>> Thanks a lot,
>>>
>>> Kostas
>>>
>>>
>>>
>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>>> <[hidden email]> wrote:
>>>
>>>
>>>
>>> Hello,
>>>
>>>
>>>
>>> I am utilizing the code snippet in:
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html and particularly ‘open’ function in my code:
>>>
>>> @Override
>>>   public void open(Configuration config) {
>>>       ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>               new ValueStateDescriptor<>(
>>>                       "average", // the state name
>>>                       TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>>                       Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>>       sum = getRuntimeContext().getState(descriptor);
>>>   }
>>>
>>> When I run, I get the following error:
>>>
>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>>     ... 8 more
>>>
>>>
>>>
>>> Where do I define the key & value serializer for state?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Buvana
>>>
>>>
>


RE: flink - Working with State example

Ramanan, Buvana (Nokia - US)
Kostas,
Good catch! That makes it work. Thank you so much for the help.
Regards,
Buvana

-----Original Message-----
From: Kostas Kloudas [mailto:[hidden email]]
Sent: Thursday, August 11, 2016 11:22 AM
To: [hidden email]
Subject: Re: flink - Working with State example

Hi Buvana,

At first glance, your snapshotState() should return a Double rather than Serializable, since the class implements Checkpointed<Double>.

Kostas
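
To illustrate the point about the return type, here is a minimal, Flink-free sketch. CheckpointedLike is a hypothetical stand-in that only mirrors the method shapes named in the compiler error in this thread (snapshotState(long,long) and restoreState); it is not the Flink interface itself, and PreviousValue and difference() are made-up names. The idea it shows: once the type argument is Double, snapshotState() has to be declared as returning Double, because the wider java.io.Serializable is not accepted as an override.

```java
import java.io.Serializable;

public class SnapshotReturnTypeSketch {

    // Hypothetical stand-in, NOT the Flink class: it only mirrors the method
    // shapes that appear in the compiler error quoted in this thread.
    interface CheckpointedLike<T extends Serializable> {
        T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
        void restoreState(T state) throws Exception;
    }

    // Keeps the previous sample so that x[t] - x[t-1] can be computed.
    static class PreviousValue implements CheckpointedLike<Double> {
        private Double prev = 0.0;

        // Returns value - prev, then remembers value for the next call.
        double difference(double value) {
            double diff = value - prev;
            prev = value;
            return diff;
        }

        // Return type is Double (the type argument), not Serializable.
        @Override
        public Double snapshotState(long checkpointId, long checkpointTimestamp) {
            return prev;
        }

        @Override
        public void restoreState(Double state) {
            prev = state;
        }
    }

    public static void main(String[] args) {
        PreviousValue original = new PreviousValue();
        original.difference(3.0);

        // Simulate a checkpoint followed by a restore into a fresh instance.
        Double snapshot = original.snapshotState(1L, 0L);
        PreviousValue restored = new PreviousValue();
        restored.restoreState(snapshot);

        System.out.println(restored.difference(5.0));
    }
}
```

Changing the declared return type of snapshotState() in this sketch to Serializable should reproduce the same kind of "return type ... is not compatible" complaint from javac.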

> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <[hidden email]> wrote:
>
> Thank you Kostas & Ufuk. I run into the following compilation error when I use the Checkpointed interface. I am pasting the code and the error message below:
>
> Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?
>
> Thanks again,
> Buvana
>
> ======================================================================
> ==========================================
> Code:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>
> import java.io.Serializable;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
>
> import java.util.Properties;
>
> /**
> * Created by buvana on 8/9/16.
> */
> public class stateful {
>    private static String INPUT_KAFKA_TOPIC = null;
> ---
> --- skipping the main as it’s the same as before except for class name
> change -------------
> ---
> public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
>            implements Checkpointed<Double> {
>
>        private Double prev_tuple = null;
>
>        @Override
>        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
>            try {
>                Double value = Double.parseDouble(incString);
>                System.out.println("value = " + value);
>                System.out.println(prev_tuple);
>
>                Double value2 = value - prev_tuple;
>                prev_tuple = value;
>
>                Tuple2<String, Double> tp = new Tuple2<String, Double>();
>                tp.setField(INPUT_KAFKA_TOPIC, 0);
>                tp.setField(value2, 1);
>                out.collect(tp);
>            } catch (NumberFormatException e) {
>                System.out.println("Could not convert to Float" + incString);
>                System.err.println("Could not convert to Float" + incString);
>            }
>        }
>        @Override
>        public void open(Configuration config) {
>            if (prev_tuple == null) {
>                // only recreate if null
>                // restoreState will be called before open()
>                // so this will already set the sum to the restored value
>                prev_tuple = new Double("0.0");
>            }
>        }
>
>        @Override
>        public Serializable snapshotState(
>                long checkpointId,
>                long checkpointTimestamp) throws Exception {
>            return prev_tuple;
>        }
>
>
>        @Override
>        public void restoreState(Double state) {
>            prev_tuple = state;
>        }
>    }
> }
> ======================================================================
> =========================================
> ERROR message while building:
>
> $ mvn clean package
> [INFO] Scanning for projects...
> [INFO]
> [INFO] ------------------------------------------------------------------------
> [INFO] Building Flink Quickstart Job 0.1
> [INFO] ------------------------------------------------------------------------
> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
> [INFO]
> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
> [INFO]
> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] Copying 1 resource
> [INFO]
> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
> [INFO] -------------------------------------------------------------
> [ERROR] COMPILATION ERROR :
> [INFO] -------------------------------------------------------------
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>   return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [INFO] 3 errors
> [INFO] -------------------------------------------------------------
> [INFO] ------------------------------------------------------------------------
> [INFO] BUILD FAILURE
> [INFO] ------------------------------------------------------------------------
> [INFO] Total time: 2.171s
> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
> [INFO] Final Memory: 26M/660M
> [INFO] ------------------------------------------------------------------------
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [ERROR] -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions, please read the following articles:
> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> ======================================================================
> ==========================================
>
> -----Original Message-----
> From: Kostas Kloudas [mailto:[hidden email]]
> Sent: Thursday, August 11, 2016 10:34 AM
> To: [hidden email]
> Subject: Re: flink - Working with State example
>
> Exactly as Ufuk suggested: if you are not grouping your stream by key, you should use the Checkpointed interface.
>
> The reason I asked earlier whether you are using keyBy() is that keyBy() is what implicitly sets the keySerializer and scopes your (keyed) state to a specific key.
>
> If there is no keying, then keyed state cannot be used and the Checkpointed interface should be used instead.
>
> Let us know if you need anything else.
>
> Kostas
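
A rough way to picture what "scoping state to a specific key" means, in plain Java rather than Flink: the hypothetical KeyedPrevious class below keeps one previous value per key in a HashMap, so x[t] - x[t-1] is computed separately for each key. This is only an analogy under that assumption. In Flink the per-key bookkeeping is done by the runtime after keyBy(), and none of the names in this sketch are Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedStateSketch {

    // Hypothetical helper, not a Flink API: one "previous value" slot per key.
    static class KeyedPrevious {
        private final Map<String, Double> previousByKey = new HashMap<>();

        // Returns value minus the previous value seen for this key
        // (0.0 is assumed as the default when the key is new).
        double difference(String key, double value) {
            double prev = previousByKey.getOrDefault(key, 0.0);
            previousByKey.put(key, value);
            return value - prev;
        }
    }

    public static void main(String[] args) {
        KeyedPrevious state = new KeyedPrevious();
        System.out.println(state.difference("topicA", 10.0));
        System.out.println(state.difference("topicB", 4.0));
        System.out.println(state.difference("topicA", 12.5));
    }
}
```

Without a key there is no slot to look up, which is roughly why the non-keyed job has to fall back to a plain instance field saved through the Checkpointed interface.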
>
>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <[hidden email]> wrote:
>>
>> This only works for keyed streams; you have to use keyBy().
>>
>> You can use the Checkpointed interface instead
>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>>
>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
>> <[hidden email]> wrote:
>>> Hi Kostas,
>>>
>>>
>>>
>>> Here is my code. All I am trying to compute is (x[t] – x[t-1]),
>>> where x[t] is the current value of the incoming sample and x[t-1] is
>>> the previous value of the incoming sample. I store the current value
>>> in state store
>>> (‘prev_tuple’) so that I can use it for computation in next cycle.
>>> As you may observe, I am not using keyBy. I am simply printing out
>>> the resultant tuple.
>>>
>>>
>>>
>>> It appears from the error message that I have to set the key
>>> serializer (and possibly value serializer) for the state store. I am
>>> not sure how to do that…
>>>
>>>
>>>
>>> Thanks for your interest in helping,
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>>
>>> Buvana
>>>
>>>
>>>
>>> public class stateful {
>>>   private static String INPUT_KAFKA_TOPIC = null;
>>>   private static int TIME_WINDOW = 0;
>>>
>>>   public static void main(String[] args) throws Exception {
>>>
>>>       if (args.length < 2) {
>>>           throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>>                   + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>>       }
>>>
>>>       INPUT_KAFKA_TOPIC = args[0];
>>>       TIME_WINDOW = Integer.parseInt(args[1]);
>>>
>>>       Properties properties = null;
>>>
>>>       properties = new Properties();
>>>       properties.setProperty("bootstrap.servers", "localhost:9092");
>>>       properties.setProperty("zookeeper.connect", "localhost:2181");
>>>       properties.setProperty("group.id", "test");
>>>
>>>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>       //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>>
>>>       DataStreamSource<String> stream = env
>>>               .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>>
>>>       // maps the data into Flink tuples
>>>       DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>>
>>>       // write the result to the console or in a Kafka topic
>>>       streamTuples.print();
>>>
>>>       env.execute("plus one");
>>>   }
>>>
>>>   public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>>       private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>>
>>>       @Override
>>>       public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>>           try {
>>>               Double value = Double.parseDouble(incString);
>>>               System.out.println("value = " + value);
>>>               Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>>               System.out.println(prev_stored_tp);
>>>
>>>               Double value2 = value - prev_stored_tp.f1;
>>>               prev_stored_tp.f1 = value;
>>>               prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>>               prev_tuple.update(prev_stored_tp);
>>>
>>>               Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>>               tp.setField(INPUT_KAFKA_TOPIC, 0);
>>>               tp.setField(value2, 1);
>>>               out.collect(tp);
>>>
>>>           } catch (NumberFormatException e) {
>>>               System.out.println("Could not convert to Float" + incString);
>>>               System.err.println("Could not convert to Float" + incString);
>>>           }
>>>       }
>>>
>>>       @Override
>>>       public void open(Configuration config) {
>>>           ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>>                   new ValueStateDescriptor<>(
>>>                           "previous input value", // the state name
>>>                           TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>>                           Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>>           prev_tuple = getRuntimeContext().getState(descriptor);
>>>       }
>>>   }
>>> }
>>>
>>>
>>>
>>> From: Kostas Kloudas [mailto:[hidden email]]
>>> Sent: Thursday, August 11, 2016 5:45 AM
>>> To: [hidden email]
>>> Subject: Re: flink - Working with State example
>>>
>>>
>>>
>>> Hello Buvana,
>>>
>>>
>>>
>>> Can you share a few more details on your operator and how you are using it?
>>>
>>> For example, are you using keyBy before using your custom operator?
>>>
>>>
>>>
>>> Thanks a lot,
>>>
>>> Kostas
>>>
>>>
>>>
>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>>> <[hidden email]> wrote:
>>>
>>>
>>>
>>> Hello,
>>>
>>>
>>>
>>> I am utilizing the code snippet in:
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html and particularly ‘open’ function in my code:
>>>
>>> @Override
>>>   public void open(Configuration config) {
>>>       ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>               new ValueStateDescriptor<>(
>>>                       "average", // the state name
>>>                       TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>>                       Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>>       sum = getRuntimeContext().getState(descriptor);
>>>   }
>>>
>>> When I run, I get the following error:
>>>
>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>>     ... 8 more
>>>
>>>
>>>
>>> Where do I define the key & value serializer for state?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Buvana
>>>
>>>
>


Re: flink - Working with State example

Kostas Kloudas
No problem!

Regards,
Kostas

> On Aug 12, 2016, at 3:00 AM, Ramanan, Buvana (Nokia - US) <[hidden email]> wrote:
>
> Kostas,
> Good catch! That makes it work. Thank you so much for the help.
> Regards,
> Buvana
>
> -----Original Message-----
> From: Kostas Kloudas [mailto:[hidden email]]
> Sent: Thursday, August 11, 2016 11:22 AM
> To: [hidden email]
> Subject: Re: flink - Working with State example
>
> Hi Buvana,
>
> At first glance, your snapshotState() should return a Double rather than Serializable, since the class implements Checkpointed<Double>.
>
> Kostas
>
>> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <[hidden email]> wrote:
>>
>> Thank you Kostas & Ufuk. I run into the following compilation error when I use the Checkpointed interface. I am pasting the code and the error message below:
>>
>> Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?
>>
>> Thanks again,
>> Buvana
>>
>> ======================================================================
>> ==========================================
>> Code:
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>>
>> import java.io.Serializable;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>>
>> import java.util.Properties;
>>
>> /**
>> * Created by buvana on 8/9/16.
>> */
>> public class stateful {
>>   private static String INPUT_KAFKA_TOPIC = null;
>> ---
>> --- skipping the main as it’s the same as before except for class name
>> change -------------
>> ---
>> public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
>>           implements Checkpointed<Double> {
>>
>>       private Double prev_tuple = null;
>>
>>       @Override
>>       public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
>>           try {
>>               Double value = Double.parseDouble(incString);
>>               System.out.println("value = " + value);
>>               System.out.println(prev_tuple);
>>
>>               Double value2 = value - prev_tuple;
>>               prev_tuple = value;
>>
>>               Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>               tp.setField(INPUT_KAFKA_TOPIC, 0);
>>               tp.setField(value2, 1);
>>               out.collect(tp);
>>           } catch (NumberFormatException e) {
>>               System.out.println("Could not convert to Float" + incString);
>>               System.err.println("Could not convert to Float" + incString);
>>           }
>>       }
>>       @Override
>>       public void open(Configuration config) {
>>           if (prev_tuple == null) {
>>               // only recreate if null
>>               // restoreState will be called before open()
>>               // so this will already set the sum to the restored value
>>               prev_tuple = new Double("0.0");
>>           }
>>       }
>>
>>       @Override
>>       public Serializable snapshotState(
>>               long checkpointId,
>>               long checkpointTimestamp) throws Exception {
>>           return prev_tuple;
>>       }
>>
>>
>>       @Override
>>       public void restoreState(Double state) {
>>           prev_tuple = state;
>>       }
>>   }
>> }
>> ======================================================================
>> =========================================
>> ERROR message while building:
>>
>> $ mvn clean package
>> [INFO] Scanning for projects...
>> [INFO]
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Building Flink Quickstart Job 0.1
>> [INFO] ------------------------------------------------------------------------
>> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
>> [INFO]
>> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
>> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
>> [INFO]
>> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
>> [INFO] Using 'UTF-8' encoding to copy filtered resources.
>> [INFO] Copying 1 resource
>> [INFO]
>> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
>> [INFO] Changes detected - recompiling the module!
>> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
>> [INFO] -------------------------------------------------------------
>> [ERROR] COMPILATION ERROR :
>> [INFO] -------------------------------------------------------------
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>>   return type java.io.Serializable is not compatible with java.lang.Double
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
>> [INFO] 3 errors
>> [INFO] -------------------------------------------------------------
>> [INFO] ------------------------------------------------------------------------
>> [INFO] BUILD FAILURE
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Total time: 2.171s
>> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
>> [INFO] Final Memory: 26M/660M
>> [INFO] ------------------------------------------------------------------------
>> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
>> [ERROR] -> [Help 1]
>> [ERROR]
>> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>> [ERROR]
>> [ERROR] For more information about the errors and possible solutions, please read the following articles:
>> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
>> ======================================================================
>> ==========================================
>>
>> -----Original Message-----
>> From: Kostas Kloudas [mailto:[hidden email]]
>> Sent: Thursday, August 11, 2016 10:34 AM
>> To: [hidden email]
>> Subject: Re: flink - Working with State example
>>
>> Exactly as Ufuk suggested: if you are not grouping your stream by key, you should use the Checkpointed interface.
>>
>> The reason I asked earlier whether you are using keyBy() is that keyBy() is what implicitly sets the keySerializer and scopes your (keyed) state to a specific key.
>>
>> If there is no keying, then keyed state cannot be used and the Checkpointed interface should be used instead.
>>
>> Let us know if you need anything else.
>>
>> Kostas
>>
>>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <[hidden email]> wrote:
>>>
>>> This only works for keyed streams; you have to use keyBy().
>>>
>>> You can use the Checkpointed interface instead
>>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>>>
>>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
>>> <[hidden email]> wrote:
>>>> Hi Kostas,
>>>>
>>>>
>>>>
>>>> Here is my code. All I am trying to compute is (x[t] – x[t-1]),
>>>> where x[t] is the current value of the incoming sample and x[t-1] is
>>>> the previous value of the incoming sample. I store the current value
>>>> in state store
>>>> (‘prev_tuple’) so that I can use it for computation in next cycle.
>>>> As you may observe, I am not using keyBy. I am simply printing out
>>>> the resultant tuple.
>>>>
>>>>
>>>>
>>>> It appears from the error message that I have to set the key
>>>> serializer (and possibly value serializer) for the state store. I am
>>>> not sure how to do that…
>>>>
>>>>
>>>>
>>>> Thanks for your interest in helping,
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Buvana
>>>>
>>>>
>>>>
>>>> public class stateful {
>>>>  private static String INPUT_KAFKA_TOPIC = null;
>>>>  private static int TIME_WINDOW = 0;
>>>>
>>>>  public static void main(String[] args) throws Exception {
>>>>
>>>>      if (args.length < 2) {
>>>>          throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>>>                  + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>>>      }
>>>>
>>>>      INPUT_KAFKA_TOPIC = args[0];
>>>>      TIME_WINDOW = Integer.parseInt(args[1]);
>>>>
>>>>      Properties properties = null;
>>>>
>>>>      properties = new Properties();
>>>>      properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>      properties.setProperty("zookeeper.connect", "localhost:2181");
>>>>      properties.setProperty("group.id", "test");
>>>>
>>>>      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>      //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>>>
>>>>      DataStreamSource<String> stream = env
>>>>              .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>>>
>>>>      // maps the data into Flink tuples
>>>>      DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>>>
>>>>      // write the result to the console or in a Kafka topic
>>>>      streamTuples.print();
>>>>
>>>>      env.execute("plus one");
>>>>  }
>>>>
>>>>  public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>>>
>>>>      private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>>>
>>>>      @Override
>>>>      public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>>>          try {
>>>>              Double value = Double.parseDouble(incString);
>>>>              System.out.println("value = " + value);
>>>>              Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>>>              System.out.println(prev_stored_tp);
>>>>
>>>>              Double value2 = value - prev_stored_tp.f1;
>>>>              prev_stored_tp.f1 = value;
>>>>              prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>>>              prev_tuple.update(prev_stored_tp);
>>>>
>>>>              Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>>>              tp.setField(INPUT_KAFKA_TOPIC, 0);
>>>>              tp.setField(value2, 1);
>>>>              out.collect(tp);
>>>>
>>>>          } catch (NumberFormatException e) {
>>>>              System.out.println("Could not convert to Float" + incString);
>>>>              System.err.println("Could not convert to Float" + incString);
>>>>          }
>>>>      }
>>>>
>>>>      @Override
>>>>      public void open(Configuration config) {
>>>>          ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>>>                  new ValueStateDescriptor<>(
>>>>                          "previous input value", // the state name
>>>>                          TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>>>                          Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>>>          prev_tuple = getRuntimeContext().getState(descriptor);
>>>>      }
>>>>  }
>>>> }
>>>>
>>>>
>>>>
>>>> From: Kostas Kloudas [mailto:[hidden email]]
>>>> Sent: Thursday, August 11, 2016 5:45 AM
>>>> To: [hidden email]
>>>> Subject: Re: flink - Working with State example
>>>>
>>>>
>>>>
>>>> Hello Buvana,
>>>>
>>>>
>>>>
>>>> Can you share a bit more detail about your operator and how you are using it?
>>>>
>>>> For example, are you using keyBy before using your custom operator?
>>>>
>>>>
>>>>
>>>> Thanks a lot,
>>>>
>>>> Kostas
>>>>
>>>>
>>>>
>>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>>>> <[hidden email]> wrote:
>>>>
>>>>
>>>>
>>>> Hello,
>>>>
>>>>
>>>>
>>>> I am utilizing the code snippet in:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
>>>> and particularly ‘open’ function in my code:
>>>>
>>>> @Override
>>>>  public void open(Configuration config) {
>>>>      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>>              new ValueStateDescriptor<>(
>>>>                      "average", // the state name
>>>>                      TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>>>                      Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>>>      sum = getRuntimeContext().getState(descriptor);
>>>>  }
>>>>
>>>>
>>>>
>>>> When I run, I get the following error:
>>>>
>>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>>             at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>>>             at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>>>             at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>             at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>>             at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>>>             at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>>>             at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>>>             at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>             at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>>>             at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>>>             at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>>>             at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>>>             ... 8 more
>>>>
>>>>
>>>>
>>>> Where do I define the key & value serializer for state?
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Buvana
>>>>
>>>>
>>
>


RE: flink - Working with State example

Ramanan, Buvana (Nokia - US)
Hi Kostas,

I am trying to use FsStateBackend as the backend for storing state, and I configure it as follows in the code:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
        env.enableCheckpointing(10000);

Everything else is the same as the code I shared with you previously.

When I run the job, a directory is created under /home/buvana/flink/checkpoints, but it stays empty.
I was expecting to find some files or subdirectories there.

Could you please explain why?

Thanks,
Buvana

-----Original Message-----
From: Kostas Kloudas [mailto:[hidden email]]
Sent: Friday, August 12, 2016 1:37 AM
To: [hidden email]
Subject: Re: flink - Working with State example

No problem!

Regards,
Kostas

> On Aug 12, 2016, at 3:00 AM, Ramanan, Buvana (Nokia - US) <[hidden email]> wrote:
>
> Kostas,
> Good catch! That makes it work! Thank you so much for the help.
> Regards,
> Buvana
>
> -----Original Message-----
> From: Kostas Kloudas [mailto:[hidden email]]
> Sent: Thursday, August 11, 2016 11:22 AM
> To: [hidden email]
> Subject: Re: flink - Working with State example
>
> Hi Buvana,
>
> At first glance, your snapshotState() should return a Double.
>
> Kostas
>
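The fix confirmed above (snapshotState() declared to return Double rather than Serializable) can be sketched in plain Java. The interface `CheckpointedLike` is a local, hypothetical stand-in with the shape suggested by the compiler error quoted below; it is defined here only so the example compiles without Flink on the classpath. `DiffState` and `next` are likewise illustrative names.

```java
import java.io.Serializable;

// Hypothetical stand-in for a checkpointing interface that is generic in its state type.
interface CheckpointedLike<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

    void restoreState(T state) throws Exception;
}

// Remembers the previous sample so that x[t] - x[t-1] can be computed.
class DiffState implements CheckpointedLike<Double> {
    private Double prev = 0.0;

    // Returns the difference to the previous sample and stores the new one.
    public double next(double value) {
        double diff = value - prev;
        prev = value;
        return diff;
    }

    // The return type must be the type argument (Double); declaring the wider
    // java.io.Serializable here is what the compiler rejects.
    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) {
        return prev;
    }

    @Override
    public void restoreState(Double state) {
        prev = state;
    }

    public static void main(String[] args) {
        DiffState running = new DiffState();
        running.next(3.0);
        System.out.println(running.next(5.0));

        // Simulate recovery: a fresh instance picks up the snapshotted value.
        DiffState restored = new DiffState();
        restored.restoreState(running.snapshotState(1L, 0L));
        System.out.println(restored.next(6.0));
    }
}
```

An overriding method may narrow a return type but not widen it, so with the type argument fixed to Double the only valid return types are Double or a subtype of it.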
>> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <[hidden email]> wrote:
>>
>> Thank you Kostas & Ufuk. I get the following compilation error when I use the Checkpointed interface. The code and the error message are pasted below:
>>
>> Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?
>>
>> Thanks again,
>> Buvana
>>
>> ======================================================================
>> ==========================================
>> Code:
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>>
>> import java.io.Serializable;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>>
>> import java.util.Properties;
>>
>> /**
>> * Created by buvana on 8/9/16.
>> */
>> public class stateful {
>>   private static String INPUT_KAFKA_TOPIC = null;
>> ---
>> --- skipping the main as it’s the same as before except for class name
>> change -------------
>> ---
>> public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
>>           implements Checkpointed<Double> {
>>
>>       private Double prev_tuple = null;
>>
>>       @Override
>>       public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
>>           try {
>>               Double value = Double.parseDouble(incString);
>>               System.out.println("value = " + value);
>>               System.out.println(prev_tuple);
>>
>>               Double value2 = value - prev_tuple;
>>               prev_tuple = value;
>>
>>               Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>               tp.setField(INPUT_KAFKA_TOPIC, 0);
>>               tp.setField(value2, 1);
>>               out.collect(tp);
>>           } catch (NumberFormatException e) {
>>               System.out.println("Could not convert to Float" + incString);
>>               System.err.println("Could not convert to Float" + incString);
>>           }
>>       }
>>       @Override
>>       public void open(Configuration config) {
>>           if (prev_tuple == null) {
>>               // only recreate if null
>>               // restoreState will be called before open()
>>               // so this will already set the sum to the restored value
>>               prev_tuple = new Double("0.0");
>>           }
>>       }
>>
>>       @Override
>>       public Serializable snapshotState(
>>               long checkpointId,
>>               long checkpointTimestamp) throws Exception {
>>           return prev_tuple;
>>       }
>>
>>
>>       @Override
>>       public void restoreState(Double state) {
>>           prev_tuple = state;
>>       }
>>   }
>> }
>> ======================================================================
>> =========================================
>> ERROR message while building:
>>
>> $ mvn clean package
>> [INFO] Scanning for projects...
>> [INFO]
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Building Flink Quickstart Job 0.1
>> [INFO] ------------------------------------------------------------------------
>> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
>> [INFO]
>> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
>> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
>> [INFO]
>> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
>> [INFO] Using 'UTF-8' encoding to copy filtered resources.
>> [INFO] Copying 1 resource
>> [INFO]
>> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
>> [INFO] Changes detected - recompiling the module!
>> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
>> [INFO] -------------------------------------------------------------
>> [ERROR] COMPILATION ERROR :
>> [INFO] -------------------------------------------------------------
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> return type java.io.Serializable is not compatible with java.lang.Double
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
>> [INFO] 3 errors
>> [INFO] -------------------------------------------------------------
>> [INFO] ------------------------------------------------------------------------
>> [INFO] BUILD FAILURE
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Total time: 2.171s
>> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
>> [INFO] Final Memory: 26M/660M
>> [INFO] ------------------------------------------------------------------------
>> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
>> [ERROR] -> [Help 1]
>> [ERROR]
>> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>> [ERROR]
>> [ERROR] For more information about the errors and possible solutions, please read the following articles:
>> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
>> ======================================================================
>> ==========================================
>>
>> -----Original Message-----
>> From: Kostas Kloudas [mailto:[hidden email]]
>> Sent: Thursday, August 11, 2016 10:34 AM
>> To: [hidden email]
>> Subject: Re: flink - Working with State example
>>
>> Exactly as Ufuk suggested, if you are not grouping your stream by key, you should use the checkpointed interface.
>>
>> The reason I asked before if you are using the keyBy() is because this is the one that implicitly sets the keySerializer and scopes your (keyed) state to a specific key.
>>
>> If there is no keying, then keyed state cannot be used and the Checkpointed interface should be used instead.
>>
>> Let us know if you need anything else.
>>
>> Kostas
>>
>>
>


Re: flink - Working with State example

Aljoscha Krettek
Hi,
do you mean the directory is completely empty? Can you check in the JobManager dashboard whether it reports any successful checkpoints for the job?

One possible explanation is an optimization that the FsStateBackend performs: when the state is very small, it is not actually written to files but stored in the checkpoint metadata that is sent to the JobManager. This would explain why there are no files. You can set the size threshold for this optimization with an additional FsStateBackend constructor parameter; for example, new FsStateBackend("file:///home/buvana/flink/checkpoints", 0) disables the optimization.

Cheers,
Aljoscha
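
The small-state optimization described above can be pictured with a short model in plain Java. This is only a sketch of the decision, not Flink's implementation: the class `CheckpointPlacement`, the method `place`, the byte sizes, and the choice that state strictly smaller than the threshold stays inline are all assumptions made for illustration.

```java
// Hypothetical model: where does a piece of checkpointed state end up?
class CheckpointPlacement {
    enum Location { INLINE_IN_METADATA, FILE_IN_CHECKPOINT_DIR }

    // Assumption: state strictly smaller than the threshold is kept inline in the
    // checkpoint metadata; anything else is written to a file.
    public static Location place(int stateSizeBytes, int thresholdBytes) {
        return stateSizeBytes < thresholdBytes
                ? Location.INLINE_IN_METADATA
                : Location.FILE_IN_CHECKPOINT_DIR;
    }

    public static void main(String[] args) {
        int smallStateBytes = 8;         // illustrative size for a single Double
        int illustrativeThreshold = 1024; // not necessarily Flink's default value

        // With a non-zero threshold the tiny state never reaches the directory.
        System.out.println(place(smallStateBytes, illustrativeThreshold));

        // With a threshold of 0 nothing qualifies as "small", so a file is written.
        System.out.println(place(smallStateBytes, 0));
    }
}
```

Under this model, a job whose only state is one Double would leave the checkpoint directory empty until the threshold is lowered to 0, which matches the behaviour reported in the question.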

On Fri, 12 Aug 2016 at 21:12 Ramanan, Buvana (Nokia - US) <[hidden email]> wrote:
Hi Kostas,

I am trying to use FsStateBackend as the backend for storing state. And configure it as follows in the code:

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
        env.enableCheckpointing(10000);

everything else is same as the code I shared with you previously.

When I execute, I see that a directory is created under /home/buvana/flink/checkpoints, but there is nothing under that directory.
I was expecting to find some file / sub dir there.

Please explain.

Thanks,
Buvana

-----Original Message-----
From: Kostas Kloudas [mailto:[hidden email]]
Sent: Friday, August 12, 2016 1:37 AM
To: [hidden email]
Subject: Re: flink - Working with State example

No problem!

Regards,
Kostas

> On Aug 12, 2016, at 3:00 AM, Ramanan, Buvana (Nokia - US) <[hidden email]> wrote:
>
> Kostas,
> Good catch! That makes it working! Thank you so much for the help.
> Regards,
> Buvana
>
> -----Original Message-----
> From: Kostas Kloudas [mailto:[hidden email]]
> Sent: Thursday, August 11, 2016 11:22 AM
> To: [hidden email]
> Subject: Re: flink - Working with State example
>
> Hi Buvana,
>
> At a first glance, your snapshotState() should return a Double.
>
> Kostas
>
>> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <[hidden email]> wrote:
>>
>> Thank you Kostas & Ufuk. I get into the following compilation error when I use checkpointed interface. Pasting the code & message as follows:
>>
>> Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?
>>
>> Thanks again,
>> Buvana
>>
>> ======================================================================
>> ==========================================
>> Code:
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>>
>> import java.io.Serializable;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
>> import
>> org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>>
>> import java.util.Properties;
>>
>> /**
>> * Created by buvana on 8/9/16.
>> */
>> public class stateful {
>>   private static String INPUT_KAFKA_TOPIC = null;
>> ---
>> --- skipping the main as it’s the same as before except for class name
>> change -------------
>> ---
>>      public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
>>           implements Checkpointed<Double> {
>>
>>       private Double prev_tuple = null;
>>
>>       @Override
>>       public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
>>           try {
>>               Double value = Double.parseDouble(incString);
>>               System.out.println("value = " + value);
>>               System.out.println(prev_tuple);
>>
>>               Double value2 = value - prev_tuple;
>>               prev_tuple = value;
>>
>>               Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>               tp.setField(INPUT_KAFKA_TOPIC, 0);
>>               tp.setField(value2, 1);
>>               out.collect(tp);
>>           } catch (NumberFormatException e) {
>>               System.out.println("Could not convert to Float" + incString);
>>               System.err.println("Could not convert to Float" + incString);
>>           }
>>       }
>>       @Override
>>       public void open(Configuration config) {
>>           if (prev_tuple == null) {
>>               // only recreate if null
>>               // restoreState will be called before open()
>>               // so this will already set the sum to the restored value
>>               prev_tuple = new Double("0.0");
>>           }
>>       }
>>
>>       @Override
>>       public Serializable snapshotState(
>>               long checkpointId,
>>               long checkpointTimestamp) throws Exception {
>>           return prev_tuple;
>>       }
>>
>>
>>       @Override
>>       public void restoreState(Double state) {
>>           prev_tuple = state;
>>       }
>>   }
>> }
>> ======================================================================
>> =========================================
>> ERROR message while building:
>>
>> $ mvn clean package
>> [INFO] Scanning for projects...
>> [INFO]
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Building Flink Quickstart Job 0.1
>> [INFO] ------------------------------------------------------------------------
>> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
>> [INFO]
>> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
>> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
>> [INFO]
>> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
>> [INFO] Using 'UTF-8' encoding to copy filtered resources.
>> [INFO] Copying 1 resource
>> [INFO]
>> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
>> [INFO] Changes detected - recompiling the module!
>> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
>> [INFO] -------------------------------------------------------------
>> [ERROR] COMPILATION ERROR :
>> [INFO] -------------------------------------------------------------
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> return type java.io.Serializable is not compatible with java.lang.Double
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
>> [INFO] 3 errors
>> [INFO] -------------------------------------------------------------
>> [INFO] ------------------------------------------------------------------------
>> [INFO] BUILD FAILURE
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Total time: 2.171s
>> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
>> [INFO] Final Memory: 26M/660M
>> [INFO] ------------------------------------------------------------------------
>> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
>> [ERROR] -> [Help 1]
>> [ERROR]
>> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>> [ERROR]
>> [ERROR] For more information about the errors and possible solutions, please read the following articles:
>> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
>> ======================================================================
>> ==========================================
>>
>> -----Original Message-----
>> From: Kostas Kloudas [mailto:[hidden email]]
>> Sent: Thursday, August 11, 2016 10:34 AM
>> To: [hidden email]
>> Subject: Re: flink - Working with State example
>>
>> Exactly as Ufuk suggested, if you are not grouping your stream by key, you should use the checkpointed interface.
>>
>> The reason I asked before if you are using the keyBy() is because this is the one that implicitly sets the keySerializer and scopes your (keyed) state to a specific key.
>>
>> If there is no keying, then keyed state cannot be used and the Checkpointed interface should be used instead.
>>
>> Let us know if you need anything else.
>>
>> Kostas
>>
>>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <[hidden email]> wrote:
>>>
>>> This only works for keyed streams, you have to use keyBy().
>>>
>>> You can use the Checkpointed interface instead
>>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>>>
>>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
>>> <[hidden email]> wrote:
>>>> Hi Kostas,
>>>>
>>>>
>>>>
>>>> Here is my code. All I am trying to compute is (x[t] – x[t-1]),
>>>> where x[t] is the current value of the incoming sample and x[t-1] is
>>>> the previous value of the incoming sample. I store the current value
>>>> in state store
>>>> (‘prev_tuple’) so that I can use it for computation in next cycle.
>>>> As you may observe, I am not using keyBy. I am simply printing out
>>>> the resultant tuple.
>>>>
>>>>
>>>>
>>>> It appears from the error message that I have to set the key
>>>> serializer (and possibly value serializer) for the state store. I am
>>>> not sure how to do that…
>>>>
>>>>
>>>>
>>>> Thanks for your interest in helping,
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Buvana
>>>>
>>>>
>>>>
>>>> public class stateful {
>>>>
>>>>  private static String INPUT_KAFKA_TOPIC = null;
>>>>  private static int TIME_WINDOW = 0;
>>>>
>>>>  public static void main(String[] args) throws Exception {
>>>>
>>>>      if (args.length < 2) {
>>>>          throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>>>                  + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>>>      }
>>>>
>>>>      INPUT_KAFKA_TOPIC = args[0];
>>>>      TIME_WINDOW = Integer.parseInt(args[1]);
>>>>
>>>>      Properties properties = null;
>>>>
>>>>      properties = new Properties();
>>>>      properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>      properties.setProperty("zookeeper.connect", "localhost:2181");
>>>>      properties.setProperty("group.id", "test");
>>>>
>>>>      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>      //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>>>
>>>>      DataStreamSource<String> stream = env
>>>>              .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>>>
>>>>      // maps the data into Flink tuples
>>>>      DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>>>
>>>>      // write the result to the console or in a Kafka topic
>>>>      streamTuples.print();
>>>>
>>>>      env.execute("plus one");
>>>>
>>>>  }
>>>>
>>>>  public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>>>
>>>>      private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>>>
>>>>      @Override
>>>>      public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>>>          try {
>>>>              Double value = Double.parseDouble(incString);
>>>>              System.out.println("value = " + value);
>>>>              Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>>>              System.out.println(prev_stored_tp);
>>>>
>>>>              Double value2 = value - prev_stored_tp.f1;
>>>>              prev_stored_tp.f1 = value;
>>>>              prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>>>              prev_tuple.update(prev_stored_tp);
>>>>
>>>>              Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>>>              tp.setField(INPUT_KAFKA_TOPIC, 0);
>>>>              tp.setField(value2, 1);
>>>>              out.collect(tp);
>>>>
>>>>          } catch (NumberFormatException e) {
>>>>              System.out.println("Could not convert to Float" + incString);
>>>>              System.err.println("Could not convert to Float" + incString);
>>>>          }
>>>>      }
>>>>
>>>>      @Override
>>>>      public void open(Configuration config) {
>>>>          ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>>>                  new ValueStateDescriptor<>(
>>>>                          "previous input value", // the state name
>>>>                          TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>>>                          Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>>>          prev_tuple = getRuntimeContext().getState(descriptor);
>>>>      }
>>>>  }
>>>> }
>>>>
>>>>
>>>>
>>>> From: Kostas Kloudas [mailto:[hidden email]]
>>>> Sent: Thursday, August 11, 2016 5:45 AM
>>>> To: [hidden email]
>>>> Subject: Re: flink - Working with State example
>>>>
>>>>
>>>>
>>>> Hello Buvana,
>>>>
>>>>
>>>>
>>>> Can you share a few more details on your operator and how you are using it?
>>>>
>>>> For example, are you using keyBy before using your custom operator?
>>>>
>>>>
>>>>
>>>> Thanks a lot,
>>>>
>>>> Kostas
>>>>
>>>>
>>>>
>>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>>>> <[hidden email]> wrote:
>>>>
>>>>
>>>>
>>>> Hello,
>>>>
>>>>
>>>>
>>>> I am using the code snippet from
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html, in particular the ‘open’ function, in my code:
>>>>
>>>>  @Override
>>>>  public void open(Configuration config) {
>>>>      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>>              new ValueStateDescriptor<>(
>>>>                      "average", // the state name
>>>>                      TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>>>                      Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>>>      sum = getRuntimeContext().getState(descriptor);
>>>>  }
>>>>
>>>>
>>>>
>>>> When I run, I get the following error:
>>>>
>>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>>>     ... 8 more
>>>>
>>>>
>>>>
>>>> Where do I define the key & value serializer for state?
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Buvana
>>>>
>>>>
>>
>
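
A note on the compilation errors quoted above: they appear to come from the return type of snapshotState. With Checkpointed<Double>, the method has to return Double, not Serializable. Below is a minimal, self-contained sketch of that fix. The Checkpointed interface here is a simplified stand-in written for this example (assumed to mirror the shape of Flink 1.1's org.apache.flink.streaming.api.checkpoint.Checkpointed), and DiffMapper is a hypothetical name, not the poster's class.

```java
import java.io.Serializable;

// Stand-in for illustration only; assumed to mirror the shape of Flink 1.1's Checkpointed.
interface Checkpointed<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
    void restoreState(T state) throws Exception;
}

// Hypothetical mapper computing x[t] - x[t-1]; the previous sample is the checkpointed state.
class DiffMapper implements Checkpointed<Double> {
    private Double prevValue = 0.0;

    double map(double value) {
        double diff = value - prevValue;
        prevValue = value;
        return diff;
    }

    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) {
        // The return type must be Double (the T of Checkpointed<Double>), not Serializable.
        return prevValue;
    }

    @Override
    public void restoreState(Double state) {
        prevValue = state;
    }
}

class DiffMapperDemo {
    public static void main(String[] args) {
        DiffMapper mapper = new DiffMapper();
        System.out.println(mapper.map(3.0));
        System.out.println(mapper.map(5.5));
    }
}
```

In a real job the class would extend a Flink function and implement Flink's own interface; only the signature of snapshotState is the point here.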


Re: flink no class found error

rmetzger0
In reply to this post by Janardhan Reddy
Hi,
This page explains how to relocate classes in a fat jar: https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html

Regards,
Robert
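
For reference, a relocation along the lines of the linked page might look like the pom.xml fragment below. This is a sketch under assumptions: the plugin version and the shaded.com.google.common prefix are illustrative choices, not taken from this thread. Relocation rewrites the bytecode of every class bundled into the fat jar, so it should also cover a third-party library that calls Guava internally, as long as both that library and Guava 18.0 are packaged into the jar.

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <!-- version is an assumption; use the one your build already pins -->
  <version>2.4.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- move Guava's packages into a private namespace (prefix is hypothetical) -->
            <pattern>com.google.common</pattern>
            <shadedPattern>shaded.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```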


On Wed, Aug 10, 2016 at 10:31 PM, Janardhan Reddy <[hidden email]> wrote:
We don't use Guava directly; we use another library which uses Guava internally. How do we use the shade plugin in this case?

On Thu, Aug 11, 2016 at 1:37 AM, Janardhan Reddy <[hidden email]> wrote:
I have cross-checked that all our YARN nodes have Java 1.8 installed, but we are still getting the error: Unsupported major.minor version 52.0

On Thu, Aug 11, 2016 at 1:35 AM, Janardhan Reddy <[hidden email]> wrote:
Can you please explain a bit more about the last option? We are using YARN, so Guava might be in some classpath.

On Thu, Aug 11, 2016 at 1:29 AM, Robert Metzger <[hidden email]> wrote:
Can you check if the jar you are submitting to the cluster contains a different Guava than you use at compile time?

Also, it might happen that Guava is already in your classpath, for example on some YARN setups.

The last resort to resolve these issues is to use the maven-shade-plugin and relocate the Guava version you need into your own namespace.
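
One way to check which Guava actually gets picked up at runtime is to ask the JVM where a class was loaded from. The sketch below is self-contained and uses only the JDK; WhichJar and locationOf are hypothetical names for this example. Calling locationOf with com.google.common.io.Resources from inside the job's user code (for instance in an open() method) and logging the result should show whether the Hadoop copy or the application's copy wins.

```java
import java.security.CodeSource;

class WhichJar {
    // Returns the jar or directory a class was loaded from,
    // or a marker string for classes without a code source (e.g. JDK bootstrap classes).
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap or unknown>";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the fully qualified class name is passed as the first argument,
        // e.g. com.google.common.io.Resources; defaults to a JDK class otherwise.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " loaded from " + locationOf(Class.forName(name)));
    }
}
```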

On Wed, Aug 10, 2016 at 9:56 PM, Janardhan Reddy <[hidden email]> wrote:
#1 is thrown from user code.

We use Hadoop 2.7, which uses Guava 11.0.2, but our application uses 18.0. I think Hadoop's Guava is getting picked up instead of ours.

On Thu, Aug 11, 2016 at 1:24 AM, Robert Metzger <[hidden email]> wrote:
Hi Janardhan,

#1 Is the exception thrown from your user code, or from Flink?

#2 You compiled the code with Java 8, but you are trying to run it with an older JVM.
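
Class file version 52.0 corresponds to Java 8, so error 2 means the JVM that loaded the class is older than 8, even if a newer Java is installed on the node. The sketch below reads the major version from a class file header and prints the version of the JVM it runs on; ClassVersion is a hypothetical helper, and the path argument is assumed to point at a .class file extracted from the job jar. On YARN, the JVM used for containers may differ from the java on the PATH, so logging the java.version and java.home system properties from inside the job is the more reliable check.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;

class ClassVersion {
    // Class file header: magic 0xCAFEBABE (4 bytes), minor version (2), major version (2).
    static int majorVersion(byte[] classBytes) {
        ByteBuffer buf = ByteBuffer.wrap(classBytes); // big-endian by default, like class files
        if (classBytes.length < 8 || buf.getInt(0) != 0xCAFEBABE) {
            throw new IllegalArgumentException("not a class file");
        }
        return buf.getShort(6) & 0xFFFF;
    }

    // Major 50 is Java 6, 51 is Java 7, 52 is Java 8 (release = major - 44).
    static int javaRelease(int major) {
        return major - 44;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to a .class file extracted from the job jar (assumed usage)
        int major = majorVersion(Files.readAllBytes(Paths.get(args[0])));
        System.out.println("class compiled for Java " + javaRelease(major));
        System.out.println("running on Java " + System.getProperty("java.version")
                + " at " + System.getProperty("java.home"));
    }
}
```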

On Wed, Aug 10, 2016 at 9:46 PM, Janardhan Reddy <[hidden email]> wrote:
Hi,

We are getting the following errors when submitting Flink jobs to the cluster.

1. Caused by: java.lang.NoSuchMethodError: com.google.common.io.Resources.asCharSource

2. This is for an entirely different job:
Caused by: java.lang.UnsupportedClassVersionError: com/olacabs/fabric/common/Metadata : Unsupported major.minor version 52.0

But when we run Flink locally, neither job shows an error.