How to maintain variable for each map operator

ZalaCheung
Hi all,

I am stuck on a problem. I have a stream that I want to key with keyBy and then apply a map function to. For each map operator, I want to maintain its own variable. Is that possible? A naive version works when I run it locally in IntelliJ, but I get a NullPointerException when I run it on a cluster.

Here is the pseudocode for my naive version; I hope it helps you understand my question.

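import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class anomalydetection {
    private static List<TimeSeries> queue;

    public static void main(String[] args) throws Exception {
        initialize();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<TimeSeries> input = ...
        input.keyBy("some key").map(new MapFunction<TimeSeries, TimeSeries>() {
            @Override
            public TimeSeries map(TimeSeries value) {
                if (queue.size() < some_num) { // NullPointerException here on the cluster
                    queue.add(value);
                    // do something
                } else {
                    // do something
                }
                return value;
            }
        });
        env.execute();
    }

    private static void initialize() {
        queue = new ArrayList<>();
    }
}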

When I try to get the size of the ArrayList, I get a NullPointerException. Besides that, I want to maintain a list for each map operator after I group the stream by some key.


Is it possible to do this in Flink?

Desheng Zhang

E-mail: [hidden email];

Re: How to maintain variable for each map operator

Kurt Young

Best,
Kurt

On Thu, Jul 13, 2017 at 1:14 PM, ZalaCheung <[hidden email]> wrote:

Re: How to maintain variable for each map operator

ZalaCheung
Hi Kurt,

Thanks! Your link helped me a lot.

I still have some problems after glancing at the document. As you can see from my first email, I tried to implement a MapFunction class in Flink. I actually have three ArrayLists to maintain in this map operator. I think the "Using Managed Keyed State" part of the document fits my requirement, but I am still confused about how to maintain my ListStates.

1. Does each ArrayList get its own ListState?
2. I don't understand the open() function in the example given in the Flink documentation. How can I initialize my ArrayLists with a ListStateDescriptor?

Desheng Zhang
E-mail: [hidden email];

On Jul 13, 2017, at 13:23, Kurt Young <[hidden email]> wrote:


Re: How to maintain variable for each map operator

Kurt Young
Hi,

Regarding 1: State is a value bound to the current key of the KeyedStream. ListState is list-like state: you can use it like a List, adding values to it and iterating over its contents. If you have multiple ArrayLists to maintain, you can declare multiple states, each with a different name.
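This is a toy plain-Java model of the two points above: state is bound to the current key, and several states can coexist under different names. It is not Flink's API or implementation. The class KeyedListStateModel and its methods are hypothetical. They only mimic how each (state name, key) pair sees its own independent list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of keyed ListState (hypothetical; NOT Flink's API):
// every (state name, key) pair gets its own independent list.
public class KeyedListStateModel<K, T> {
    // state name -> (key -> list of values)
    private final Map<String, Map<K, List<T>>> states = new HashMap<>();
    private K currentKey;

    // In Flink, the runtime sets the current key before each record is processed.
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Rough analogue of ListState.add(value) for the state registered as `name`.
    public void add(String name, T value) {
        states.computeIfAbsent(name, n -> new HashMap<>())
              .computeIfAbsent(currentKey, k -> new ArrayList<>())
              .add(value);
    }

    // Rough analogue of ListState.get(): only the values stored for the current key.
    public Iterable<T> get(String name) {
        return states.getOrDefault(name, new HashMap<>())
                     .getOrDefault(currentKey, new ArrayList<>());
    }

    public static void main(String[] args) {
        KeyedListStateModel<String, Integer> state = new KeyedListStateModel<>();
        state.setCurrentKey("sensor-a");
        state.add("history", 1);
        state.add("history", 2);
        state.add("anomalies", 99);   // a second, separately named state
        state.setCurrentKey("sensor-b");
        state.add("history", 7);

        System.out.println(state.get("history"));   // [7]
        state.setCurrentKey("sensor-a");
        System.out.println(state.get("history"));   // [1, 2]
        System.out.println(state.get("anomalies")); // [99]
    }
}
```

The same state name ("history") holds different lists for "sensor-a" and "sensor-b". This is why, after keyBy, each key effectively gets its own list without any manual bookkeeping.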

Regarding 2: You should change your MapFunction to a RichMapFunction to be able to declare and use state.
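This is a minimal sketch of the RichMapFunction approach, assuming the Flink 1.x DataStream API. In recent Flink releases, open(Configuration) is deprecated in favour of open(OpenContext). The class name AnomalyDetector, the Double element type, the state names "queue" and "overflow", and SOME_NUM are all hypothetical placeholders.

The sketch also covers question 2. Each ListStateDescriptor is created and its state handle obtained in open(). Flink calls open() on every parallel instance running in the cluster before any record arrives. A static field assigned in main() is only set in the client JVM, which is why it can still be null on the TaskManagers.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Sketch for the Flink 1.x DataStream API; apply it after keyBy(...), since
// keyed state is scoped to the key of the record being processed.
// Element type, state names and SOME_NUM are hypothetical placeholders.
public class AnomalyDetector extends RichMapFunction<Double, String> {
    private static final int SOME_NUM = 10; // hypothetical threshold

    // One ListState per list you need, each registered under a distinct name.
    private transient ListState<Double> queue;
    private transient ListState<Double> overflow;

    @Override
    public void open(Configuration parameters) {
        // Runs on each parallel instance before the first record, so the
        // handles are initialized on the cluster, not just on the client.
        queue = getRuntimeContext().getListState(
                new ListStateDescriptor<>("queue", Double.class));
        overflow = getRuntimeContext().getListState(
                new ListStateDescriptor<>("overflow", Double.class));
    }

    @Override
    public String map(Double value) throws Exception {
        // Copy the current key's list; get() may return null for empty state
        // on some Flink versions, so guard against that.
        List<Double> current = new ArrayList<>();
        Iterable<Double> stored = queue.get();
        if (stored != null) {
            for (Double v : stored) {
                current.add(v);
            }
        }

        if (current.size() < SOME_NUM) {
            queue.add(value);      // still filling this key's queue
            return "buffered";
        } else {
            overflow.add(value);   // placeholder for "do something"
            return "queue full (" + current.size() + " values)";
        }
    }
}
```

It would be applied as keyedStream.map(new AnomalyDetector()), where keyedStream is the result of keyBy on a stream of Double values. The transient fields hold only state handles, not data. The data itself lives in Flink's state backend, per key.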

Best,
Kurt

On Thu, Jul 13, 2017 at 2:22 PM, ZalaCheung <[hidden email]> wrote:

Re: How to maintain variable for each map operator

ZalaCheung
Hi Kurt,

Thanks for your reply! I've already solved the problem!

Desheng Zhang

E-mail: [hidden email];

On Jul 13, 2017, at 17:10, Kurt Young <[hidden email]> wrote:
