Flink errors out and job fails--IOException from CollectSink.open()


Sathi Chowdhury

Has anyone encountered this error? I am using the DataStream API to read from a Kinesis stream. This happens intermittently and the Flink job dies.

 

reamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 306254130228844617117037146688591390310,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572254078827945986407789245674345090539511066904232082,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)

Any clue?

Much appreciation!

Thanks

Sathi

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============

Re: Flink errors out and job fails--IOException from CollectSink.open()

Ted Yu
Can you give us a bit more information?

release of flink
snippet of your code

Thanks

Re: Flink errors out and job fails--IOException from CollectSink.open()

Sathi Chowdhury

Hi Ted, sorry for my big font earlier, it was not intended :)

I am on Flink 1.2.0.

I built flink-connector-kinesis_2.10-1.2.0.jar from source and included it in the fat jar I am running.

I followed this guide: http://www.kidder.io/2017/02/15/flink-kinesis-streaming-connector/

In my code I read from a Kinesis stream using:

Properties consumerConfig = new Properties();
consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
// Note: DEFAULT_STREAM_INITIAL_POSITION is the constant holding the default value;
// the property key for the start position is ConsumerConfigConstants.STREAM_INITIAL_POSITION.
consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION, "LATEST");
consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "200");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
DataStream<Map<String, Object>> stream = env.addSource(
    new FlinkKinesisConsumer<>(inputStream, new MyJsonDeserializationSchema(), consumerConfig));

When I push JSON events to the Kinesis stream, I intermittently see this NPE and the Flink job fails:

 

2017-04-14 00:31:54,672 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
               at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
               at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
               at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
               at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
               at java.lang.Thread.run(Thread.java:745)

 

Thanks

Sathi

 

 

 


Re: Flink errors out and job fails--IOException from CollectSink.open()

Sathi Chowdhury

The taskmanager log does not point to a line in my code, but it seems the error occurs inside the connector jar while it is trying to fetch Kinesis records.

 

red sequence number 49572261908151269541343187919820576263466496304458235906
2017-04-13 23:28:23,470 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 306254130228844617117037146688591390310,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572254078827945986407789245674345090539511066904232082,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b) switched from CANCELING to CANCELED.
2017-04-13 23:28:23,471 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 204169420152563078078024764459060926873,EndingHashKey: 238197656844656924424362225202237748018},SequenceNumberRange: {StartingSequenceNumber: 49572254078761043750812197376249737935721565982386290786,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b).

 

 


Re: Flink errors out and job fails--IOException from CollectSink.open()

Ted Yu
Here is the line where NPE was thrown:

    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

I wonder if runFetcher() encountered running being false - otherwise mainThread should not have been null.

Looks like we should check whether mainThread is null when shutting down.
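
A minimal sketch of the null check suggested above, assuming a simplified fetcher. GuardedFetcher, its fields, and the one-second sleep are hypothetical stand-ins written for illustration, not the actual KinesisDataFetcher source. It shows one way mainThread could still be null at shutdown (shutdown is called before runFetcher() has recorded its thread) and how a guard avoids the NPE:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a fetcher; NOT the Flink KinesisDataFetcher source. */
class GuardedFetcher {
    // Assigned only once runFetcher() gets past the running check;
    // stays null if shutdownFetcher() is called first.
    private volatile Thread mainThread;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public void runFetcher() {
        if (!running.get()) {
            return; // already shut down, so mainThread is never assigned
        }
        mainThread = Thread.currentThread();
        try {
            while (running.get()) {
                Thread.sleep(1000L); // stands in for sleeping during the discovery interval
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
    }

    public void shutdownFetcher() {
        running.set(false);
        Thread t = mainThread; // read once so the check and the call see the same value
        if (t != null) {
            t.interrupt(); // wake the main thread if it is sleeping
        }
    }

    public boolean isRunning() {
        return running.get();
    }

    public static void main(String[] args) {
        GuardedFetcher fetcher = new GuardedFetcher();
        fetcher.shutdownFetcher(); // no NullPointerException although runFetcher() never ran
        fetcher.runFetcher();      // returns immediately because running is already false
        System.out.println("running=" + fetcher.isRunning());
    }
}
```

Without the `t != null` guard, the first call in main would throw the same kind of NullPointerException seen in the log. Whether this ordering is what actually happened in the reported job is not confirmed by the logs.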


Re: Flink errors out and job fails--IOException from CollectSink.open()

Sathi Chowdhury

In my jobmanager log I see the exception below, which is probably the root cause of the whole job being killed. Is there a memory problem in the jobmanager? Any clue about this error?

My flink-conf.yaml is pretty much unmodified:

jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1

I started the YARN session with:

HADOOP_CONF_DIR=/etc/hadoop/conf $FLINK_HOME/bin/yarn-session.sh -d -n 2 -s 1 -tm 2048

Thanks

The error in the jobmanager log is as below:

 

2017-04-14 04:26:33,570 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (0c515e082713970381dcd34da87cfcf4) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:425)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:797)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:775)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:253)
        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:234)
        at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:56)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:87)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error sending data back to client (ip-172-31-31-172/172.31.31.172:46147)
        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)
        ... 12 more
Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Connection reset
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
        at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)
        at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:203)
        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)
        ... 14 more
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 25 more
2017-04-14 04:26:33,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (6c9267453a74d0ef58e678351ac0b49c) switched from state RUNNING to FAILING.
2017-04-14 04:49:10,707 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[hidden email]:37737] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[hidden email]:37737]] Caused by: [Connection refused: ip-172-31-31-172.us-west-2.compute.internal/172.31.31.172:37737]
2017-04-14 04:49:10,712 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (b9af6dd459e576a669a12b08a830c24e) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,713 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (c5fe4b6df26e03f4c2604ec89a92ad8f) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,715 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (7f614d51527252cbb9ffcfc0d660fe4e) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,717 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map (1/1) (cd1f1ecf9408b4ac58bea6be99e5c89a) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,740 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from RUNNING to FAILED.
java.io.IOException: Cannot connect to the client to send back the stream
        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:80)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at java.net.Socket.<init>(Socket.java:434)
        at java.net.Socket.<init>(Socket.java:244)
        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:75)
        ... 6 more
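
A side note on the trace above: CollectSink.open() fails inside new Socket(...), so the sink appears to be opening a TCP connection back to the client, and "Connection refused" usually means nothing is listening at that host and port any more. The sketch below is not Flink code. It is a small stand-alone illustration (the class and method names are made up for this example) of how such a connection attempt behaves while a listener is up and after it has gone away.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper for illustration only; not part of Flink.
public class ClientSocketProbe {

    // Returns true if a TCP connection to host:port can be opened.
    public static boolean canConnect(InetAddress host, int port) {
        try (Socket socket = new Socket(host, port)) {
            return socket.isConnected();
        } catch (IOException e) {
            // Includes java.net.ConnectException: Connection refused
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();

        // Port 0 asks the OS for any free port; this plays the role of the client listener.
        ServerSocket listener = new ServerSocket(0, 1, loopback);
        int port = listener.getLocalPort();
        System.out.println("listener up:   " + canConnect(loopback, port));

        // Once the listener is closed, the same connect attempt is refused.
        listener.close();
        System.out.println("listener gone: " + canConnect(loopback, port));
    }
}
```

If the client process that submitted the job has exited or is unreachable from the TaskManager host, a restarted sink would be in the "listener gone" situation. That is an assumption about this setup, not something the log proves.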

 

 

From: Ted Yu <[hidden email]>
Date: Thursday, April 13, 2017 at 6:01 PM
To: Sathi Chowdhury <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Here is the line where the NPE was thrown:

    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

I wonder whether runFetcher() found running already set to false; otherwise mainThread should not have been null.

Looks like we should check whether mainThread is null when shutting down.
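
A minimal sketch of the kind of guard suggested above, assuming the thread field is only assigned once runFetcher() actually starts. This is not the KinesisDataFetcher source; GuardedFetcher and all of its members are hypothetical stand-ins used to show the pattern.

```java
// Hypothetical stand-in for a fetcher; NOT the real KinesisDataFetcher.
public class GuardedFetcher {

    private volatile boolean running = true;

    // Assigned only when runFetcher() starts, so it can still be null at shutdown.
    private volatile Thread mainThread;

    public boolean isRunning() {
        return running;
    }

    public void runFetcher() {
        if (!running) {
            return; // cancelled before start: mainThread is never assigned
        }
        mainThread = Thread.currentThread();
        while (running) {
            try {
                Thread.sleep(50); // stands in for the discovery interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void shutdownFetcher() {
        running = false;
        Thread t = mainThread; // read the field once
        if (t != null) {       // the guard: skip interrupt() if the fetcher never ran
            t.interrupt();
        }
    }

    public static void main(String[] args) {
        GuardedFetcher fetcher = new GuardedFetcher();
        fetcher.shutdownFetcher(); // called before runFetcher(); no NPE
        System.out.println("shutdown before start completed, running=" + fetcher.isRunning());
    }
}
```

Without the null check, calling shutdownFetcher() before runFetcher() would dereference a null thread reference, which matches the shape of the NPE in the trace.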

 

On Thu, Apr 13, 2017 at 5:46 PM, Sathi Chowdhury <[hidden email]> wrote:

The taskmanager log does not point to a line in my code, but the error seems to occur inside the connector jar while it is trying to fetch Kinesis records.

 

red sequence number 49572261908151269541343187919820576263466496304458235906

2017-04-13 23:28:23,470 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 306254130228844617117037146688591390310,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572254078827945986407789245674345090539511066904232082,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM

2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher

java.lang.NullPointerException

        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)

        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher

java.lang.NullPointerException

        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)

        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b) switched from CANCELING to CANCELED.

2017-04-13 23:28:23,471 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 204169420152563078078024764459060926873,EndingHashKey: 238197656844656924424362225202237748018},SequenceNumberRange: {StartingSequenceNumber: 49572254078761043750812197376249737935721565982386290786,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM

2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b).

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, April 13, 2017 at 5:44 PM
To: Ted Yu <[hidden email]>


Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Hi Ted, sorry for my big font earlier; it was not intended :)

 

I am on Flink 1.2.0.

I built flink-connector-kinesis_2.10-1.2.0.jar from source and included it in the fat jar I am running.

I followed this guide: http://www.kidder.io/2017/02/15/flink-kinesis-streaming-connector/

 

 

In my code I read from a Kinesis stream using:

 

consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION, "LATEST");
consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "200");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
DataStream<Map<String, Object>> stream = env.addSource(
    new FlinkKinesisConsumer<>(inputStream, new MyJsonDeserializationSchema(), consumerConfig));

 

 

While I push JSON events to the Kinesis stream, I intermittently see this NPE and the Flink job fails:

 

2017-04-14 00:31:54,672 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
               at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
               at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
               at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
               at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
               at java.lang.Thread.run(Thread.java:745)

 

Thanks

Sathi

 

 

 

From: Ted Yu <[hidden email]>
Date: Thursday, April 13, 2017 at 5:02 PM
To: Sathi Chowdhury <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Can you give us a bit more information?

 

release of flink

snippet of your code

 

Thanks

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============


Re: Flink errors out and job fails--IOException from CollectSink.open()

Sathi Chowdhury

I am consistently seeing the same behavior. I tried with elevated memory for the JobManager and TaskManager:

taskmanager.rpc.port: 6123

taskmanager.data.port: 4964

taskmanager.heap.mb: 39000

taskmanager.numberOfTaskSlots: 1

taskmanager.network.numberOfBuffers: 16368

taskmanager.memory.preallocate: false

parallelization.degree.default: 4

Even though the JobManager restarts the Flink job, subtasks that have reached the CANCELED state do not come back.

I have no clue how to approach this.

Thanks

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, April 13, 2017 at 9:52 PM
To: Ted Yu <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

In my JobManager log I see this exception, which is probably the root cause of the whole job being killed. Is there a memory problem in the JobManager? Any clue about the error below?

I ran the yarn-session, and my flink-conf.yaml is pretty much unmodified:

jobmanager.heap.mb: 256

 

taskmanager.heap.mb: 512

 

taskmanager.numberOfTaskSlots: 2

 

taskmanager.memory.preallocate: false

 

parallelism.default: 1

 

Invocation of yarn-session.sh:

HADOOP_CONF_DIR=/etc/hadoop/conf $FLINK_HOME/bin/yarn-session.sh -d -n 2 -s 1 -tm 2048

 

Thanks

The error in the JobManager log is as below:

 

2017-04-14 04:26:33,570 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (0c515e082713970381dcd34da87cfcf4) switched from RUNNING to FAILED.

java.lang.RuntimeException: Could not forward element to next operator

        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:425)

        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:797)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:775)

        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:253)

        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:234)

        at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:56)

        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:213)

        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:87)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.io.IOException: Error sending data back to client (ip-172-31-31-172/172.31.31.172:46147)

        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)

        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)

        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)

        ... 12 more

Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Connection reset

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)

        at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)

        at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)

        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)

        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)

        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)

        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:203)

        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)

        ... 14 more

Caused by: java.net.SocketException: Connection reset

        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)

        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)

        at java.io.DataOutputStream.write(DataOutputStream.java:107)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 25 more

2017-04-14 04:26:33,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (6c9267453a74d0ef58e678351ac0b49c) switched from state RUNNING to FAILING.


Re: Flink errors out and job fails--IOException from CollectSink.open()

Sathi Chowdhury

I have redone the pipeline with flink-1.3-SNAPSHOT, running on EMR 5.4, with the latest aws-sdk-java libraries placed directly in the Flink lib dir.

I have now reached the point where I get java.net.SocketException: Broken pipe (Write failed). I would be glad of any reply or clue on this!

 

java.io.IOException: Error sending data back to client (ip-172-31-42-238/172.31.42.238:42753)
        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:679)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Broken pipe (Write failed)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
        at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)
        at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:202)
        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)
        ... 6 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:41)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 17 more

Thanks

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Friday, April 14, 2017 at 1:29 AM
To: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

I am consistently seeing the same behavior. I tried with elevated memory for the jobmanager and taskmanager:

taskmanager.rpc.port: 6123
taskmanager.data.port: 4964
taskmanager.heap.mb: 39000
taskmanager.numberOfTaskSlots: 1
taskmanager.network.numberOfBuffers: 16368
taskmanager.memory.preallocate: false
parallelization.degree.default: 4

Even though the jobmanager restarts the Flink job, subtasks that have reached the CANCELED state are not revived.

I have no clue how to approach this.

Thanks

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, April 13, 2017 at 9:52 PM
To: Ted Yu <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

In my jobmanager log I see the exception below, which is probably the root cause of the whole job being killed. Is there a memory problem in the jobmanager? Any clue about this error?

I ran a yarn-session, and my flink-conf.yaml is pretty much unmodified:

jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1

This is how I invoke yarn-session.sh:

HADOOP_CONF_DIR=/etc/hadoop/conf $FLINK_HOME/bin/yarn-session.sh -d -n 2 -s 1 -tm 2048

Thanks

The error in the jobmanager log is as below:

 

2017-04-14 04:26:33,570 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (0c515e082713970381dcd34da87cfcf4) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:425)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:797)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:775)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:253)
        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:234)
        at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:56)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:87)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error sending data back to client (ip-172-31-31-172/172.31.31.172:46147)
        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)
        ... 12 more
Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Connection reset
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
        at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)
        at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:203)
        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)
        ... 14 more
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 25 more
2017-04-14 04:26:33,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (6c9267453a74d0ef58e678351ac0b49c) switched from state RUNNING to FAILING.
2017-04-14 04:49:10,707 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[hidden email]:37737] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[hidden email]:37737]] Caused by: [Connection refused: ip-172-31-31-172.us-west-2.compute.internal/172.31.31.172:37737]
2017-04-14 04:49:10,712 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (b9af6dd459e576a669a12b08a830c24e) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,713 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (c5fe4b6df26e03f4c2604ec89a92ad8f) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,715 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (7f614d51527252cbb9ffcfc0d660fe4e) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,717 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map (1/1) (cd1f1ecf9408b4ac58bea6be99e5c89a) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,740 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from RUNNING to FAILED.
java.io.IOException: Cannot connect to the client to send back the stream
        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:80)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at java.net.Socket.<init>(Socket.java:434)
        at java.net.Socket.<init>(Socket.java:244)
        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:75)
        ... 6 more

 

 

From: Ted Yu <[hidden email]>
Date: Thursday, April 13, 2017 at 6:01 PM
To: Sathi Chowdhury <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Here is the line where NPE was thrown:

 

    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

 

I wonder if runFetcher() encountered running being false - otherwise mainThread should not have been null.

 

Looks like we should check whether mainThread is null when shutting down.
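
To illustrate the null check, here is a minimal, self-contained sketch. It is not the actual KinesisDataFetcher code: only the names running, mainThread, runFetcher() and shutdownFetcher() come from the discussion above, and the class name, the sleep loop, and the hasMainThread() helper are assumptions made up for the example.

```java
// Hypothetical stand-in for the fetcher; not the real connector class.
class FetcherShutdownSketch {
    private volatile boolean running = true;
    // Only assigned once runFetcher() actually starts, so it may still be null at shutdown.
    private volatile Thread mainThread;

    void runFetcher() {
        if (!running) {
            return; // shutdown was requested before start; mainThread stays null
        }
        mainThread = Thread.currentThread();
        while (running) {
            try {
                Thread.sleep(50); // stands in for sleeping during the discovery interval
            } catch (InterruptedException e) {
                break; // woken up by shutdownFetcher()
            }
        }
    }

    void shutdownFetcher() {
        running = false;
        Thread thread = mainThread; // read once, so the check and the call see the same value
        if (thread != null) {
            thread.interrupt(); // the main thread may be sleeping
        }
    }

    boolean hasMainThread() {
        return mainThread != null;
    }

    public static void main(String[] args) {
        FetcherShutdownSketch fetcher = new FetcherShutdownSketch();
        fetcher.shutdownFetcher(); // called before runFetcher(); no NPE thanks to the null check
        fetcher.runFetcher();      // returns immediately because running is already false
        System.out.println("shutdown before start completed");
    }
}
```

With the guard in place, a close() that arrives before the fetcher has started simply flips the flag instead of throwing.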

 

On Thu, Apr 13, 2017 at 5:46 PM, Sathi Chowdhury <[hidden email]> wrote:

The taskmanager log does not point to a line in my code, but it seems the error occurs inside the connector jar while it is trying to fetch Kinesis records.

 

red sequence number 49572261908151269541343187919820576263466496304458235906

2017-04-13 23:28:23,470 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 306254130228844617117037146688591390310,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572254078827945986407789245674345090539511066904232082,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b) switched from CANCELING to CANCELED.
2017-04-13 23:28:23,471 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 204169420152563078078024764459060926873,EndingHashKey: 238197656844656924424362225202237748018},SequenceNumberRange: {StartingSequenceNumber: 49572254078761043750812197376249737935721565982386290786,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b).

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, April 13, 2017 at 5:44 PM
To: Ted Yu <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Hi Ted, sorry for my big font earlier, it was not intended :)

I am on Flink 1.2.0.

I built flink-connector-kinesis_2.10-1.2.0.jar from source and included it in the fat jar I am running.

I followed this guide: http://www.kidder.io/2017/02/15/flink-kinesis-streaming-connector/

 

 

In my code I read a Kinesis stream using:

 

consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION, "LATEST");
consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "200");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
DataStream<Map<String, Object>> stream = env.addSource(
    new FlinkKinesisConsumer<>(inputStream, new MyJsonDeserializationSchema(), consumerConfig));

 

 

When I push JSON events to the Kinesis stream, I intermittently see this NPE and the Flink job fails:

 

2017-04-14 00:31:54,672 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
               at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
               at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
               at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
               at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
               at java.lang.Thread.run(Thread.java:745)

 

Thanks

Sathi

 

 

 

From: Ted Yu <[hidden email]>
Date: Thursday, April 13, 2017 at 5:02 PM
To: Sathi Chowdhury <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Can you give us a bit more information?

 

release of flink

snippet of your code

 

Thanks

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============


Re: Flink errors out and job fails--IOException from CollectSink.open()

Sathi Chowdhury
OK, I am finally able to run the job fine. The culprit was an older version of the AWS SDK inside the Flink Kinesis connector jar. I also rebuilt Flink from source with Maven 3.2.5; I was on 3.3* and it had weird transitive dependency problems.
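
For anyone chasing a similar version clash, one way to see which jar a class is really loaded from is to print its code source. This is only a rough sketch: the ClassOrigin class and its method names are made up for illustration, and the default class name com.amazonaws.services.kinesis.AmazonKinesisClient is just an assumed example of an AWS SDK class to probe.

```java
import java.security.CodeSource;

// Hypothetical helper for diagnosing which jar provides a given class.
class ClassOrigin {

    // Returns the jar or directory a class was loaded from, or a note if that is unknown.
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(no code source, e.g. a JDK class)";
        }
        return source.getLocation().toString();
    }

    // Looks the class up by name without initializing it.
    static String locationOf(String className) {
        try {
            Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            return locationOf(type);
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Assumed default; pass any fully qualified class name as the first argument.
        String name = args.length > 0 ? args[0] : "com.amazonaws.services.kinesis.AmazonKinesisClient";
        System.out.println(name + " -> " + locationOf(name));
    }
}
```

Run it with the same classpath as the job (fat jar plus the Flink lib dir); if the printed location is the connector jar rather than the SDK jar you expect, an older bundled copy is winning.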




On Apr 15, 2017, at 11:13 AM, Sathi Chowdhury <[hidden email]> wrote:

I have redone the pipeline with flink-1.3-SNAPSHOT and running on EMR 5.4, the aws-sdk-java latest libraries directly in flink lib dir.

I have come to the point where I get the java.net.SocketException: Broken pipe (Write failed). Eager to get a reply and a clue on this!

 

java.io.IOException: Error sending data back to client (ip-172-31-42-238/172.31.42.238:42753)

        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)

        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)

        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)

        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:679)

        at java.lang.Thread.run(Thread.java:745)

Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Broken pipe (Write failed)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)

        at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)

        at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)

        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)

        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)

        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)

        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:202)

        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)

        ... 6 more

Caused by: java.net.SocketException: Broken pipe (Write failed)

        at java.net.SocketOutputStream.socketWrite0(Native Method)

        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)

        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)

        at java.io.DataOutputStream.write(DataOutputStream.java:107)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:41)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 17 more

Thanks

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Friday, April 14, 2017 at 1:29 AM
To: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

I am consistently seeing the same behavior…tried with elevated memory for job manager and taskmanager

taskmanager.rpc.port: 6123

taskmanager.data.port: 4964

taskmanager.heap.mb: 39000

taskmanager.numberOfTaskSlots: 1

taskmanager.network.numberOfBuffers: 16368

taskmanager.memory.preallocate: false

parallelization.degree.default: 4

even though the jobmanager is  restarting the flink job ,but the subtasks once reach to cancelled state does not revive

I have no clue how to approach this.

Thanks

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, April 13, 2017 at 9:52 PM
To: Ted Yu <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

In my jobmanager log I see this exception , probably is the root cause why the whole job is killed…is there any memory problem in jobmanager ? any clue for this error below?

I ran the yarn-session

 

 

And my flink-conf.yaml is pretty much unmodified

jobmanager.heap.mb: 256

 

taskmanager.heap.mb: 512

 

taskmanager.numberOfTaskSlots: 2

 

taskmanager.memory.preallocate: false

 

parallelism.default: 1

 

invoke of yarn-session.sh

HADOOP_CONF_DIR=/etc/hadoop/conf $FLINK_HOME/bin/yarn-session.sh -d -n 2 -s 1 -tm 2048

 

Thanks

The error in job manager is as below

 

2017-04-14 04:26:33,570 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (0c515e082713970381dcd34da87cfcf4) switched from RUNNING to FAILED.

java.lang.RuntimeException: Could not forward element to next operator

        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:425)

        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:797)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:775)

        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:253)

        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:234)

        at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:56)

        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:213)

        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:87)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.io.IOException: Error sending data back to client (ip-172-31-31-172/172.31.31.172:46147)

        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)

        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)

        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)

        ... 12 more

Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Connection reset

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)

        at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)

        at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)

        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)

        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)

        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)

        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:203)

        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)

        ... 14 more

Caused by: java.net.SocketException: Connection reset

        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)

        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)

        at java.io.DataOutputStream.write(DataOutputStream.java:107)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 25 more

2017-04-14 04:26:33,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (6c9267453a74d0ef58e678351ac0b49c) switched from state RUNNING to FAILING.

2017-04-14 04:49:10,707 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[hidden email]:37737] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[hidden email]:37737]] Caused by: [Connection refused: ip-172-31-31-172.us-west-2.compute.internal/172.31.31.172:37737]

2017-04-14 04:49:10,712 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (b9af6dd459e576a669a12b08a830c24e) switched from DEPLOYING to RUNNING.

2017-04-14 04:49:10,713 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (c5fe4b6df26e03f4c2604ec89a92ad8f) switched from DEPLOYING to RUNNING.

2017-04-14 04:49:10,715 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (7f614d51527252cbb9ffcfc0d660fe4e) switched from DEPLOYING to RUNNING.

2017-04-14 04:49:10,717 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map (1/1) (cd1f1ecf9408b4ac58bea6be99e5c89a) switched from DEPLOYING to RUNNING.

2017-04-14 04:49:10,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from DEPLOYING to RUNNING.

2017-04-14 04:49:10,740 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from RUNNING to FAILED.

java.io.IOException: Cannot connect to the client to send back the stream

        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:80)

        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.ConnectException: Connection refused (Connection refused)

        at java.net.PlainSocketImpl.socketConnect(Native Method)

        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

        at java.net.Socket.connect(Socket.java:589)

        at java.net.Socket.connect(Socket.java:538)

        at java.net.Socket.<init>(Socket.java:434)

        at java.net.Socket.<init>(Socket.java:244)

        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:75)

        ... 6 more

 

 

From: Ted Yu <[hidden email]>
Date: Thursday, April 13, 2017 at 6:01 PM
To: Sathi Chowdhury <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Here is the line where NPE was thrown:

 

    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

 

I wonder if runFetcher() encountered running being false - otherwise mainThread should not have been null.

 

Looks like we should check whether mainThread is null when shutting down.
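
A minimal sketch of the null check suggested above, using only the JDK. The class `FetcherShutdownSketch` and its fields are hypothetical stand-ins, not the actual `KinesisDataFetcher` code; the only things taken from this thread are the names `runFetcher()`, `shutdownFetcher()`, `mainThread` and `running`, and the assumption that `mainThread` is only assigned once `runFetcher()` has started.

```java
// Hypothetical stand-in for the fetcher discussed above; NOT the Flink class.
class FetcherShutdownSketch {
    private volatile boolean running = true;
    private volatile Thread mainThread; // stays null until runFetcher() has started

    void runFetcher() {
        if (!running) {
            return; // shut down before start: mainThread is never assigned
        }
        mainThread = Thread.currentThread();
        while (running) {
            try {
                Thread.sleep(1000L); // stands in for sleeping over the discovery interval
            } catch (InterruptedException e) {
                // woken by shutdownFetcher(); the loop condition decides whether to exit
            }
        }
    }

    void shutdownFetcher() {
        running = false;
        Thread t = mainThread; // read the field once
        if (t != null) {       // the guard: skip interrupt() if the fetcher never ran
            t.interrupt();
        }
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        FetcherShutdownSketch fetcher = new FetcherShutdownSketch();
        fetcher.shutdownFetcher(); // called before runFetcher(): no NullPointerException
        fetcher.runFetcher();      // returns immediately because running is already false
        System.out.println("running=" + fetcher.isRunning());
    }
}
```

Without the `t != null` guard, calling `shutdownFetcher()` before `runFetcher()` would dereference a null thread, which matches the shape of the NPE in the trace, though whether this is the actual cause in the connector is not confirmed here.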

 

On Thu, Apr 13, 2017 at 5:46 PM, Sathi Chowdhury <[hidden email]> wrote:

The taskmanager log does not point to a line in my code, but the error seems to occur while the connector jar is fetching Kinesis records.

 

red sequence number 49572261908151269541343187919820576263466496304458235906

2017-04-13 23:28:23,470 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 306254130228844617117037146688591390310,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572254078827945986407789245674345090539511066904232082,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM

2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher

java.lang.NullPointerException

        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)

        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher

java.lang.NullPointerException

        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)

        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b) switched from CANCELING to CANCELED.

2017-04-13 23:28:23,471 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 204169420152563078078024764459060926873,EndingHashKey: 238197656844656924424362225202237748018},SequenceNumberRange: {StartingSequenceNumber: 49572254078761043750812197376249737935721565982386290786,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM

2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b).

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, April 13, 2017 at 5:44 PM
To: Ted Yu <[hidden email]>


Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Hi Ted, sorry for my big font earlier… it was not intended :)

 

I am on flink 1.2.0

I built flink-connector-kinesis_2.10-1.2.0.jar from source and included it in the fat jar I am running.

Followed this http://www.kidder.io/2017/02/15/flink-kinesis-streaming-connector/

 

 

In my code I read from a Kinesis stream using:

consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION, "LATEST");
consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "200");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
DataStream<Map<String, Object>> stream = env.addSource(
    new FlinkKinesisConsumer<>(inputStream, new MyJsonDeserializationSchema(), consumerConfig));

 

 

When I push JSON events to the Kinesis stream, I intermittently see this NPE and the Flink job fails:

 

2017-04-14 00:31:54,672 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
               at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
               at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
               at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
               at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
               at java.lang.Thread.run(Thread.java:745)

 

Thanks

Sathi

 

 

 

From: Ted Yu <[hidden email]>
Date: Thursday, April 13, 2017 at 5:02 PM
To: Sathi Chowdhury <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Can you give us a bit more information?

 

release of flink

snippet of your code

 

Thanks

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============


Re: Flink errors out and job fails--IOException from CollectSink.open()

Ted Yu
Can you describe the transitive dependency issue in more detail (since a lot of people use 3.3.x)?

Thanks

On Apr 16, 2017, at 1:56 AM, Sathi Chowdhury <[hidden email]> wrote:

OK, I am finally able to run the job. The culprit was an older version of the AWS SDK inside the Flink Kinesis connector jar. I also rebuilt Flink from source with Maven 3.2.5; I was on 3.3.x and it had weird transitive dependency problems.




On Apr 15, 2017, at 11:13 AM, Sathi Chowdhury <[hidden email]> wrote:

I have redone the pipeline with flink-1.3-SNAPSHOT, running on EMR 5.4, with the latest aws-sdk-java libraries placed directly in the Flink lib directory.

I have now reached the point where I get java.net.SocketException: Broken pipe (Write failed). Eager to get a reply and a clue on this!

 

java.io.IOException: Error sending data back to client (ip-172-31-42-238/172.31.42.238:42753)

        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)

        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)

        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)

        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:679)

        at java.lang.Thread.run(Thread.java:745)

Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Broken pipe (Write failed)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)

        at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)

        at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)

        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)

        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)

        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)

        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:202)

        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)

        ... 6 more

Caused by: java.net.SocketException: Broken pipe (Write failed)

        at java.net.SocketOutputStream.socketWrite0(Native Method)

        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)

        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)

        at java.io.DataOutputStream.write(DataOutputStream.java:107)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:41)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 17 more

Thanks

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Friday, April 14, 2017 at 1:29 AM
To: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

I am consistently seeing the same behavior, even after increasing the memory for the jobmanager and taskmanager:

taskmanager.rpc.port: 6123
taskmanager.data.port: 4964
taskmanager.heap.mb: 39000
taskmanager.numberOfTaskSlots: 1
taskmanager.network.numberOfBuffers: 16368
taskmanager.memory.preallocate: false
parallelization.degree.default: 4

Even though the jobmanager restarts the Flink job, the subtasks do not come back once they have reached the CANCELED state.

I have no clue how to approach this.

Thanks

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, April 13, 2017 at 9:52 PM
To: Ted Yu <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

In my jobmanager log I see the exception below, which is probably the root cause of the whole job being killed. Is there a memory problem in the jobmanager? Any clue about this error?

I ran a yarn-session, and my flink-conf.yaml is pretty much unmodified:

jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1

Invocation of yarn-session.sh:

HADOOP_CONF_DIR=/etc/hadoop/conf $FLINK_HOME/bin/yarn-session.sh -d -n 2 -s 1 -tm 2048

Thanks

The error in the jobmanager is as below:

 

2017-04-14 04:26:33,570 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (0c515e082713970381dcd34da87cfcf4) switched from RUNNING to FAILED.

java.lang.RuntimeException: Could not forward element to next operator

        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:425)

        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:797)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:775)

        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:253)

        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:234)

        at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:56)

        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:213)

        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:87)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.io.IOException: Error sending data back to client (ip-172-31-31-172/172.31.31.172:46147)

        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)

        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)

        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)

        ... 12 more

Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Connection reset

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)

        at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)

        at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)

        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)

        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)

        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)

        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:203)

        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)

        ... 14 more

Caused by: java.net.SocketException: Connection reset

        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)

        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)

        at java.io.DataOutputStream.write(DataOutputStream.java:107)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 25 more

2017-04-14 04:26:33,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (6c9267453a74d0ef58e678351ac0b49c) switched from state RUNNING to FAILING.

2017-04-14 04:49:10,707 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[hidden email]:37737] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[hidden email]:37737]] Caused by: [Connection refused: ip-172-31-31-172.us-west-2.compute.internal/172.31.31.172:37737]

2017-04-14 04:49:10,712 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (b9af6dd459e576a669a12b08a830c24e) switched from DEPLOYING to RUNNING.

2017-04-14 04:49:10,713 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (c5fe4b6df26e03f4c2604ec89a92ad8f) switched from DEPLOYING to RUNNING.

2017-04-14 04:49:10,715 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (7f614d51527252cbb9ffcfc0d660fe4e) switched from DEPLOYING to RUNNING.

2017-04-14 04:49:10,717 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map (1/1) (cd1f1ecf9408b4ac58bea6be99e5c89a) switched from DEPLOYING to RUNNING.

2017-04-14 04:49:10,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from DEPLOYING to RUNNING.

2017-04-14 04:49:10,740 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from RUNNING to FAILED.

java.io.IOException: Cannot connect to the client to send back the stream

        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:80)

        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.ConnectException: Connection refused (Connection refused)

        at java.net.PlainSocketImpl.socketConnect(Native Method)

        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

        at java.net.Socket.connect(Socket.java:589)

        at java.net.Socket.connect(Socket.java:538)

        at java.net.Socket.<init>(Socket.java:434)

        at java.net.Socket.<init>(Socket.java:244)

        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:75)

        ... 6 more
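
The `Socket.<init>` frames in the trace above show that `CollectSink.open()` opens a plain TCP connection, presumably back to a listener on the client that submitted the job. "Connection refused" is what the JDK reports when nothing is listening on the target port any more. The sketch below reproduces that condition with the JDK only; the class `ConnectionRefusedSketch` and its methods are hypothetical and are not part of Flink.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical illustration; does not use or reproduce any Flink code.
class ConnectionRefusedSketch {

    /** Returns true if a TCP connection to host:port can be opened. */
    static boolean canConnect(InetAddress host, int port) {
        try (Socket socket = new Socket(host, port)) {
            return true;
        } catch (IOException e) {
            // java.net.ConnectException ("Connection refused") ends up here
            return false;
        }
    }

    /** Probes a loopback port while a listener is open and again after it is closed. */
    static boolean[] probe() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        boolean whileListening;
        int port;
        try (ServerSocket listener = new ServerSocket(0, 1, loopback)) {
            port = listener.getLocalPort(); // ephemeral port picked by the OS
            whileListening = canConnect(loopback, port);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The listener is closed now, like a client that has already gone away.
        boolean afterClose = canConnect(loopback, port);
        return new boolean[] {whileListening, afterClose};
    }

    public static void main(String[] args) {
        boolean[] result = probe();
        System.out.println("listener open:   " + result[0]);
        System.out.println("listener closed: " + result[1]);
    }
}
```

If the sink really does connect back to the submitting client, a restarted job would hit this condition whenever the original client process or its listening port is gone, which would be consistent with the restart at 04:49 failing immediately in `open()`.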

 

 

From: Ted Yu <[hidden email]>
Date: Thursday, April 13, 2017 at 6:01 PM
To: Sathi Chowdhury <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Here is the line where NPE was thrown:

 

    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

 

I wonder if runFetcher() encountered running being false - otherwise mainThread should not have been null.

 

Looks like we should check whether mainThread is null when shutting down.

 

On Thu, Apr 13, 2017 at 5:46 PM, Sathi Chowdhury <[hidden email]> wrote:

The taskmanager log does not point to a line in my code, but the error seems to occur while the connector jar is fetching Kinesis records.

 

red sequence number 49572261908151269541343187919820576263466496304458235906

2017-04-13 23:28:23,470 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 306254130228844617117037146688591390310,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572254078827945986407789245674345090539511066904232082,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM

2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher

java.lang.NullPointerException

        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)

        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher

java.lang.NullPointerException

        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)

        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)

        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)

        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

        at java.lang.Thread.run(Thread.java:745)

2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b) switched from CANCELING to CANCELED.

2017-04-13 23:28:23,471 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 204169420152563078078024764459060926873,EndingHashKey: 238197656844656924424362225202237748018},SequenceNumberRange: {StartingSequenceNumber: 49572254078761043750812197376249737935721565982386290786,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM

2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b).

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, April 13, 2017 at 5:44 PM
To: Ted Yu <[hidden email]>


Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Hi Ted, sorry for my big font earlier… it was not intended :)

 

I am on flink 1.2.0

I built flink-connector-kinesis_2.10-1.2.0.jar from source and included it in the fat jar I am running.

Followed this http://www.kidder.io/2017/02/15/flink-kinesis-streaming-connector/

 

 

In my code I read from a Kinesis stream using:

consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION, "LATEST");
consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "200");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
DataStream<Map<String, Object>> stream = env.addSource(
    new FlinkKinesisConsumer<>(inputStream, new MyJsonDeserializationSchema(), consumerConfig));

 

 

When I push JSON events to the Kinesis stream, I intermittently see this NPE and the Flink job fails:

 

2017-04-14 00:31:54,672 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
               at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
               at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
               at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
               at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
               at java.lang.Thread.run(Thread.java:745)

 

Thanks

Sathi

 

 

 

From: Ted Yu <[hidden email]>
Date: Thursday, April 13, 2017 at 5:02 PM
To: Sathi Chowdhury <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Can you give us a bit more information?

 

release of flink

snippet of your code

 

Thanks

=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============


Re: Flink errors out and job fails--IOException from CollectSink.open()

Tzu-Li (Gordon) Tai
Hi,

The NPE should definitely be fixed. Here’s the filed JIRA for it: https://issues.apache.org/jira/browse/FLINK-6311.

@Sathi: When building Flink, we suggest avoiding Maven 3.3.x and later. As you have experienced yourself, it is subject to incorrect shading of some of the dependencies.

Cheers,
Gordon


On 16 April 2017 at 6:10:18 PM, Ted Yu ([hidden email]) wrote:

Can you describe the transitive dependency issue in more detail (since a lot of people use 3.3.x)?

Thanks

On Apr 16, 2017, at 1:56 AM, Sathi Chowdhury <[hidden email]> wrote:

OK, I was finally able to run the job fine. The culprit was an older version of the AWS SDK inside the Flink Kinesis connector jar. I also rebuilt Flink from source with Maven 3.2.5; I was on 3.3.x and it had weird transitive dependency problems.




On Apr 15, 2017, at 11:13 AM, Sathi Chowdhury <[hidden email]> wrote:

I have redone the pipeline with flink-1.3-SNAPSHOT, running on EMR 5.4, with the latest aws-sdk-java libraries placed directly in the Flink lib directory.

I have come to the point where I get the java.net.SocketException: Broken pipe (Write failed). Eager to get a reply and a clue on this!

 

java.io.IOException: Error sending data back to client (ip-172-31-42-238/172.31.42.238:42753)
        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:679)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Broken pipe (Write failed)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
        at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)
        at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:202)
        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)
        ... 6 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:41)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 17 more

Thanks

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Friday, April 14, 2017 at 1:29 AM
To: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

I am consistently seeing the same behavior. I tried with more memory for the JobManager and TaskManager:

taskmanager.rpc.port: 6123
taskmanager.data.port: 4964
taskmanager.heap.mb: 39000
taskmanager.numberOfTaskSlots: 1
taskmanager.network.numberOfBuffers: 16368
taskmanager.memory.preallocate: false
parallelization.degree.default: 4

Even though the JobManager restarts the Flink job, subtasks that have reached the CANCELED state do not come back.

I have no clue how to approach this.

Thanks

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, April 13, 2017 at 9:52 PM
To: Ted Yu <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

In my JobManager log I see the exception below, which is probably the root cause of the whole job being killed. Is there a memory problem in the JobManager? Any clue about this error?

I ran the yarn-session

 

 

My flink-conf.yaml is pretty much unmodified:

jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1

Invocation of yarn-session.sh:

HADOOP_CONF_DIR=/etc/hadoop/conf $FLINK_HOME/bin/yarn-session.sh -d -n 2 -s 1 -tm 2048

 

Thanks

The error in job manager is as below

 

2017-04-14 04:26:33,570 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (0c515e082713970381dcd34da87cfcf4) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:425)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:797)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:775)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:253)
        at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:234)
        at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:56)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:87)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error sending data back to client (ip-172-31-31-172/172.31.31.172:46147)
        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)
        ... 12 more
Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Connection reset
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
        at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)
        at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:203)
        at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)
        ... 14 more
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 25 more
2017-04-14 04:26:33,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (6c9267453a74d0ef58e678351ac0b49c) switched from state RUNNING to FAILING.
2017-04-14 04:49:10,707 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[hidden email]:37737] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[hidden email]:37737]] Caused by: [Connection refused: ip-172-31-31-172.us-west-2.compute.internal/172.31.31.172:37737]
2017-04-14 04:49:10,712 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (b9af6dd459e576a669a12b08a830c24e) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,713 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (c5fe4b6df26e03f4c2604ec89a92ad8f) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,715 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (7f614d51527252cbb9ffcfc0d660fe4e) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,717 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map (1/1) (cd1f1ecf9408b4ac58bea6be99e5c89a) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from DEPLOYING to RUNNING.
2017-04-14 04:49:10,740 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from RUNNING to FAILED.
java.io.IOException: Cannot connect to the client to send back the stream
        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:80)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at java.net.Socket.<init>(Socket.java:434)
        at java.net.Socket.<init>(Socket.java:244)
        at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:75)
        ... 6 more
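
For readers following the trace: the last "Caused by" shows a plain java.net.Socket constructor failing with "Connection refused", which is what the JDK reports when nothing is listening on the target host and port. Below is a minimal sketch of that failure mode, assuming the sink simply opens a TCP connection back to the process that submitted the job. The class and method names are hypothetical and not part of Flink.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical illustration only; this is not Flink code.
class CollectClientSketch {

    /** Tries to open a TCP connection to host:port; returns false if it is refused. */
    static boolean canReachClient(InetAddress host, int port) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            return true;
        } catch (ConnectException e) {
            // Nobody is listening on that port (for example, the client has exited).
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();

        // Stand-in for the client side that waits for results on an ephemeral port.
        ServerSocket client = new ServerSocket(0, 1, loopback);
        int port = client.getLocalPort();
        System.out.println("client listening, reachable: " + canReachClient(loopback, port));

        // Once the listener is closed, the same connect attempt is refused.
        client.close();
        System.out.println("client closed, reachable: " + canReachClient(loopback, port));
    }
}
```

If this assumption matches the setup, a restarted job would keep failing in open() for as long as the original client is no longer listening on that port.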

 

 

From: Ted Yu <[hidden email]>
Date: Thursday, April 13, 2017 at 6:01 PM
To: Sathi Chowdhury <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Here is the line where NPE was thrown:

 

    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

 

I wonder if runFetcher() encountered running being false - otherwise mainThread should not have been null.

 

Looks like we should check whether mainThread is null when shutting down.
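
A minimal sketch of such a guard, using a hypothetical stand-in class rather than the actual KinesisDataFetcher source. It assumes mainThread is only assigned once runFetcher() has been entered, so a shutdown that arrives earlier finds it null.

```java
// Hypothetical stand-in for the fetcher; names mirror the discussion, not the real class.
class FetcherShutdownSketch {
    private volatile boolean running = true;
    private volatile Thread mainThread; // stays null until runFetcher() is entered

    /** Simplified run loop: records the calling thread, then sleeps until shut down. */
    void runFetcher() {
        if (!running) {
            return; // shut down before we ever started; mainThread stays null
        }
        mainThread = Thread.currentThread();
        while (running) {
            try {
                Thread.sleep(10_000L); // stands in for the discovery interval
            } catch (InterruptedException e) {
                // woken up by shutdownFetcher(); the loop condition decides whether to exit
            }
        }
    }

    /** Returns true if a thread was interrupted, false if there was nothing to interrupt. */
    boolean shutdownFetcher() {
        running = false;
        Thread t = mainThread; // read once so the null check and the call see the same value
        if (t == null) {
            return false; // fetcher never ran; an unguarded interrupt() would throw an NPE here
        }
        t.interrupt();
        return true;
    }

    public static void main(String[] args) {
        FetcherShutdownSketch neverStarted = new FetcherShutdownSketch();
        System.out.println("interrupted a thread: " + neverStarted.shutdownFetcher());
    }
}
```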

 

On Thu, Apr 13, 2017 at 5:46 PM, Sathi Chowdhury <[hidden email]> wrote:

The TaskManager log does not point to a line in my code, but it seems the error occurs inside the connector jar while it is trying to fetch Kinesis records.

 

red sequence number 49572261908151269541343187919820576263466496304458235906

2017-04-13 23:28:23,470 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 306254130228844617117037146688591390310,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572254078827945986407789245674345090539511066904232082,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,471 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b) switched from CANCELING to CANCELED.
2017-04-13 23:28:23,471 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 204169420152563078078024764459060926873,EndingHashKey: 238197656844656924424362225202237748018},SequenceNumberRange: {StartingSequenceNumber: 49572254078761043750812197376249737935721565982386290786,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b).

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, April 13, 2017 at 5:44 PM
To: Ted Yu <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Hi Ted, sorry for my big font earlier… it was not intended :)

 

I am on flink 1.2.0

I built flink-connector-kinesis_2.10-1.2.0.jar from source and included in the fatjar I am running.

Followed this http://www.kidder.io/2017/02/15/flink-kinesis-streaming-connector/

 

 

From code I read a kinesis stream using

 

consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION, "LATEST");
consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "200");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
DataStream<Map<String, Object>> stream = env.addSource(
    new FlinkKinesisConsumer<>(inputStream, new MyJsonDeserializationSchema(), consumerConfig));

 

 

While I push JSON events to the Kinesis stream, I intermittently see this NPE and the Flink job fails:

 

2017-04-14 00:31:54,672 WARN  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error while closing Kinesis data fetcher
java.lang.NullPointerException
               at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
               at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
               at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
               at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
               at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
               at java.lang.Thread.run(Thread.java:745)

 

Thanks

Sathi

 

 

 

From: Ted Yu <[hidden email]>
Date: Thursday, April 13, 2017 at 5:02 PM
To: Sathi Chowdhury <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

Can you give us a bit more information?

 

release of flink

snippet of your code

 

Thanks
