error closing kafka


error closing kafka

yuvraj singh

Hi all,

I am getting this error with Flink 1.6.0; please help me.
2018-09-23 07:15:08,846 ERROR org.apache.kafka.clients.producer.KafkaProducer               - Interrupted while joining ioThread

java.lang.InterruptedException

        at java.lang.Object.wait(Native Method)

        at java.lang.Thread.join(Thread.java:1257)

        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)

        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)

        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)

        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)

        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

        at java.lang.Thread.run(Thread.java:745)

2018-09-23 07:15:08,847 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.

2018-09-23 07:15:08,860 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.

org.apache.kafka.common.KafkaException: Failed to close kafka producer

        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)

        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)

        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)

        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)

        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.InterruptedException

        at java.lang.Object.wait(Native Method)

        at java.lang.Thread.join(Thread.java:1257)

        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)

        ... 9 more


Thanks 

Yubraj Singh 


Re: error closing kafka

miki haiat
What are you trying to do? Can you share some code?
This is the reason for the exception:
Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.
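That timeout is `Long.MAX_VALUE` milliseconds, i.e. "wait forever": the connector closes the producer with an unbounded `close()`, which does `Thread.join()` on the producer's I/O thread. When Flink cancels or fails the task it interrupts the task thread, the join throws `InterruptedException`, and Kafka then force-closes the producer — which is exactly the sequence in your stack trace. A minimal, Kafka-free sketch of that pattern (all names here are illustrative, not the actual connector code):

```java
// Reproduces the pattern in the stack trace: a close() that does an
// unbounded Thread.join(), interrupted from outside during shutdown.
public class InterruptedJoinDemo {
    public static void main(String[] args) throws Exception {
        Thread ioThread = new Thread(() -> {
            try { Thread.sleep(60_000); }          // simulated pending requests
            catch (InterruptedException e) { /* aborted */ }
        });
        ioThread.start();

        Thread closer = new Thread(() -> {
            try {
                ioThread.join();                   // like KafkaProducer.close(Long.MAX_VALUE)
            } catch (InterruptedException e) {
                System.out.println("Interrupted while joining ioThread");
                ioThread.interrupt();              // "force close": stop waiting, abort pending work
            }
        });
        closer.start();

        Thread.sleep(200);
        closer.interrupt();                        // Flink cancelling the task interrupts close()
        closer.join();
        System.out.println("Long.MAX_VALUE ms = " + Long.MAX_VALUE);
    }
}
```

So the ERROR/force-close lines are a symptom of the task being torn down, not the root cause — the interesting question is why the task was cancelled or failed in the first place.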



On Mon, 24 Sep 2018, 9:23 yuvraj singh, <[hidden email]> wrote:



Re: error closing kafka

yuvraj singh
I am processing data and then sending it to Kafka via a Kafka sink.

This is the method where I produce the data:
nudgeDetailsDataStream.keyBy(NudgeDetails::getCarSugarID)
        .addSink(NudgeCarLevelProducer.getProducer(config))
        .name("nudge-details-producer")
        .uid("nudge-details-producer");



This is my producer:
public class NudgeCarLevelProducer {

    // note: the logger was created for PeakLocationFinder.class; use this class instead
    static Logger logger = LoggerFactory.getLogger(NudgeCarLevelProducer.class);

    public static FlinkKafkaProducer010<NudgeDetails> getProducer(PeakLocationFinderGlobalConfig config) {
        // diamond operator avoids the raw-type use of FlinkKafkaProducer010
        return new FlinkKafkaProducer010<>(config.getFabricIncentiveTopic(),
                new NudgeCarLevelSchema(config),
                FlinkKafkaProducerBase.getPropertiesFromBrokerList(config.getInstrumentationBrokers()));
    }
}


class NudgeCarLevelSchema implements SerializationSchema<NudgeDetails> {

    Logger logger = LoggerFactory.getLogger(NudgeCarLevelSchema.class);
    ObjectMapper mapper = new ObjectMapper();
    PeakLocationFinderGlobalConfig config;

    public NudgeCarLevelSchema(PeakLocationFinderGlobalConfig config) {
        this.config = config;
    }

    @Override
    public byte[] serialize(NudgeDetails element) {
        Document document = new Document();
        document.setId(UUID.randomUUID().toString());
        Metadata metadata = new Metadata();
        metadata.setSchema(config.getFabricCarLevelDataStream());
        metadata.setSchemaVersion(1);
        metadata.setTenant(config.getTenantId());
        metadata.setTimestamp(System.currentTimeMillis());
        metadata.setType(Type.EVENT);
        metadata.setSender("nudge");
        metadata.setStream(config.getFabricCarLevelDataStream());
        document.setMetadata(metadata);
        document.setData(element);
        try {
            // writeValueAsBytes avoids the platform-default charset that
            // writeValueAsString(...).getBytes() would silently depend on
            return mapper.writeValueAsBytes(document);
        } catch (Exception e) {
            // swallowing the exception and returning null hands the producer
            // a null record; consider rethrowing instead
            logger.error("error while serializing nudge car level schema", e);
            return null;
        }
    }
}
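One thing to watch in `serialize()` above: catching the Jackson exception and returning `null` means the producer receives a null record, which typically surfaces much later as a confusing NullPointerException far from the real cause. A tiny illustration of the difference between swallowing and failing fast (all names here are made up for the demo):

```java
public class SerializeErrorHandlingDemo {
    // mirrors the catch block in the schema: swallow the error, return null
    static byte[] swallow() {
        try {
            throw new RuntimeException("boom");    // stand-in for a Jackson failure
        } catch (Exception e) {
            return null;                            // downstream code must now cope with null
        }
    }

    // alternative: wrap and rethrow so the task fails at the real cause
    static byte[] failFast() {
        try {
            throw new RuntimeException("boom");
        } catch (Exception e) {
            throw new RuntimeException("error serializing record", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("swallow() returned null: " + (swallow() == null));
        try {
            failFast();
        } catch (RuntimeException e) {
            System.out.println("failFast threw: " + e.getMessage());
        }
    }
}
```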



On Mon, Sep 24, 2018 at 12:24 PM miki haiat <[hidden email]> wrote:


Re: error closing kafka

yuvraj singh
I have one more question:
if I do a keyBy on the stream, will it get partitioned automatically?

I am asking because all the data is ending up in the same Kafka partition.
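For what it's worth: keyBy only partitions the Flink stream across the sink's parallel subtasks; it does not set a Kafka message key. If I remember the 1.6-era connector correctly, the (topic, schema, props) constructor of FlinkKafkaProducer010 falls back to a fixed partitioner that pins each sink subtask to one Kafka partition, so with sink parallelism 1 everything lands in a single partition. To spread records, either pass a custom FlinkKafkaPartitioner or attach a message key (e.g. via a KeyedSerializationSchema) and let Kafka partition by key. A rough, Kafka-free sketch of key-based spreading (Kafka's real default partitioner hashes the serialized key with murmur2, not String.hashCode(); the point is only that keyed records fan out):

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionSpreadDemo {
    static int partitionFor(String key, int numPartitions) {
        // illustrative stand-in for hash-based key partitioning
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        Map<Integer, Integer> counts = new HashMap<>();
        for (int carId = 0; carId < 1000; carId++) {
            int p = partitionFor("car-" + carId, numPartitions);
            counts.merge(p, 1, Integer::sum);
        }
        // with per-record keys every partition receives data;
        // with a fixed partitioner and one subtask, only one would
        System.out.println("partitions used: " + counts.size());
    }
}
```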

Thanks 
Yubraj Singh   

On Mon, Sep 24, 2018 at 12:34 PM yuvraj singh <[hidden email]> wrote: