Question about My Flink Application


Sara Arshad
Hi,

I have been using Flink on Kinesis Analytics.
I have a stream of data, and I also need a cache that I update every 300 seconds.
To share the cache data with the Kinesis stream elements, I used a broadcast stream: I implemented a SourceFunction that reads the data from the DB and broadcasts it to the next stream, a KeyedBroadcastProcessFunction.
But after adding the broadcast stream (in the previous version I had no cache and used a KeyedProcessFunction for the Kinesis stream), the application keeps restarting about every 20 minutes when I execute it in Kinesis Analytics.
Could you please help me figure out what the issue could be?

Best regards,
Sara Arshad
Re: Question about My Flink Application

Alexander Fedulov
Hi Sara, 

Do you have logs? Any exceptions in them?

Best,

--
Alexander Fedulov | Solutions Architect, Ververica GmbH
Re: Question about My Flink Application

Alexander Fedulov
Returning the discussion to the mailing list (it accidentally went to a side channel because of a direct reply).
What I was referring to is the event-time processing semantics, which are based on the watermark mechanism [1].
If you are using event time, the event time at your KeyedBroadcastProcessFunction is determined as the minimum of the maximum watermarks observed across all of its input channels. To avoid stalling the processing of events in the main data flow through the control channel (the broadcast stream), you could set its watermark to the maximum possible value, as shown in this example [2].

[2] https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/main/java/com/ververica/field/dynamicrules/sources/RulesSource.java#L80
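Concretely, the trick from that example boils down to something like the following sketch (assuming the Flink 1.10-era AssignerWithPeriodicWatermarks API and the MyRule type from your code):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Pin the control stream's watermark to Long.MAX_VALUE so that it never
// holds back event time at the downstream KeyedBroadcastProcessFunction.
DataStream<MyRule> rulesWithMaxWatermark = ruleDataStreamSource
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyRule>() {
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(Long.MAX_VALUE);
            }

            @Override
            public long extractTimestamp(MyRule element, long previousElementTimestamp) {
                return Long.MAX_VALUE; // timestamps of control events are irrelevant here
            }
        });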


On Sat, May 23, 2020 at 1:05 AM Sara Arshad <[hidden email]> wrote:
- It was based on something I read about broadcast state. Besides, as I mentioned before, the restarts happen while checkpoints are being triggered.
- When I send data to the stream, it is processed perfectly fine between the restarts.
- Yes, I am using a ProcessingTimeService in the cache source to fetch the data every 300 seconds.
Do you have a view on whether this should be doable with a stream of a million messages, in case I improve my implementation?

Best regards,
Sara

On Fri, May 22, 2020 at 6:22 PM Alexander Fedulov <[hidden email]> wrote:
OK, with such data sizes this should definitely be doable with a broadcast channel.
"The problem was that the broadcast puts a lot of pressure on checkpointing." - is this the assessment of AWS support? Do you have any details on why this is considered to be the case?
"Even before I start to send the Kinesis stream it stuck." - do you actually see any data output, or is nothing happening until the job crashes 20 minutes later?
Are you using event-time processing semantics in your pipeline?


On Fri, May 22, 2020 at 4:34 PM Sara Arshad <[hidden email]> wrote:
Hi Alexander,

It's not that much data. I have only 2 records in my DynamoDB table right now (later it could be around 100 records, which is still not much), and I reload the whole data set every 300 seconds.
It gets stuck even before I start sending data to the Kinesis stream.
Yes, I can see the checkpoint size is around 150k, though in some cases, when I sent a Kinesis stream of 80 messages, it was around 190k.
The maximum checkpoint duration is 670.

Regards,


On Fri, 22 May 2020, 4:15 pm Alexander Fedulov, <[hidden email]> wrote:
Hi Sara,

what is the volume of data coming in through the broadcast channel every 300 seconds? Do you only emit the modified rule entries, or all of them, on each update?
Do you have access to metrics? Specifically, the size of the checkpoints and the time distribution of the different checkpoint phases are of interest.

Best,


On Fri, May 22, 2020 at 3:57 PM Sara Arshad <[hidden email]> wrote:
The problem was that the broadcast puts a lot of pressure on checkpointing.
I have to find another solution.
If you have any other suggestions, please let me know.

Regards,
Sara

On Wed, 20 May 2020, 5:55 pm Sara Arshad, <[hidden email]> wrote:
That was the broadcast stream, which is supposed to behave like a cache.
I then connect it to the Kinesis stream as in the code below.
I also have two sink functions that send the results to another DynamoDB table or to CloudWatch, depending on the output type.
Does this make sense, or do you have another idea?

// Broadcast source: periodically reads the rules from DynamoDB.
DataStreamSource<MyRule> ruleDataStreamSource =
        env.addSource(new DynamoDBSource()).setParallelism(1);

MapStateDescriptor<String, MyRule> ruleStateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<MyRule>() {}));

BroadcastStream<MyRule> ruleBroadcastStream =
        ruleDataStreamSource.broadcast(ruleStateDescriptor);

// Main flow: the keyed Kinesis stream connected to the broadcast rules.
SingleOutputStreamOperator<WindowFunctionOutput> process =
        env.addSource(ObjectFactory.getKinesisConsumer())
                .keyBy(new KeySelector<Message, Tuple2<Identifier, Integer>>() {
                    @Override
                    public Tuple2<Identifier, Integer> getKey(Message value) throws Exception {
                        return Tuple2.of(value.getIdentifier(), value.getServiceType());
                    }
                })
                .connect(ruleBroadcastStream)
                .process(new BroadcastWindowFunction());

// Route the two output types to their respective sinks.
DataStreamSink<OutputX> blockingStrategyOutputDataStreamSink = process
        .filter(output -> OutputX.class.isAssignableFrom(output.getClass()))
        .map(output -> (OutputX) output)
        .addSink(new DynamoDBSink());

DataStreamSink<OutputY> metricsOutputDataStreamSink = process
        .filter(output -> OutputY.class.isAssignableFrom(output.getClass()))
        .map(output -> (OutputY) output)
        .addSink(new CloudWatchMetricsSink());
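For context, here is a minimal sketch of what a KeyedBroadcastProcessFunction such as BroadcastWindowFunction could look like (the matching logic and the getId() accessor are illustrative assumptions, not the actual implementation):

import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastWindowFunction extends
        KeyedBroadcastProcessFunction<Tuple2<Identifier, Integer>, Message, MyRule, WindowFunctionOutput> {

    // Must be identical to the descriptor passed to broadcast() above.
    private static final MapStateDescriptor<String, MyRule> RULES_DESCRIPTOR =
            new MapStateDescriptor<>(
                    "RulesBroadcastState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<MyRule>() {}));

    @Override
    public void processElement(Message msg, ReadOnlyContext ctx,
                               Collector<WindowFunctionOutput> out) throws Exception {
        // Read the rules the broadcast side has stored in broadcast state.
        for (Map.Entry<String, MyRule> entry :
                ctx.getBroadcastState(RULES_DESCRIPTOR).immutableEntries()) {
            // Evaluate entry.getValue() against msg and emit outputs (placeholder).
        }
    }

    @Override
    public void processBroadcastElement(MyRule rule, Context ctx,
                                        Collector<WindowFunctionOutput> out) throws Exception {
        // Invoked on every parallel instance; keeps the cached rules current.
        ctx.getBroadcastState(RULES_DESCRIPTOR).put(rule.getId(), rule); // getId() is assumed
    }
}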

On Wed, May 20, 2020 at 5:45 PM Alexander Fedulov <[hidden email]> wrote:
I did not notice that you are actually running a Kinesis Analytics job, without access to the machines - sorry. In that case, without any errors in the logs, I think there is not much we can do without the AWS support team looking into it.
Nonetheless, one thing I was wondering about is whether you necessarily need a custom DynamoDBSource to fetch the rules periodically. How about connecting directly to the stream of DynamoDB updates and getting everything in real time [1]? This would remove one moving part that, as I see, you suspect to be a potential source of errors.

[1] https://issues.apache.org/jira/browse/FLINK-4582
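For illustration, wiring that up might look roughly like the sketch below (using the FlinkDynamoDBStreamsConsumer from the flink-connector-kinesis module; the region, stream ARN, and MyRuleDeserializationSchema are placeholders):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

Properties config = new Properties();
config.setProperty(ConsumerConfigConstants.AWS_REGION, "eu-west-1"); // assumed region
config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// Each insert/update on the rules table arrives as a change record,
// so no periodic polling source is needed.
DataStream<MyRule> ruleUpdates = env.addSource(
        new FlinkDynamoDBStreamsConsumer<>(
                "arn:aws:dynamodb:...",            // the table's stream ARN (placeholder)
                new MyRuleDeserializationSchema(), // hypothetical: change record -> MyRule
                config));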


On Wed, May 20, 2020 at 4:38 PM Sara Arshad <[hidden email]> wrote:
I am using Kinesis Analytics and the CloudWatch dashboard for logs.
There is no error-level log. The job logs things like 'Triggering checkpoint 187 @ 1589984558975 for job 7998aca8913b33a090bc5c0f43168bd5.' and then suddenly restarts.
I know this is very general, but I really don't know what's going on.
I have also asked AWS support; they haven't replied yet.

This is my broadcast stream source:

public class DynamoDBSource extends RichParallelSourceFunction<MyRule>
        implements CheckpointedFunction, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    private volatile boolean isRunning = true;

    // A stable lock object; synchronizing on the mutable Boolean field itself
    // (as before) does not give mutual exclusion once the field is reassigned.
    private final Object lock = new Object();

    private final ListStateDescriptor<MyRule> ruleStateDescriptor = new ListStateDescriptor<>(
            "RulesBroadcastState",
            TypeInformation.of(new TypeHint<MyRule>() {}));

    private volatile boolean sendData = true;
    private transient ListState<MyRule> listState;

    private transient ProcessingTimeServiceInf processingTimeService;

    private static long rulesUpdateIntervalMillis;

    @Override
    public void open(Configuration parameters) throws Exception {

        //...

        processingTimeService = new ProcessingTimeServiceImpl();

        long currentProcessingTime = processingTimeService.getCurrentProcessingTime();

        rulesUpdateIntervalMillis = ...; // some static value from a config class

        // Fire immediately; onProcessingTime() re-registers the timer afterwards.
        processingTimeService.registerTimer(currentProcessingTime, this);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState(this.listState != null,
                "The " + getClass().getSimpleName() + " has not been properly initialized.");
        this.listState.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState(this.listState == null,
                "The " + getClass().getSimpleName() + " has already been initialized.");
        this.listState = context.getOperatorStateStore().getListState(ruleStateDescriptor);
    }

    @Override
    public void run(SourceContext<MyRule> ctx) throws Exception {
        // The element type must match the source's type parameter (MyRule).
        while (isRunning) {
            synchronized (lock) {
                if (sendData) {
                    // Re-broadcast the full rule set after every DB refresh.
                    for (MyRule rule : listState.get()) {
                        ctx.collect(rule);
                    }
                    sendData = false;
                }
            }
            Thread.sleep(100); // avoid busy-spinning between refreshes
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        readRulesFromDB();
        // Schedule the next refresh.
        processingTimeService.registerTimer(
                processingTimeService.getCurrentProcessingTime() + rulesUpdateIntervalMillis, this);
    }

    private void readRulesFromDB() {
        synchronized (lock) {
            // ... fetch the rules from DynamoDB and store them in listState ...
            this.sendData = true;
        }
    }
}


On Wed, May 20, 2020 at 4:10 PM Alexander Fedulov <[hidden email]> wrote:
We'd need more details to localize the problem. What are the last things printed before the restart? Are there any actual error-level logs there? Do you happen to find any JVM crash files (hs_err_pidXXXX.log) on your Flink machines?




On Wed, May 20, 2020 at 4:01 PM Sara Arshad <[hidden email]> wrote:
Thank you for your response.
I get
'Error when creating PropertyDescriptor for public final void org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty...'
quite a lot, but it is logged at INFO level, and I also had it before adding the broadcast.
I also retried an older version of my application, and it still works fine.
By the way, the scenario works fine between restarts.

Best regards,

Re: Question about My Flink Application

Sara Arshad
Hi Alexander,

Thank you for your reply.
I got a reply from the AWS people; it seems it is a configuration problem.
But even if it worked fine without restarting, broadcast is not a good option for us: there is no one-to-one relation between the cache data and the keyed values, so the whole data set has to be sent to every key every 5 minutes, and we may have a very large number of keys at the same time.
So I came up with a completely different solution: now I only keep the cache in a shared map. It may not be that good design-wise, but it has higher performance.
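Roughly, the shared-map approach looks like the sketch below (class and method names are illustrative, not the actual implementation): a per-JVM cache refreshed from the DB by a background thread every 300 seconds and read directly by the keyed process function.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class RuleCache {

    // Shared by all subtasks running in the same TaskManager JVM.
    private static final Map<String, MyRule> RULES = new ConcurrentHashMap<>();

    private static final ScheduledExecutorService REFRESHER =
            Executors.newSingleThreadScheduledExecutor();

    static {
        // Reload the full rule set every 300 seconds.
        REFRESHER.scheduleAtFixedRate(RuleCache::reload, 0, 300, TimeUnit.SECONDS);
    }

    private RuleCache() {
    }

    private static void reload() {
        // Placeholder for the existing DynamoDB scan, e.g.:
        // RULES.putAll(readRulesFromDB());
    }

    public static MyRule get(String ruleId) {
        return RULES.get(ruleId);
    }
}

Unlike broadcast state, such a cache is not checkpointed and is refreshed independently on each TaskManager, so subtasks can briefly see different versions of the rules.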

Best regards,
Sara


