http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/flink-no-class-found-error-tp8432p8474.html
> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <[hidden email]> wrote:
>
> Thank you Kostas & Ufuk. I run into the following compilation error when I use the Checkpointed interface. The code and the error message are pasted below:
>
> Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?
>
> Thanks again,
> Buvana
>
> ======================================================================
> ==========================================
> Code:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>
> import java.io.Serializable;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import
> org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
>
> import java.util.Properties;
>
> /**
> * Created by buvana on 8/9/16.
> */
> public class stateful {
> private static String INPUT_KAFKA_TOPIC = null;
> ---
> --- skipping the main as it’s the same as before except for class name
> change -------------
> ---
> public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
> implements Checkpointed<Double> {
>
> private Double prev_tuple = null;
>
> @Override
> public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
> try {
> Double value = Double.parseDouble(incString);
> System.out.println("value = " + value);
> System.out.println(prev_tuple);
>
> Double value2 = value - prev_tuple;
> prev_tuple = value;
>
> Tuple2<String, Double> tp = new Tuple2<String, Double>();
> tp.setField(INPUT_KAFKA_TOPIC, 0);
> tp.setField(value2, 1);
> out.collect(tp);
> } catch (NumberFormatException e) {
> System.out.println("Could not convert to Float" + incString);
> System.err.println("Could not convert to Float" + incString);
> }
> }
> @Override
> public void open(Configuration config) {
> if (prev_tuple == null) {
> // only recreate if null
> // restoreState will be called before open()
> // so this will already set the sum to the restored value
> prev_tuple = new Double("0.0");
> }
> }
>
> @Override
> public Serializable snapshotState(
> long checkpointId,
> long checkpointTimestamp) throws Exception {
> return prev_tuple;
> }
>
>
> @Override
> public void restoreState(Double state) {
> prev_tuple = state;
> }
> }
> }
> ======================================================================
> =========================================
> ERROR message while building:
>
> $ mvn clean package
> [INFO] Scanning for projects...
> [INFO]
> [INFO]
> ----------------------------------------------------------------------
> -- [INFO] Building Flink Quickstart Job 0.1 [INFO]
> ----------------------------------------------------------------------
> -- [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has
> been relocated to commons-io:commons-io:jar:1.3.2 [INFO] [INFO] ---
> maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits --- [INFO]
> Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
> [INFO]
> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @
> wiki-edits --- [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] Copying 1 resource
> [INFO]
> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @
> wiki-edits --- [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 7 source files to
> /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
> [INFO] -------------------------------------------------------------
> [ERROR] COMPILATION ERROR :
> [INFO] -------------------------------------------------------------
> [ERROR]
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR]
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR]
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [INFO] 3 errors
> [INFO]
> -------------------------------------------------------------
> [INFO]
> ----------------------------------------------------------------------
> --
> [INFO] BUILD FAILURE
> [INFO]
> ----------------------------------------------------------------------
> --
> [INFO] Total time: 2.171s
> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016 [INFO] Final Memory:
> 26M/660M [INFO]
> ----------------------------------------------------------------------
> -- [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
> [ERROR]
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stat
> eful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and
> does not override abstract method snapshotState(long,long) in
> org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR]
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stat
> eful.java:[151,29] snapshotState(long,long) in
> wikiedits.stateful.MapStateful cannot implement
> snapshotState(long,long) in
> org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] return type java.io.Serializable is not compatible with
> java.lang.Double [ERROR]
> /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stat
> eful.java:[150,9] method does not override or implement a method from
> a supertype [ERROR] -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions, please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> ======================================================================
> ==========================================
>
> -----Original Message-----
> From: Kostas Kloudas [mailto:
[hidden email]]
> Sent: Thursday, August 11, 2016 10:34 AM
> To:
[hidden email]
> Subject: Re: flink - Working with State example
>
> Exactly as Ufuk suggested, if you are not grouping your stream by key, you should use the checkpointed interface.
>
> The reason I asked before if you are using the keyBy() is because this is the one that implicitly sets the keySerializer and scopes your (keyed) state to a specific key.
>
> If there is no keying, then keyed state cannot be used and the Checkpointed interface should be used instead.
>
> Let us know if you need anything else.
>
> Kostas
>
>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <
[hidden email]> wrote:
>>
>> This only works for keyed streams, you have to use keyBy().
>>
>> You can use the Checkpointed interface instead
>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>>
>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
>> <
[hidden email]> wrote:
>>> Hi Kostas,
>>>
>>>
>>>
>>> Here is my code. All I am trying to compute is (x[t] – x[t-1]),
>>> where x[t] is the current value of the incoming sample and x[t-1] is
>>> the previous value of the incoming sample. I store the current value
>>> in state store
>>> (‘prev_tuple’) so that I can use it for computation in next cycle.
>>> As you may observe, I am not using keyBy. I am simply printing out
>>> the resultant tuple.
>>>
>>>
>>>
>>> It appears from the error message that I have to set the key
>>> serializer (and possibly value serializer) for the state store. I am
>>> not sure how to do that…
>>>
>>>
>>>
>>> Thanks for your interest in helping,
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>>
>>> Buvana
>>>
>>>
>>>
>>> public class stateful {
>>>
>>> private static String INPUT_KAFKA_TOPIC = null;
>>>
>>> private static int TIME_WINDOW = 0;
>>>
>>>
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>>
>>>
>>> if (args.length < 2) {
>>>
>>> throw new IllegalArgumentException("The application needs
>>> two arguments. The first is the name of the kafka topic from which it has to \n"
>>>
>>> + "fetch the data. The second argument is the size
>>> of the window, in seconds, to which the aggregation function must be applied.
>>> \n");
>>>
>>> }
>>>
>>>
>>>
>>> INPUT_KAFKA_TOPIC = args[0];
>>>
>>> TIME_WINDOW = Integer.parseInt(args[1]);
>>>
>>>
>>>
>>> Properties properties = null;
>>>
>>>
>>>
>>> properties = new Properties();
>>>
>>> properties.setProperty("bootstrap.servers", "localhost:9092");
>>>
>>> properties.setProperty("zookeeper.connect", "localhost:2181");
>>>
>>> properties.setProperty("group.id", "test");
>>>
>>>
>>>
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> //env.setStateBackend(new
>>> FsStateBackend("file://home/buvana/flink/checkpoints"));
>>>
>>>
>>>
>>> DataStreamSource<String> stream = env
>>>
>>> .addSource(new
>>> FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(),
>>> properties));
>>>
>>>
>>>
>>> // maps the data into Flink tuples
>>>
>>> DataStream<Tuple2<String,Double>> streamTuples =
>>> stream.flatMap(new Rec2Tuple2());
>>>
>>>
>>>
>>> // write the result to the console or in a Kafka topic
>>>
>>> streamTuples.print();
>>>
>>>
>>>
>>> env.execute("plus one");
>>>
>>>
>>>
>>> }
>>>
>>>
>>>
>>> public static class Rec2Tuple2 extends RichFlatMapFunction<String,
>>> Tuple2<String,Double> > {
>>>
>>> private transient ValueState<Tuple2<String, Double>>
>>> prev_tuple;
>>>
>>>
>>>
>>> @Override
>>>
>>> public void flatMap(String incString, Collector<Tuple2<String,
>>> Double>> out) throws Exception {
>>>
>>> try {
>>>
>>> Double value = Double.parseDouble(incString);
>>>
>>> System.out.println("value = " + value);
>>>
>>> Tuple2<String, Double> prev_stored_tp =
>>> prev_tuple.value();
>>>
>>> System.out.println(prev_stored_tp);
>>>
>>>
>>>
>>> Double value2 = value - prev_stored_tp.f1;
>>>
>>> prev_stored_tp.f1 = value;
>>>
>>> prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>>
>>> prev_tuple.update(prev_stored_tp);
>>>
>>>
>>>
>>> Tuple2<String, Double> tp = new Tuple2<String,
>>> Double>();
>>>
>>> tp.setField(INPUT_KAFKA_TOPIC, 0);
>>>
>>> tp.setField(value2, 1);
>>>
>>> out.collect(tp);
>>>
>>>
>>>
>>> } catch (NumberFormatException e) {
>>>
>>> System.out.println("Could not convert to Float" +
>>> incString);
>>>
>>> System.err.println("Could not convert to Float" +
>>> incString);
>>>
>>> }
>>>
>>> }
>>>
>>>
>>>
>>> @Override
>>>
>>> public void open(Configuration config) {
>>>
>>> ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>>
>>> new ValueStateDescriptor<>(
>>>
>>> "previous input value", // the state name
>>>
>>> TypeInformation.of(new
>>> TypeHint<Tuple2<String,
>>> Double>>() {}), // type information
>>>
>>> Tuple2.of("test topic", 0.0)); // default
>>> value of the state, if nothing was set
>>>
>>> prev_tuple = getRuntimeContext().getState(descriptor);
>>>
>>> }
>>>
>>> }
>>>
>>> }
>>>
>>>
>>>
>>> From: Kostas Kloudas [mailto:
[hidden email]]
>>> Sent: Thursday, August 11, 2016 5:45 AM
>>> To:
[hidden email]
>>> Subject: Re: flink - Working with State example
>>>
>>>
>>>
>>> Hello Buvana,
>>>
>>>
>>>
>>> Can you share a few more details on your operator and how you are using it?
>>>
>>> For example, are you using keyBy before using your custom operator?
>>>
>>>
>>>
>>> Thanks a lot,
>>>
>>> Kostas
>>>
>>>
>>>
>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>>> <
[hidden email]> wrote:
>>>
>>>
>>>
>>> Hello,
>>>
>>>
>>>
>>> I am utilizing the code snippet in:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html and particularly the ‘open’ function in my code:
>>>
>>> @Override
>>>
>>> public void open(Configuration config) {
>>>
>>> ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>
>>> new ValueStateDescriptor<>(
>>>
>>> "average", // the state name
>>>
>>> TypeInformation.of(new TypeHint<Tuple2<Long,
>>> Long>>() {}), // type information
>>>
>>> Tuple2.of(0L, 0L)); // default value of the
>>> state, if nothing was set
>>>
>>> sum = getRuntimeContext().getState(descriptor);
>>>
>>> }
>>>
>>>
>>>
>>> When I run, I get the following error:
>>>
>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>
>>> at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>>
>>> at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>>
>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>
>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>
>>> at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>>
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>>
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>>
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Caused by: java.lang.Exception: State key serializer has not been
>>> configured in the config. This operation cannot use partitioned state.
>>>
>>> at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>>
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>>
>>> at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>>
>>> ... 8 more
>>>
>>>
>>>
>>> Where do I define the key & value serializer for state?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Buvana
>>>
>>>
>