Handling "Global" Updating State

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Handling "Global" Updating State

Rion Williams
Hi all,

I've encountered a challenge within a Flink job that I'm currently working on. The gist of it is that I have a job that listens to a series of events from a Kafka topic and eventually sinks those down into Postgres via the JDBCSink.

A requirement recently came up for the need to filter these events based on some configurations that are currently being stored within another Kafka topic. I'm wondering what the best approach might be to handle this type of problem.

My initial naive approach was:
  • When Flink starts up, use a regular Kafka Consumer and read all of the configuration data from that topic in its entirety.
  • Store the messages from that topic in some type of thread-safe collection statically accessible by the operators downstream.
  • Expose the thread-safe collection within the operators to actually perform the filtering.
This doesn't seem right though. I was reading about BroadcastState which seems like it might fit the bill (e.g. keep those mappings in Broadcast state so that all of the downstream operations would have access to them, which I'd imagine would handle keeping things up to date). 

Does Flink have a good pattern / construct to handle this? Basically, I have a series of mappings that I want to keep relatively up to date in a Kafka topic, and I'm consuming from another Kafka topic that will need those mappings to filter against.

I'd be happy to share some of the approaches I currently have or elaborate a bit more if that isn't super clear.

Thanks much,

Rion

Reply | Threaded
Open this post in threaded view
|

Re: Handling "Global" Updating State

Rion Williams
Hey folks,

After digging into this a bit it does seem like Broadcast State would fit the bill for this scenario and keeping the downstream operators up-to-date as messages arrived in my Kafka topic.

My question is - is there a pattern for pre-populating the state initially? In my case, I need to have loaded all of my “lookup” topic into state before processing any records in the other stream.

My thought initially is to do something like this, if it’s possible:

- Create a KafkaConsumer on startup to read the lookup topic in its entirety into some collection like a hashmap (prior to executing the Flink pipeline to ensure synchronicity)
- Use this to initialize the state of my broadcast stream (if possible)
- At this point that stream would be broadcasting any new records coming in, so I “should” stay up to date at that point.

Is this an oversimplification or is there an obviously better / well known approach to handling this?

Thanks,

Rion

On May 14, 2021, at 9:51 AM, Rion Williams <[hidden email]> wrote:


Hi all,

I've encountered a challenge within a Flink job that I'm currently working on. The gist of it is that I have a job that listens to a series of events from a Kafka topic and eventually sinks those down into Postgres via the JDBCSink.

A requirement recently came up for the need to filter these events based on some configurations that are currently being stored within another Kafka topic. I'm wondering what the best approach might be to handle this type of problem.

My initial naive approach was:
  • When Flink starts up, use a regular Kafka Consumer and read all of the configuration data from that topic in its entirety.
  • Store the messages from that topic in some type of thread-safe collection statically accessible by the operators downstream.
  • Expose the thread-safe collection within the operators to actually perform the filtering.
This doesn't seem right though. I was reading about BroadcastState which seems like it might fit the bill (e.g. keep those mappings in Broadcast state so that all of the downstream operations would have access to them, which I'd imagine would handle keeping things up to date). 

Does Flink have a good pattern / construct to handle this? Basically, I have a series of mappings that I want to keep relatively up to date in a Kafka topic, and I'm consuming from another Kafka topic that will need those mappings to filter against.

I'd be happy to share some of the approaches I currently have or elaborate a bit more if that isn't super clear.

Thanks much,

Rion

Reply | Threaded
Open this post in threaded view
|

Re: Re: Handling "Global" Updating State

Yun Gao
Hi Rion, 

I think FLIP-150[1] should be able to solve this scenario.

Since FLIP-150 is still under discussion, for now a temporary method come 
to me might be
1. Write a first job to read the kafka and update the broadcast state of some operator. The job
would keep the source alive after all the data are emit (like sleep forever), and when all the data 
are processed, then stop the job with savepoint. 
2. Use the savepoint to start the original job. For the operator required the broadcast state, it could
set the same uid and same state name with the corresponding operator in the first job, so it could
acqure the state content on startup.

Yun,
Best


------------------Original Mail ------------------
Sender:Rion Williams <[hidden email]>
Send Date:Mon May 17 07:00:03 2021
Recipients:user <[hidden email]>
Subject:Re: Handling "Global" Updating State
Hey folks,

After digging into this a bit it does seem like Broadcast State would fit the bill for this scenario and keeping the downstream operators up-to-date as messages arrived in my Kafka topic.

My question is - is there a pattern for pre-populating the state initially? In my case, I need to have loaded all of my “lookup” topic into state before processing any records in the other stream.

My thought initially is to do something like this, if it’s possible:

- Create a KafkaConsumer on startup to read the lookup topic in its entirety into some collection like a hashmap (prior to executing the Flink pipeline to ensure synchronicity)
- Use this to initialize the state of my broadcast stream (if possible)
- At this point that stream would be broadcasting any new records coming in, so I “should” stay up to date at that point.

Is this an oversimplification or is there an obviously better / well known approach to handling this?

Thanks,

Rion

On May 14, 2021, at 9:51 AM, Rion Williams <[hidden email]> wrote:


Hi all,

I've encountered a challenge within a Flink job that I'm currently working on. The gist of it is that I have a job that listens to a series of events from a Kafka topic and eventually sinks those down into Postgres via the JDBCSink.

A requirement recently came up for the need to filter these events based on some configurations that are currently being stored within another Kafka topic. I'm wondering what the best approach might be to handle this type of problem.

My initial naive approach was:
  • When Flink starts up, use a regular Kafka Consumer and read all of the configuration data from that topic in its entirety.
  • Store the messages from that topic in some type of thread-safe collection statically accessible by the operators downstream.
  • Expose the thread-safe collection within the operators to actually perform the filtering.
This doesn't seem right though. I was reading about BroadcastState which seems like it might fit the bill (e.g. keep those mappings in Broadcast state so that all of the downstream operations would have access to them, which I'd imagine would handle keeping things up to date). 

Does Flink have a good pattern / construct to handle this? Basically, I have a series of mappings that I want to keep relatively up to date in a Kafka topic, and I'm consuming from another Kafka topic that will need those mappings to filter against.

I'd be happy to share some of the approaches I currently have or elaborate a bit more if that isn't super clear.

Thanks much,

Rion

Reply | Threaded
Open this post in threaded view
|

Re: Handling "Global" Updating State

Rion Williams
Hi Yun,

That’s very helpful and good to know that the problem/use-case has been thought about. Since my need is probably shorter-term than later, I’ll likely need to explore a workaround.

Do you know of an approach that might not require the use of check pointing and restarting? I was looking into exploring initializeState within my broadcast-side stream to get it current and then simply listening to the Kafka topic as records come in. I’d imagine this would work, but that may be a bit of a naive approach.

Thanks!

Rion 

On May 17, 2021, at 1:36 AM, Yun Gao <[hidden email]> wrote:


Hi Rion, 

I think FLIP-150[1] should be able to solve this scenario.

Since FLIP-150 is still under discussion, for now a temporary method come 
to me might be
1. Write a first job to read the kafka and update the broadcast state of some operator. The job
would keep the source alive after all the data are emit (like sleep forever), and when all the data 
are processed, then stop the job with savepoint. 
2. Use the savepoint to start the original job. For the operator required the broadcast state, it could
set the same uid and same state name with the corresponding operator in the first job, so it could
acqure the state content on startup.

Yun,
Best


------------------Original Mail ------------------
Sender:Rion Williams <[hidden email]>
Send Date:Mon May 17 07:00:03 2021
Recipients:user <[hidden email]>
Subject:Re: Handling "Global" Updating State
Hey folks,

After digging into this a bit it does seem like Broadcast State would fit the bill for this scenario and keeping the downstream operators up-to-date as messages arrived in my Kafka topic.

My question is - is there a pattern for pre-populating the state initially? In my case, I need to have loaded all of my “lookup” topic into state before processing any records in the other stream.

My thought initially is to do something like this, if it’s possible:

- Create a KafkaConsumer on startup to read the lookup topic in its entirety into some collection like a hashmap (prior to executing the Flink pipeline to ensure synchronicity)
- Use this to initialize the state of my broadcast stream (if possible)
- At this point that stream would be broadcasting any new records coming in, so I “should” stay up to date at that point.

Is this an oversimplification or is there an obviously better / well known approach to handling this?

Thanks,

Rion

On May 14, 2021, at 9:51 AM, Rion Williams <[hidden email]> wrote:


Hi all,

I've encountered a challenge within a Flink job that I'm currently working on. The gist of it is that I have a job that listens to a series of events from a Kafka topic and eventually sinks those down into Postgres via the JDBCSink.

A requirement recently came up for the need to filter these events based on some configurations that are currently being stored within another Kafka topic. I'm wondering what the best approach might be to handle this type of problem.

My initial naive approach was:
  • When Flink starts up, use a regular Kafka Consumer and read all of the configuration data from that topic in its entirety.
  • Store the messages from that topic in some type of thread-safe collection statically accessible by the operators downstream.
  • Expose the thread-safe collection within the operators to actually perform the filtering.
This doesn't seem right though. I was reading about BroadcastState which seems like it might fit the bill (e.g. keep those mappings in Broadcast state so that all of the downstream operations would have access to them, which I'd imagine would handle keeping things up to date). 

Does Flink have a good pattern / construct to handle this? Basically, I have a series of mappings that I want to keep relatively up to date in a Kafka topic, and I'm consuming from another Kafka topic that will need those mappings to filter against.

I'd be happy to share some of the approaches I currently have or elaborate a bit more if that isn't super clear.

Thanks much,

Rion

Reply | Threaded
Open this post in threaded view
|

Re: Re: Handling "Global" Updating State

Yun Gao
Hi Rion,

Sorry for the late reply, another simpler method might indeed be in initializeState,
the operator directly read the data from the kafka to initialize the state.

Best,
Yun


------------------Original Mail ------------------
Sender:Rion Williams <[hidden email]>
Send Date:Mon May 17 19:53:35 2021
Recipients:Yun Gao <[hidden email]>
CC:user <[hidden email]>
Subject:Re: Handling "Global" Updating State
Hi Yun,

That’s very helpful and good to know that the problem/use-case has been thought about. Since my need is probably shorter-term than later, I’ll likely need to explore a workaround.

Do you know of an approach that might not require the use of check pointing and restarting? I was looking into exploring initializeState within my broadcast-side stream to get it current and then simply listening to the Kafka topic as records come in. I’d imagine this would work, but that may be a bit of a naive approach.

Thanks!

Rion 

On May 17, 2021, at 1:36 AM, Yun Gao <[hidden email]> wrote:


Hi Rion, 

I think FLIP-150[1] should be able to solve this scenario.

Since FLIP-150 is still under discussion, for now a temporary method come 
to me might be
1. Write a first job to read the kafka and update the broadcast state of some operator. The job
would keep the source alive after all the data are emit (like sleep forever), and when all the data 
are processed, then stop the job with savepoint. 
2. Use the savepoint to start the original job. For the operator required the broadcast state, it could
set the same uid and same state name with the corresponding operator in the first job, so it could
acqure the state content on startup.

Yun,
Best


------------------Original Mail ------------------
Sender:Rion Williams <[hidden email]>
Send Date:Mon May 17 07:00:03 2021
Recipients:user <[hidden email]>
Subject:Re: Handling "Global" Updating State
Hey folks,

After digging into this a bit it does seem like Broadcast State would fit the bill for this scenario and keeping the downstream operators up-to-date as messages arrived in my Kafka topic.

My question is - is there a pattern for pre-populating the state initially? In my case, I need to have loaded all of my “lookup” topic into state before processing any records in the other stream.

My thought initially is to do something like this, if it’s possible:

- Create a KafkaConsumer on startup to read the lookup topic in its entirety into some collection like a hashmap (prior to executing the Flink pipeline to ensure synchronicity)
- Use this to initialize the state of my broadcast stream (if possible)
- At this point that stream would be broadcasting any new records coming in, so I “should” stay up to date at that point.

Is this an oversimplification or is there an obviously better / well known approach to handling this?

Thanks,

Rion

On May 14, 2021, at 9:51 AM, Rion Williams <[hidden email]> wrote:


Hi all,

I've encountered a challenge within a Flink job that I'm currently working on. The gist of it is that I have a job that listens to a series of events from a Kafka topic and eventually sinks those down into Postgres via the JDBCSink.

A requirement recently came up for the need to filter these events based on some configurations that are currently being stored within another Kafka topic. I'm wondering what the best approach might be to handle this type of problem.

My initial naive approach was:
  • When Flink starts up, use a regular Kafka Consumer and read all of the configuration data from that topic in its entirety.
  • Store the messages from that topic in some type of thread-safe collection statically accessible by the operators downstream.
  • Expose the thread-safe collection within the operators to actually perform the filtering.
This doesn't seem right though. I was reading about BroadcastState which seems like it might fit the bill (e.g. keep those mappings in Broadcast state so that all of the downstream operations would have access to them, which I'd imagine would handle keeping things up to date). 

Does Flink have a good pattern / construct to handle this? Basically, I have a series of mappings that I want to keep relatively up to date in a Kafka topic, and I'm consuming from another Kafka topic that will need those mappings to filter against.

I'd be happy to share some of the approaches I currently have or elaborate a bit more if that isn't super clear.

Thanks much,

Rion