Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

7 messages
Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

sagar
Hi Team,

I am getting the following error while running a DataStream API job in batch mode with a Kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0]

In my batch program I want to work with four to five different streams in batch mode, since the data sources are bounded.

I can't find any clear example of how to do this with a Kafka source in Flink 1.12.

I don't want to use a JDBC source, since the underlying database tables may change. Please give me an example of how to achieve this use case.

Also, for any large bounded source, are there alternatives to achieve this?



--
---Regards---

  Sagar Bandal

This is a confidential mail. All rights are reserved. If you are not the intended recipient, please ignore this email.

Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

Ardhani Narasimha Swamy
Interesting use case. 

Can you please elaborate more on this?
On what criteria do you want to batch: time, count, or size?


-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
IMPORTANT: The contents of this email and any attachments are confidential and protected by applicable laws. If you have received this email by mistake, please (i) notify the sender immediately; (ii) delete it from your database; and (iii) do not disclose the contents to anyone or make copies thereof. Razorpay accepts no liability caused due to any inadvertent/ unintentional data transmitted through this email.
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------

Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

Yun Gao
Hi Sagar,

      I think the problem is that legacy sources implemented by extending SourceFunction are all defined as CONTINUOUS_UNBOUNDED when added via env.addSource(). Although there is a hacky way to add legacy sources as BOUNDED sources [1], you might first try the new version of KafkaSource [2]. The new KafkaSource is implemented with the new Source API [3], which provides unified support for the streaming and batch modes.

Best,
 Yun




[1] https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
[2] https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
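
[Editor's note] A minimal sketch of what Yun's suggested new-source approach might look like in batch mode. The broker address, topic, and group id are hypothetical placeholders, and the KafkaSource builder API was still evolving in 1.12, so setter names such as `setDeserializer`/`KafkaRecordDeserializer.valueOnly` may differ in later releases:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BoundedKafkaBatchJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH mode is now legal because the source below declares itself BOUNDED.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // hypothetical broker
                .setTopics("my-topic")                   // hypothetical topic
                .setGroupId("bounded-batch-job")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Stop reading at the offsets that are current when the job
                // starts; this is what makes the source report BOUNDED.
                .setBounded(OffsetsInitializer.latest())
                .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka")
                .print();
        env.execute("bounded-kafka-batch");
    }
}
```

The same builder can be repeated per topic to get the four or five bounded inputs mentioned above, each joined or unioned downstream as a normal DataStream.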




Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

sagar
In reply to this post by Ardhani Narasimha Swamy
Hi Ardhani,

So whenever I want to run this Flink job, I will first call a Java API to put data into four different Kafka topics (what data to put into Kafka is coded into those APIs). Once that is complete, I want to run the Flink job on the data available in Kafka and perform business operations on all of it.

I am not sure whether Kafka as a data source is best for this use case, but I don't want to expose my Flink job directly to database tables.


Thanks & Regards,
Sagar 


On Thu, Jan 14, 2021 at 12:41 PM Ardhani Narasimha <[hidden email]> wrote:
Interesting use case. 

Can you please elaborate more on this.
On what criteria do you want to batch? Time? Count? Or Size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar <[hidden email]> wrote:
Hi Team,

I am getting the following error while running DataStream API in with batch mode with kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0]

In my batch program I wanted to work with four to five different stream in batch mode as data source is bounded

I don't find any clear example of how to do it with kafka souce with Flink 1.12

I don't want to use JDBC source as underlying database table may change. please give me some example on how to achieve the above use case.

Also for any large bounded source are there any alternatives to achieve this?



--
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email.

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
IMPORTANT: The contents of this email and any attachments are confidential and protected by applicable laws. If you have received this email by mistake, please (i) notify the sender immediately; (ii) delete it from your database; and (iii) do not disclose the contents to anyone or make copies thereof. Razorpay accepts no liability caused due to any inadvertent/ unintentional data transmitted through this email.
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------


--
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email.

Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

Yun Gao
In reply to this post by Yun Gao
Hi Sagar,

  I rechecked and found that the new Kafka source has not been formally published yet, so a more stable approach may be to add the FlinkKafkaConsumer as a BOUNDED source first. Sorry for the inconvenience.

Best,
 Yun
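
[Editor's note] A hedged sketch of the "hacky way" from the BatchExecutionFileSinkITCase linked in Yun's earlier mail: construct a DataStreamSource directly and declare Boundedness.BOUNDED for a legacy SourceFunction. The DataStreamSource constructor used here is internal test plumbing in Flink 1.12, not stable public API, and the wrapped source function must still terminate on its own (for example via isEndOfStream) for a batch job to finish:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;

public final class BoundedLegacySources {
    private BoundedLegacySources() {}

    /** Wraps a legacy SourceFunction (e.g. FlinkKafkaConsumer) as an explicitly BOUNDED stream. */
    public static <T> DataStreamSource<T> asBounded(
            StreamExecutionEnvironment env,
            SourceFunction<T> fn,
            TypeInformation<T> type,
            String name) {
        // Declaring BOUNDED here only changes how the runtime schedules the
        // job; it does not make the source stop by itself.
        return new DataStreamSource<>(
                env, type, new StreamSource<>(fn), /* isParallel */ true, name, Boundedness.BOUNDED);
    }
}
```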


Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

sagar
Thanks Yun




Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

Abhishek Rai
I had a similar need recently and ended up using KafkaDeserializationSchemaWrapper to wrap a given DeserializationSchema. The resulting KafkaDeserializationSchema[Wrapper] can be passed directly to the FlinkKafkaConsumer constructor.

```
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class BoundingDeserializationSchema
        extends KafkaDeserializationSchemaWrapper<Row> {
    private static final long serialVersionUID = 1858204203663583785L;

    private final long maxRecords_;
    private long numRecords_ = 0;

    public BoundingDeserializationSchema(
            DeserializationSchema<Row> deserializationSchema,
            long maxRecords) {
        super(deserializationSchema);
        maxRecords_ = maxRecords;
    }

    @Override
    public void deserialize(
            ConsumerRecord<byte[], byte[]> message, Collector<Row> out)
            throws Exception {
        super.deserialize(message, out);
        numRecords_++; // count records seen by this reader instance
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        // Stop consuming once this parallel instance has seen maxRecords
        // records; the consumer then finishes its partitions.
        return numRecords_ >= maxRecords_;
    }
}
```
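
[Editor's note] For completeness, a hedged sketch of wiring such a bounding schema into FlinkKafkaConsumer; the broker address, topic name, record limit, and the inner row schema passed in are all hypothetical placeholders:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;

public class BoundedConsumerExample {
    /** Builds a consumer that stops after 10,000 records per parallel reader. */
    public static FlinkKafkaConsumer<Row> boundedConsumer(
            DeserializationSchema<Row> rowSchema) {  // hypothetical inner schema
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "bounded-batch-job");

        return new FlinkKafkaConsumer<>(
                "my-topic",                                        // hypothetical topic
                new BoundingDeserializationSchema(rowSchema, 10_000),
                props);
    }
}
```

Note that the limit is per parallel subtask, so the total number of records read scales with the consumer's parallelism.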
