FlinkKafkaProducer011 fails when kafka broker crash

FlinkKafkaProducer011 fails when kafka broker crash

陈卓

Hi,

My Flink version is 1.6.0 and I am using the FlinkKafkaProducer011 connector. The job fails when a Kafka broker crashes.

The exception info:
--

Thanks
zhuo chen

 

Re: FlinkKafkaProducer011 fails when kafka broker crash

hzyuemeng1
This is normal behavior for Flink, but the root cause may be on the Kafka side.
You can add some settings to the Kafka producer properties:

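import java.util.Properties;

// Producer settings intended to ride out a short broker outage
Properties properties = new Properties();
properties.setProperty("request.timeout.ms", "120000");       // wait up to 2 minutes for a broker response
properties.setProperty("timeout.ms", "120000");               // older producer setting; newer clients may ignore it
properties.setProperty("retries", "10");                      // resend a failed batch up to 10 times
properties.setProperty("max.request.size", "5773890");        // maximum request size in bytes
properties.setProperty("refresh.leader.backoff.ms", "10000"); // documented for the old consumer; the producer may ignore it
properties.setProperty("batch.size", "163480");               // per-partition batch size in bytes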
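
For readers who want a compact starting point, the sketch below wraps the retry-related settings in a small helper. It is only an illustration: the class and method names, the `bootstrap.servers` argument, the `retry.backoff.ms` value and the topic name in the comment are assumptions, not something from this thread, and the timeout and retry numbers simply mirror the ones suggested above.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink or Kafka): builds producer settings
// meant to let the Kafka client retry through a short broker outage.
final class ProducerSettings {

    static Properties tolerantProducerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // How long the client waits for a broker response before retrying or failing.
        props.setProperty("request.timeout.ms", "120000");
        // Number of resend attempts for a failed batch.
        props.setProperty("retries", "10");
        // Pause between retries, giving a new partition leader time to be elected
        // (assumed value, tune for your cluster).
        props.setProperty("retry.backoff.ms", "1000");
        return props;
    }

    // With the Flink Kafka 0.11 connector on the classpath, the result would be
    // passed to the sink, for example:
    //   new FlinkKafkaProducer011<>("my-topic", new SimpleStringSchema(), props);
}
```

Retries only help while the outage is shorter than the combined retry window; if the broker stays down longer, the producer still reports an error and the job fails.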





Re: FlinkKafkaProducer011 fails when kafka broker crash

Dawid Wysakowicz-2

Hi,

Yes, this is expected behavior. The Flink Kafka producer has to maintain a consistent view of all available partitions (with their leaders) to align with the checkpointing mechanism. In such a situation, the expected behavior is to fail the job, restart from the last checkpoint, and continue processing with the newly discovered leaders.
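
The fail, restore, continue cycle described here can be illustrated with a toy model. This is not Flink's implementation; the class, method and parameter names are made up for illustration, and the "crash" is just a flag. It shows why records written after the last completed checkpoint can reach the sink a second time once the job restarts.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only, not Flink code: mimics "fail, restore the last checkpoint,
// continue" for a sink that writes records in order.
final class CheckpointRestartDemo {

    /**
     * Processes records 0..total-1 and checkpoints every `interval` records.
     * A simulated broker crash happens once, just before record `failAt` is written.
     * Returns everything the sink received, including replayed records.
     */
    static List<Integer> run(int total, int interval, int failAt) {
        List<Integer> written = new ArrayList<>();
        int checkpointedOffset = 0;   // offset stored by the last completed checkpoint
        boolean crashed = false;
        int offset = 0;
        while (offset < total) {
            if (!crashed && offset == failAt) {
                crashed = true;
                offset = checkpointedOffset;   // job restart: rewind to the checkpoint
                continue;
            }
            written.add(offset);
            offset++;
            if (offset % interval == 0) {
                checkpointedOffset = offset;   // a checkpoint completes here
            }
        }
        return written;
    }
}
```

Calling `CheckpointRestartDemo.run(6, 2, 3)` returns `[0, 1, 2, 2, 3, 4, 5]`: record 2 was written after the checkpoint at offset 2 and is replayed after the restart. Whether such duplicates are visible downstream depends on the delivery semantic configured for the producer.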

Best,

Dawid
