Queryable State

Darshan Singh
Hi All,

I have been experimenting with queryable state. Since a queryable state stream cannot be modified, how do I use the output of, say, my reduce function for further processing?

Below is an example. I am sure I have done it wrong :). I am applying the reduce function twice; or do I need to use a rich reduce function and use the queryable state there?

Thanks

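import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class reducetest {

    public static void main(String[] args) throws Exception {

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        ReducingStateDescriptor<Tuple2<Integer, Integer>> descriptor =
                new ReducingStateDescriptor<Tuple2<Integer, Integer>>(
                        "sum", // the state name
                        new rf(), // the reduce function applied to the state
                        TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})); // type information
        descriptor.setQueryable("reduce");

        // key by parity (field 0); keeping the KeyedStream type avoids raw casts below
        KeyedStream<Tuple2<Integer, Integer>, Tuple> ds1 = env.fromCollection(nums)
                .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Integer integer) throws Exception {
                        return Tuple2.of(integer % 2, integer);
                    }
                })
                .keyBy(0);

        // first use of the reduce function: expose the reduced state for queries
        ds1.asQueryableState("reduce", descriptor).getStateDescriptor();

        // second use of the reduce function: the stream for further processing
        DataStream<Tuple2<Integer, Integer>> ds2 = ds1.reduce(new rf());

        //ds1.print();
        ds2.print();

        System.out.println(env.getExecutionPlan());

        env.execute("re");
    }

    // sums field 1 and keeps the key in field 0
    static class rf implements ReduceFunction<Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> e, Tuple2<Integer, Integer> n) throws Exception {
            return Tuple2.of(e.f0, e.f1 + n.f1);
        }
    }

}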
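
For reference, here is a minimal plain-Java sketch (no Flink dependencies) of what the keyed reduce above computes: a running sum per key, with one result per input element. The class and method names (RollingReduceSketch, rollingSums) are hypothetical and exist only for this illustration; the keying by n % 2 and the addition mirror the map function and rf from the example. It is a model of the arithmetic, not of how Flink executes the job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RollingReduceSketch {

    // Simulates a rolling reduce per key: every input yields one {key, runningSum} pair.
    static List<int[]> rollingSums(List<Integer> nums) {
        Map<Integer, Integer> sumPerKey = new HashMap<>(); // one running sum per key
        List<int[]> emitted = new ArrayList<>();
        for (int n : nums) {
            int key = n % 2;                                 // same key as the map function
            int sum = sumPerKey.merge(key, n, Integer::sum); // same addition as rf
            emitted.add(new int[] {key, sum});
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        for (int[] t : rollingSums(nums)) {
            System.out.println("(" + t[0] + "," + t[1] + ")");
        }
    }
}
```

For the input 1 to 10, the last pair for key 0 is (0,30) and the last pair for key 1 is (1,25). With parallelism 1, ds2.print() in the job should show the same running sums, although that has not been verified against a cluster here.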

Queryable State

Darshan Singh
Hi All,

I have been experimenting with queryable state. Since a queryable state stream cannot be modified, how do I use the output of, say, my reduce function for further processing?

Below is an example. I can see that I am processing the data twice: once for the queryable stream and once for my original stream. Does that mean the state is kept twice as well?

In simple terms, I would like to query the state of the rf reduce function, and I would also like my stream to be written to Kafka. With the code below I am able to do so, but it seems to me that the state is stored twice and the work is done twice.

Is there an alternative way to achieve this?

Thanks

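import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class reducetest {

    public static void main(String[] args) throws Exception {

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        ReducingStateDescriptor<Tuple2<Integer, Integer>> descriptor =
                new ReducingStateDescriptor<Tuple2<Integer, Integer>>(
                        "sum", // the state name
                        new rf(), // the reduce function applied to the state
                        TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})); // type information
        descriptor.setQueryable("reduce");

        // key by parity (field 0); keeping the KeyedStream type avoids raw casts below
        KeyedStream<Tuple2<Integer, Integer>, Tuple> ds1 = env.fromCollection(nums)
                .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Integer integer) throws Exception {
                        return Tuple2.of(integer % 2, integer);
                    }
                })
                .keyBy(0);

        // first pass over the data: the queryable state sink
        ds1.asQueryableState("reduce", descriptor);

        // second pass over the data: the same reduction, for further processing
        DataStream<Tuple2<Integer, Integer>> ds2 = ds1.reduce(
                descriptor.getReduceFunction());

        //ds1.print();
        ds2.print();

        System.out.println(env.getExecutionPlan());

        env.execute("re");
    }

    // sums field 1 and keeps the key in field 0
    static class rf implements ReduceFunction<Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> e, Tuple2<Integer, Integer> n) throws Exception {
            return Tuple2.of(e.f0, e.f1 + n.f1);
        }
    }

}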
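
The alternative raised in the question, a single stateful function whose state is both queried and used to produce the output, can be modeled in plain Java as follows. This is only a model of the idea, not Flink code: SingleStateModel, process, and query are hypothetical names, and the map stands in for keyed state. In Flink terms it would presumably correspond to a rich function on the keyed stream that obtains its state from a descriptor marked with setQueryable, as the descriptor above already is; whether that fits this job is an assumption and is not tested here.

```java
import java.util.HashMap;
import java.util.Map;

class SingleStateModel {

    // The only copy of the per-key running sum.
    private final Map<Integer, Integer> sumPerKey = new HashMap<>();

    // Processing path: update the state and return the value to forward downstream.
    int process(int key, int value) {
        return sumPerKey.merge(key, value, Integer::sum);
    }

    // Query path: read the same state without modifying it (null if the key is unknown).
    Integer query(int key) {
        return sumPerKey.get(key);
    }

    public static void main(String[] args) {
        SingleStateModel op = new SingleStateModel();
        for (int n = 1; n <= 10; n++) {
            int forwarded = op.process(n % 2, n);
            System.out.println("forward (" + (n % 2) + "," + forwarded + ")");
        }
        System.out.println("query key 0 -> " + op.query(0));
        System.out.println("query key 1 -> " + op.query(1));
    }
}
```

The point of the model is that the forwarded values and the query results come from the same map, so the sum is computed and stored once rather than once per consumer.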

Re: Queryable State

Sameer Wadkar
I have used connected streams where one part of the connected stream maintains state and the other part consumes it. 

However, it was not queryable externally. For state that is queryable externally, you are right: you probably need another operator to store the state and support queryability.


On Sep 5, 2018, at 7:26 AM, Darshan Singh <[hidden email]> wrote:
