Flink streaming questions

Flink streaming questions

Henri Heiskanen
Hi,

I have a few questions related to Flink streaming. I am on 1.2-SNAPSHOT, and what I would like to accomplish is to have a stream that reads data from multiple Kafka topics, identifies user sessions, uses an external user profile to enrich the data, evaluates a script to produce session aggregates and then creates updated profiles from the session aggregates. I am working with high-volume data and user sessions may be long, so using the generic window apply might not work. Below is a simplification of the stream.

stream = createKafkaStreams(...);
env.setParallelism(4);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
stream
                .keyBy(2)
                .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
                .fold(new SessionData(), new SessionFold(), new ProfilerApply())
                .print();

The questions:

1. Initially, when I used event time windowing, I could not get any of my windows to close. The reason seemed to be that I had 6 partitions in my test Kafka setup and only 4 of them generated traffic. If I used parallelism above 4, then no windows were closed. Is this by design or a defect? We use flink-connector-kafka-0.10 because earlier versions did not commit the offsets correctly.

2. Rich fold functions are not supported. However, I would like to execute a piece of custom script in the fold function that requires an initialisation step. I would have used the open and close lifecycle methods of rich functions, but they are not available for fold. What would be the preferred way to run some initialisation routines (and close them gracefully) when using fold?

3. Kind of related to the above. I would also like to fetch a user profile from an external source at the beginning of the session. What would be a best practice for that kind of operation? If I were using the generic window apply, I could fetch it at the beginning of the apply method. I was thinking of introducing a mapper that fetches this profile periodically and caches it in Flink state. However, with this setup I would not be able to tie it to the user sessions identified by the windows.

4. I may also have an additional requirement of writing out each event enriched with the current session and profile data. I could basically do this again with the generic window function and write out each event with the collector while iterating, but would there be a better pattern to use? Maybe sharing state between functions, or something similar.

Br,
Henri H

Re: Flink streaming questions

Jamie Grier
Hi Henri,

#1 - This is by design. Event time advances with the slowest input source. If there are input sources that generate no data, this is indistinguishable from a slow source. Kafka topics where some partitions receive no data are a problem in this regard -- but there isn't a simple solution. If possible I would address it at the source.
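
For what it's worth, one workaround that is sometimes used (not a general fix -- it trades some event-time correctness for progress) is a periodic watermark assigner that falls back to processing time once a source has been idle for a while. A rough, untested Java sketch; the event type and both thresholds are placeholders:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// MyEvent and the two thresholds are placeholders. The assigner could be set on the
// Kafka consumer via assignTimestampsAndWatermarks(...) or on the stream itself.
public class IdleTolerantWatermarks implements AssignerWithPeriodicWatermarks<MyEvent> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 10_000L; // illustrative
    private static final long MAX_IDLE_MS = 60_000L;             // illustrative

    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long lastActivity = System.currentTimeMillis();

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        lastActivity = System.currentTimeMillis();
        maxSeenTimestamp = Math.max(maxSeenTimestamp, element.getTimestamp());
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastActivity > MAX_IDLE_MS) {
            // Nothing received for a while: let the watermark follow processing time
            // so an idle source does not hold back downstream windows forever.
            return new Watermark(now - MAX_OUT_OF_ORDERNESS_MS);
        }
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return null; // no data seen yet, emit no watermark
        }
        return new Watermark(maxSeenTimestamp - MAX_OUT_OF_ORDERNESS_MS);
    }
}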

#2 - If it's possible to run these init functions just once when you submit the job, you can run them in the constructor of your FoldFunction. This init will then happen exactly once (on the client) and the constructed FoldFunction is then serialized and distributed around the cluster. If this doesn't work because you need something truly dynamic, you could also accomplish this with a simple local variable in your function, as in the sketch below.

class MyFoldFunction[O, T] extends FoldFunction[O, T] {
  // Lazily run the one-time setup when this instance sees its first element.
  private var initialized = false

  override def fold(accumulator: T, value: O): T = {
    if (!initialized) {
      doInitStuff()
      initialized = true
    }
    doNormalStuff(accumulator, value) // returns the updated accumulator
  }
}
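
For the first option, a Java sketch could look like the following (untested; CompiledScript and the event/accumulator types are placeholders, and whatever the constructor builds has to be serializable so it can be shipped with the function):

import org.apache.flink.api.common.functions.FoldFunction;

// MyEvent, SessionData and CompiledScript are placeholders.
public class ScriptedFold implements FoldFunction<MyEvent, SessionData> {

    private final CompiledScript script;

    public ScriptedFold(String scriptSource) {
        // Runs exactly once, on the client, before the function is serialized
        // and distributed with the job graph.
        this.script = CompiledScript.compile(scriptSource);
    }

    @Override
    public SessionData fold(SessionData accumulator, MyEvent value) throws Exception {
        return script.aggregate(accumulator, value);
    }
}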

#3 - One way to do this is, as you've said, to attach the profile information to the event, using a mapper, before it enters the window operations.
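
For example, roughly like this (an untested Java sketch; ProfileClient, UserProfile and the event types are placeholders) -- open() and close() take care of the connection lifecycle:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// ProfileClient, UserProfile, MyEvent and EnrichedEvent are placeholders for the
// external profile store and the event model.
public class ProfileEnricher extends RichMapFunction<MyEvent, EnrichedEvent> {

    private transient ProfileClient client;
    private transient Map<String, UserProfile> cache;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = ProfileClient.connect();   // heavy init, once per parallel task
        cache = new HashMap<>();
    }

    @Override
    public EnrichedEvent map(MyEvent event) throws Exception {
        UserProfile profile = cache.computeIfAbsent(event.getUserId(), client::fetchProfile);
        return new EnrichedEvent(event, profile);
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();                 // graceful shutdown of the connection
        }
    }
}

The enriched stream would then be keyed and windowed as before, e.g. stream.map(new ProfileEnricher()).keyBy(...).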


--

Jamie Grier
data Artisans, Director of Applications Engineering


Re: Flink streaming questions

Henri Heiskanen
Hi,

Actually, it seems that "Fold cannot be used with a merging WindowAssigner", and the workaround I found was to use the generic window function. It seems that I would need to use the window apply anyway. The functionality is then all there, but I am really concerned about resource utilisation. We have quite many concurrent users, they generate a lot of events, and sessions may be long.

The workaround you gave for initialisation was exactly what I was doing already, and yes, it is so dynamic that I cannot use the constructor. However, I would also need to close the resources I open gracefully, and as the initialisation is quite heavy it felt odd to put it in the fold function to be done on the first event processed.

Br,
Henri H


Re: Flink streaming questions

Fabian Hueske
Hi Henri,

Can you express the logic of your FoldFunction (or WindowFunction) as a combination of ReduceFunction and WindowFunction [1]?
A ReduceFunction is supported with a merging WindowAssigner and has the same resource consumption as a FoldFunction, i.e., a single record per window.
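
Roughly like this, as an untested sketch from memory (it assumes the raw events are first mapped into a DataStream<SessionData>, since ReduceFunction requires the same input and output type; the field and helper names are placeholders):

// sessionData is assumed to be a DataStream<SessionData> produced by mapping the raw
// events; SessionData.merge(...), toProfileUpdate(...) and ProfileUpdate are placeholders.
DataStream<ProfileUpdate> updates = sessionData
        .keyBy("userId")
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .apply(
                new ReduceFunction<SessionData>() {
                    @Override
                    public SessionData reduce(SessionData a, SessionData b) {
                        return a.merge(b);   // incremental: one record kept per window
                    }
                },
                new WindowFunction<SessionData, ProfileUpdate, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow window,
                                      Iterable<SessionData> reduced,
                                      Collector<ProfileUpdate> out) {
                        // exactly one element here: the fully reduced session state
                        out.collect(reduced.iterator().next().toProfileUpdate(window));
                    }
                });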

Best, Fabian


Re: Flink streaming questions

Henri Heiskanen
Hi,

Unfortunately I cannot use a ReduceFunction.

I am now going with a WindowFunction and will see how it performs under our production load.
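
Roughly what I have in mind, as an untested sketch (the script wrapper, event and session types are placeholders) -- the generic window apply accepts a rich function, so open()/close() are available for the heavy initialisation and its graceful shutdown:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// MyEvent, SessionData and ScriptEngineWrapper are placeholders.
public class SessionApply extends RichWindowFunction<MyEvent, SessionData, Tuple, TimeWindow> {

    private transient ScriptEngineWrapper script;

    @Override
    public void open(Configuration parameters) throws Exception {
        script = ScriptEngineWrapper.init();      // heavy init, once per parallel task
    }

    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<MyEvent> events,
                      Collector<SessionData> out) throws Exception {
        SessionData session = new SessionData();
        for (MyEvent event : events) {
            session = script.aggregate(session, event);   // fold-style aggregation
        }
        out.collect(session);
    }

    @Override
    public void close() throws Exception {
        if (script != null) {
            script.shutdown();                    // release the script resources gracefully
        }
    }
}

It would replace the fold, i.e. .window(EventTimeSessionWindows.withGap(Time.minutes(10))).apply(new SessionApply()).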

Br,
Henkka
