Re: flink - Working with State example

Posted by Kostas Kloudas
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/flink-no-class-found-error-tp8432p8465.html

Exactly as Ufuk suggested: if you are not grouping your stream by key,
you should use the Checkpointed interface.

The reason I asked earlier whether you are using keyBy() is that keyBy() is what
implicitly sets the key serializer and scopes your (keyed) state to a specific key.
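
If you do want to keep the keyed-state approach, keying the stream before the
flatMap is enough to set the key serializer. A rough (untested) sketch, keying
everything by the topic name since your job has no natural key:

    // Sketch only: keyBy() before the flatMap sets the key serializer,
    // so the keyed ValueState inside your Rec2Tuple2 becomes usable.
    // Needs: import org.apache.flink.api.java.functions.KeySelector;
    DataStream<Tuple2<String, Double>> streamTuples = stream
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return INPUT_KAFKA_TOPIC; // one artificial key, just to scope the state
                }
            })
            .flatMap(new Rec2Tuple2());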

If there is no keying, then keyed state cannot be used and the Checkpointed interface
should be used instead.
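
For reference, here is a rough (untested) sketch of what the non-keyed version of
your Rec2Tuple2 could look like with the Checkpointed interface
(org.apache.flink.streaming.api.checkpoint.Checkpointed): the previous value simply
lives in a normal field of the function, and Flink snapshots and restores that field
for you.

    // Sketch only: non-keyed variant using Checkpointed instead of keyed ValueState.
    public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String, Double>>
            implements Checkpointed<Double> {

        private Double prev = 0.0; // previous sample, a plain field instead of keyed state

        @Override
        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
            try {
                double value = Double.parseDouble(incString);
                out.collect(new Tuple2<>(INPUT_KAFKA_TOPIC, value - prev)); // x[t] - x[t-1]
                prev = value;
            } catch (NumberFormatException e) {
                System.err.println("Could not convert to Double: " + incString);
            }
        }

        @Override
        public Double snapshotState(long checkpointId, long checkpointTimestamp) {
            return prev; // stored on each checkpoint
        }

        @Override
        public void restoreState(Double state) {
            prev = state; // handed back after a restore
        }
    }

Keep in mind that with parallelism > 1 each parallel instance keeps its own "prev",
so for this consecutive-difference computation you probably want to run the flatMap
with parallelism 1 (or key the stream as sketched above).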

Let us know if you need anything else.

Kostas

> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <[hidden email]> wrote:
>
> This only works for keyed streams; you have to use keyBy().
>
> You can use the Checkpointed interface instead
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>
> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
> <[hidden email]> wrote:
>> Hi Kostas,
>>
>>
>>
>> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t]
>> is the current value of the incoming sample and x[t-1] is the previous value.
>> I store the current value in the state store (‘prev_tuple’) so that I can use
>> it for the computation in the next cycle. As you may observe, I am not using
>> keyBy. I am simply printing out the resulting tuple.
>>
>>
>>
>> It appears from the error message that I have to set the key serializer (and
>> possibly value serializer) for the state store. I am not sure how to do
>> that…
>>
>>
>>
>> Thanks for your interest in helping,
>>
>>
>>
>>
>>
>> Regards,
>>
>> Buvana
>>
>>
>>
>> public class stateful {
>>
>>     private static String INPUT_KAFKA_TOPIC = null;
>>     private static int TIME_WINDOW = 0;
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         if (args.length < 2) {
>>             throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>                     + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>         }
>>
>>         INPUT_KAFKA_TOPIC = args[0];
>>         TIME_WINDOW = Integer.parseInt(args[1]);
>>
>>         Properties properties = null;
>>         properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "test");
>>
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>
>>         DataStreamSource<String> stream = env
>>                 .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>
>>         // maps the data into Flink tuples
>>         DataStream<Tuple2<String, Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>
>>         // write the result to the console or in a Kafka topic
>>         streamTuples.print();
>>
>>         env.execute("plus one");
>>     }
>>
>>     public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String, Double>> {
>>
>>         private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>
>>         @Override
>>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>             try {
>>                 Double value = Double.parseDouble(incString);
>>                 System.out.println("value = " + value);
>>                 Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>                 System.out.println(prev_stored_tp);
>>
>>                 Double value2 = value - prev_stored_tp.f1;
>>                 prev_stored_tp.f1 = value;
>>                 prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>                 prev_tuple.update(prev_stored_tp);
>>
>>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>>                 tp.setField(value2, 1);
>>                 out.collect(tp);
>>
>>             } catch (NumberFormatException e) {
>>                 System.out.println("Could not convert to Float" + incString);
>>                 System.err.println("Could not convert to Float" + incString);
>>             }
>>         }
>>
>>         @Override
>>         public void open(Configuration config) {
>>             ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>                     new ValueStateDescriptor<>(
>>                             "previous input value", // the state name
>>                             TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>                             Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>             prev_tuple = getRuntimeContext().getState(descriptor);
>>         }
>>     }
>> }
>>
>>
>>
>> From: Kostas Kloudas [mailto:[hidden email]]
>> Sent: Thursday, August 11, 2016 5:45 AM
>> To: [hidden email]
>> Subject: Re: flink - Working with State example
>>
>>
>>
>> Hello Buvana,
>>
>>
>>
>> Can you share a bit more details on your operator and how you are using it?
>>
>> For example, are you using keyBy before using you custom operator?
>>
>>
>>
>> Thanks a lot,
>>
>> Kostas
>>
>>
>>
>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>> <[hidden email]> wrote:
>>
>>
>>
>> Hello,
>>
>>
>>
>> I am using the code snippet from
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
>> in my code, in particular the ‘open’ function:
>>
>>    @Override
>>    public void open(Configuration config) {
>>        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>                new ValueStateDescriptor<>(
>>                        "average", // the state name
>>                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>        sum = getRuntimeContext().getState(descriptor);
>>    }
>>
>>
>>
>> When I run, I get the following error:
>>
>> Caused by: java.lang.RuntimeException: Error while getting state
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>     ... 8 more
>>
>>
>>
>> Where do I define the key & value serializer for state?
>>
>>
>>
>> Thanks,
>>
>> Buvana
>>
>>