Jira issue Flink-11127

Boris Lublinsky
Apparently there is a workaround for it.
Is it possible to provide the complete Helm chart for it?
Bits and pieces are in the ticket, but it would be nice to see the full chart.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

Starting Flink cluster and running a job

Boris Lublinsky
Following https://github.com/apache/flink/tree/release-1.7/flink-container/docker
I have created an entrypoint, which looks as follows:
#!/bin/sh

################################################################################
# from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
# and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
################################################################################

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec flink
    else
        # Others
        echo gosu flink
    fi
}

JOB_MANAGER="jobmanager"
TASK_MANAGER="taskmanager"

CMD="$1"
shift

if [ "${CMD}" = "help" ]; then
    echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
    exit 0
elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
    if [ "${CMD}" = "${TASK_MANAGER}" ]; then
        TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}

        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"

        echo "Starting Task Manager"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
    else
        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"

        if [ -z "$1" ]; then
            exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
        else
            exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
        fi
    fi
fi

exec "$@"
It works in all cases except when running a standalone job.
The problem, as I understand it, is a race condition.
In Kubernetes it takes several attempts to establish a connection between the JobManager and the TaskManager, while standalone-job.sh tries to start the job immediately once the cluster is created (before the connection is established).
Is there a better way to start a job on container startup?
 
Re: Jira issue Flink-11127

Konstantin Knauf-2
In reply to this post by Boris Lublinsky
Hi Boris,

the solution is actually simpler than it sounds from the ticket. The only thing you need to do is to set the "taskmanager.host" to the Pod's IP address in the Flink configuration. The easiest way to do this is to pass this config dynamically via a command-line parameter. 

The Deployment spec could look something like this:
containers:
- name: taskmanager
  [...]
  args:
  - "taskmanager.sh"
  - "start-foreground"
  - "-Dtaskmanager.host=$(K8S_POD_IP)"
  [...]
  env:
  - name: K8S_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
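
As an alternative to the command-line parameter, the same setting could presumably be written into flink-conf.yaml by the entrypoint, in the same way the entrypoint posted in this thread appends blob.server.port. This is a minimal sketch, assuming K8S_POD_IP is injected through the fieldRef shown above; the function name set_taskmanager_host is hypothetical and not part of the Flink images.

```shell
# Hypothetical helper (not part of the Flink images): writes taskmanager.host
# into the given Flink configuration file.
# Assumption: K8S_POD_IP is provided by the Pod spec via status.podIP.
set_taskmanager_host() {
    conf_file="$1"
    pod_ip="${K8S_POD_IP:-}"

    # Without a Pod IP, leave the configuration untouched.
    if [ -z "$pod_ip" ]; then
        echo "K8S_POD_IP is not set; taskmanager.host left unchanged" >&2
        return 1
    fi

    # Drop any earlier taskmanager.host entry, then append the new one.
    # grep -v exits non-zero when it selects no lines, hence the "|| true".
    grep -v '^taskmanager\.host:' "$conf_file" > "$conf_file.tmp" || true
    mv "$conf_file.tmp" "$conf_file"
    echo "taskmanager.host: $pod_ip" >> "$conf_file"
}
```

In an entrypoint like the one above it would be called as set_taskmanager_host "$FLINK_HOME/conf/flink-conf.yaml" before taskmanager.sh is started.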

Hope this helps and let me know if this works. 

Best, 

Konstantin

On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <[hidden email]> wrote:



--

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen   
Re: Starting Flink cluster and running a job

Konstantin Knauf-2
In reply to this post by Boris Lublinsky
Hi Boris,

Without looking at the entrypoint in much detail, there should generally not be a race condition there:

* If the TaskManagers cannot connect to the ResourceManager, they will retry (by default the timeout is 5 minutes).
* If the JobManager does not get enough resources from the ResourceManager, it will also wait for the resources/slots to be provided. The timeout there is also 5 minutes, I think.

So, this should actually be pretty robust as long as the Taskmanager containers can reach the Jobmanager eventually.
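
The "keep retrying until a timeout expires" behaviour described above can be pictured with a small shell sketch. It is only an illustration of the pattern, not Flink's actual implementation; the function name retry_until and its parameters are hypothetical.

```shell
# Hypothetical helper: run a command repeatedly until it succeeds or the
# timeout (in seconds) is used up. Returns 0 on success, 1 on timeout.
# Elapsed time is approximated by summing the sleep intervals; the run time
# of the command itself is not counted.
retry_until() {
    timeout_s="$1"
    interval_s="$2"
    shift 2

    elapsed=0
    while ! "$@"; do
        if [ "$elapsed" -ge "$timeout_s" ]; then
            echo "giving up after ${elapsed}s: $*" >&2
            return 1
        fi
        sleep "$interval_s"
        elapsed=$((elapsed + interval_s))
    done
    return 0
}
```

For example, retry_until 300 5 some_check would poll every 5 seconds for up to 5 minutes, mirroring the default timeout mentioned above; some_check stands for whatever connectivity test fits the setup.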

Could you provide the Taskmanager/JobManager logs for such a failure case?

Cheers,

Konstantin


On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <[hidden email]> wrote:
Re: Starting Flink cluster and running a job

Boris Lublinsky
Konstantin,
After experimenting with this for a while, I got to the root cause of the problem.
I am running a version of a Taxi ride travel prediction as my sample.
It works fine in IntelliJ,
but when I try to run it in Docker (standard Debian 1.7 image),
it fails with the following error:


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
... 17 more

This looks like a class loader problem. (I tried the suggested solution, but it did not help.)
I looked at the class loading and I see that both of these classes are loaded from my uber jar, but twice.

Have you guys seen this error before?
Any suggestions?


On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <[hidden email]> wrote:

Re: Jira issue Flink-11127

Boris Lublinsky
In reply to this post by Konstantin Knauf-2
Thanks, Konstantin.

Unfortunately, it does not work.

The snippet from the TaskManager YAML is:

containers:
- name: taskmanager
  image: {{ .Values.image }}:{{ .Values.imageTag }}
  imagePullPolicy: {{ .Values.imagePullPolicy }}
  args:
  - taskmanager -Dtaskmanager.host=$(K8S_POD_IP)
  ports:
  - name: data
    containerPort: 6121
  - name: rpc
    containerPort: 6122
  - name: query
    containerPort: 6125
  env:
  - name: FLINK_CONF_DIR
    value: /etc/flink
  - name: K8S_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  resources:



The error is
/docker-entrypoint.sh: 62: exec: taskmanager -Dtaskmanager.host=10.131.0.97: not found


Did I misunderstand your instructions?
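
The error message suggests that the whole string "taskmanager -Dtaskmanager.host=..." reached the entrypoint as a single argument: each item in the args list becomes exactly one argument, spaces included. The comparison against "taskmanager" then fails and the script falls through to exec "$@". The sketch below mimics that dispatch; classify_cmd is a hypothetical stand-in for the if/elif chain of the entrypoint posted earlier in this thread and only prints which branch would be taken.

```shell
# Hypothetical stand-in for the entrypoint's dispatch on "$1".
classify_cmd() {
    cmd="$1"
    if [ "$cmd" = "help" ]; then
        echo "help"
    elif [ "$cmd" = "jobmanager" ] || [ "$cmd" = "taskmanager" ]; then
        echo "$cmd"
    else
        # At this point the entrypoint would run: exec "$@"
        echo "exec: $cmd"
    fi
}

# One list item in args: a single argument that contains a space.
classify_cmd "taskmanager -Dtaskmanager.host=10.131.0.97"
# prints: exec: taskmanager -Dtaskmanager.host=10.131.0.97

# Two list items in args: two separate arguments.
classify_cmd "taskmanager" "-Dtaskmanager.host=10.131.0.97"
# prints: taskmanager
```

Note that the taskmanager branch of the entrypoint posted earlier starts taskmanager.sh with start-foreground only and does not forward "$@", so with that script any extra arguments after taskmanager would presumably be dropped unless the branch is changed to pass them on.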


On Feb 19, 2019, at 4:33 AM, Konstantin Knauf <[hidden email]> wrote:

Re: Starting Flink cluster and running a job

Ken Krugler
In reply to this post by Boris Lublinsky
Hi Boris,

I haven’t seen this exact error, but I have seen similar errors caused by multiple versions of jars on the classpath.

When I’ve run into this particular "XXX is not an instance of YYY" problem, it often seems to be caused by a jar that I should have marked as provided in my pom.

Though I’m typically running on a YARN cluster, not w/K8s, so maybe this doesn’t apply.
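
One way to check for duplicate copies is to list every jar in a directory that contains the class in question. This is a minimal sketch, assuming the unzip tool is available in the image; the function name find_class_in_jars and the directory in the usage note are illustrative assumptions.

```shell
# Hypothetical helper: print each jar in a directory that contains the given
# class file, e.g. org/apache/kafka/common/serialization/Serializer.class.
# Assumption: the unzip tool is installed.
find_class_in_jars() {
    dir="$1"
    class_path="$2"
    for jar in "$dir"/*.jar; do
        # Skip the unexpanded pattern when the directory holds no jars.
        [ -f "$jar" ] || continue
        # unzip -l lists the archive entries; grep -F matches the path literally.
        if unzip -l "$jar" 2>/dev/null | grep -q -F "$class_path"; then
            echo "$jar"
        fi
    done
}
```

For example, find_class_in_jars "$FLINK_HOME/lib" org/apache/kafka/common/serialization/Serializer.class could be run against the Flink lib directory and against the directory holding the uber jar. If the class turns up in more than one place, two class loaders may each load their own copy, and the JVM treats those copies as different types.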

— Ken



On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <[hidden email]> wrote:


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Re: Starting Flink cluster and running a job

Boris Lublinsky
Thanks Ken,
That was my first instinct as well, but...
To run on the cluster I am building an uber jar, for which I am pinning the Kafka clients jar version.
I am also pinning the version of Kafka.
So I do not know where another version could come from.



On Feb 19, 2019, at 7:02 PM, Ken Krugler <[hidden email]> wrote:

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
... 17 more

Which talks about class loader. (I tried their solution, but it did not help.)
I looked at the loading and I see that the pair of these 2 classes is loaded from my uber jar, but twice.

Have you guys seen this error before?
Any suggestion?

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris, 

without looking at the entrypoint in much detail, generally there should not be a race condition there: 

* if the taskmanagers cannot connect to the resourcemanager they will retry (by default the timeout is 5 minutes)
* if the JobManager does not get enough resources from the ResourceManager it will also wait for the resources/slots to be provided. The timeout there is also 5 minutes, I think. 

So, this should actually be pretty robust as long as the Taskmanager containers can reach the Jobmanager eventually.

Could you provide the Taskmanager/JobManager logs for such a failure case?

Cheers, 

Konstantin


On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <[hidden email]> wrote:
Following https://github.com/apache/flink/tree/release-1.7/flink-container/docker
I have created an entry point, which looks as follows:
#!/bin/sh

################################################################################
# from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
# and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
################################################################################

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec flink
    else
        # Others
        echo gosu flink
    fi
}

JOB_MANAGER="jobmanager"
TASK_MANAGER="taskmanager"

CMD="$1"
shift

if [ "${CMD}" = "help" ]; then
    echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
    exit 0
elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
    if [ "${CMD}" = "${TASK_MANAGER}" ]; then
        TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}

        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"

        echo "Starting Task Manager"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
    else
        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"

        if [ -z "$1" ]; then
            exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
        else
            exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
        fi
    fi
fi

exec "$@"
It works for all cases except running a standalone job.
The problem, as I understand it, is a race condition.
In Kubernetes it takes several attempts to establish a connection between the Job and Task managers, while standalone-job.sh tries to start the job immediately once the cluster is created (before the connection is established).
Is there a better way to start a job on container startup?
 


-- 
Konstantin Knauf | Solutions Architect
+49 160 91394525

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Jira issue Flink-11127

Boris Lublinsky
In reply to this post by Konstantin Knauf-2
Also, the suggested workaround does not quite work.
2019-02-20 15:27:43,928 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by: [flink-taskmanager-1: No address associated with hostname]
2019-02-20 15:27:48,750 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler - Caught exception

I think the problem is that it's trying to connect to flink-taskmanager-1

Using busybox to experiment with nslookup, I can see
/ # nslookup flink-taskmanager-1.flink-taskmanager
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:      flink-taskmanager-1.flink-taskmanager
Address 1: 10.131.2.136 flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-1
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-1'
/ # nslookup flink-taskmanager-0.flink-taskmanager
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:      flink-taskmanager-0.flink-taskmanager
Address 1: 10.131.0.111 flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-0
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-0'
/ # 

So the name should be suffixed with the service name. How do I force it? I suspect I am missing a config parameter.
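For illustration only, the service-qualified names seen in the nslookup output above follow the pattern pod.service[.namespace.svc.cluster.local]. A minimal sketch of building such a name in an entrypoint script is shown below. The function name is hypothetical, and `cluster.local` is taken from the output above rather than guaranteed for every cluster. Whether advertising this name fixes the association error is untested here.

```shell
#!/bin/sh
# Hypothetical helper: build the service-qualified DNS name of a pod.
# $1: pod hostname, $2: service name, $3: optional namespace.
pod_dns_name() {
    if [ -n "${3:-}" ]; then
        # Fully qualified form, assuming the cluster domain is cluster.local
        echo "$1.$2.$3.svc.cluster.local"
    else
        # Short form, resolvable from within the same namespace
        echo "$1.$2"
    fi
}
```

For example, `pod_dns_name "$(hostname)" flink-taskmanager` would yield a name of the form that resolved in the nslookup session, which could then be supplied as the value of "taskmanager.host".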

 
Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:33 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

the solution is actually simpler than it sounds from the ticket. The only thing you need to do is to set the "taskmanager.host" to the Pod's IP address in the Flink configuration. The easiest way to do this is to pass this config dynamically via a command-line parameter. 

The Deployment spec could look something like this:
containers:
- name: taskmanager
  [...]
  args:
  - "taskmanager.sh"
  - "start-foreground"
  - "-Dtaskmanager.host=$(K8S_POD_IP)"
  [...]
  env:
  - name: K8S_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
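If a custom entrypoint does not forward extra arguments to taskmanager.sh, the same setting could instead be written into flink-conf.yaml before the process starts, in the same way the entrypoint earlier in this thread appends the port settings. The following is only a sketch under that assumption: the function name is hypothetical, and it relies on K8S_POD_IP being exported through the `env` section above.

```shell
#!/bin/sh
# Hypothetical helper: append taskmanager.host to a Flink config file,
# but only when a pod IP is actually known.
# $1: path to flink-conf.yaml, $2: pod IP (may be empty).
set_taskmanager_host() {
    conf_file="$1"
    pod_ip="$2"
    if [ -z "$pod_ip" ]; then
        echo "K8S_POD_IP is not set; leaving taskmanager.host unchanged" >&2
        return 1
    fi
    echo "taskmanager.host: ${pod_ip}" >> "$conf_file"
}
```

In the Task Manager branch of the entrypoint this would be called as `set_taskmanager_host "$FLINK_HOME/conf/flink-conf.yaml" "$K8S_POD_IP"` before the `exec` line.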

Hope this helps and let me know if this works. 

Best, 

Konstantin




--
Konstantin Knauf | Solutions Architect
+49 160 91394525


Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen   


Re: Starting Flink cluster and running a job

Boris Lublinsky
In reply to this post by Ken Krugler
I found some more details on this.

It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both the latest kafka-connector
and kafka-connector-011.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/



Re: Starting Flink cluster and running a job

Konstantin Knauf-2
Hi Boris,

can you share the relevant parts (dependencies) of your pom.xml? Did you also try without fixing the Kafka version, i.e. running with the Kafka client version provided by the Kafka connector of Flink? Gordon (cc) dealt with FLINK-8741.

@Gordon: have you seen this issue with 1.6/1.7 before?

Cheers,

Konstantin




--

Konstantin Knauf | Solutions Architect

+49 160 91394525


Re: Jira issue Flink-11127

Konstantin Knauf-2
In reply to this post by Boris Lublinsky
Hi Boris,

the exact command depends on the docker-entrypoint.sh script and the image you are using. For the example contained in the Flink repository it is "task-manager", I think. The important thing is to pass "taskmanager.host" to the Taskmanager process. You can verify by checking the Taskmanager logs. These should contain lines like below:

2019-02-21 08:03:00,004 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Program Arguments:
2019-02-21 08:03:00,008 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dtaskmanager.host=10.12.10.173

In the Jobmanager logs you should see that the Taskmanager is registered under the IP above in a line similar to:

2019-02-21 08:03:26,874 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID a0513ba2c472d2d1efc07626da9c1bda (akka.tcp://flink@10.12.10.173:46531/user/taskmanager_0) at ResourceManager

A service per Taskmanager is not required. The purpose of the config parameter is that the Jobmanager addresses the taskmanagers by IP instead of hostname.
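
The log check described above can be scripted. The following is a minimal sketch, not part of Flink: taskmanager_host_from_log is a hypothetical helper name, and it assumes the Taskmanager log is piped in on stdin (for example from kubectl logs).

```shell
#!/bin/sh
# Hypothetical helper: print the value of -Dtaskmanager.host found in a
# Taskmanager log read from stdin. Prints nothing if the argument is absent.
taskmanager_host_from_log() {
    sed -n 's/.*-Dtaskmanager\.host=\([^[:space:]]*\).*/\1/p' | head -n 1
}

# Example using the "Program Arguments" log line quoted above.
printf '%s\n' '2019-02-21 08:03:00,008 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dtaskmanager.host=10.12.10.173' \
    | taskmanager_host_from_log
```

If the helper prints nothing, the dynamic property most likely never reached the Taskmanager process.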

Hope this helps!

Cheers,

Konstantin



On Wed, Feb 20, 2019 at 4:37 PM Boris Lublinsky <[hidden email]> wrote:
Also, the suggested workaround does not quite work.
2019-02-20 15:27:43,928 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by: [flink-taskmanager-1: No address associated with hostname]
2019-02-20 15:27:48,750 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler - Caught exception

I think the problem is that it's trying to connect to flink-taskmanager-1

Using busybox to experiment with nslookup, I can see
/ # nslookup flink-taskmanager-1.flink-taskmanager
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:      flink-taskmanager-1.flink-taskmanager
Address 1: 10.131.2.136 flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-1
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-1'
/ # nslookup flink-taskmanager-0.flink-taskmanager
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:      flink-taskmanager-0.flink-taskmanager
Address 1: 10.131.0.111 flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-0
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-0'
/ # 

So the name should be suffixed with the service name. How do I force that? I suspect I am missing a config parameter.
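
The names that resolve in the nslookup output above follow the usual Kubernetes DNS pattern for pods behind a headless service: pod name, service name, namespace, then svc and the cluster domain. A minimal sketch that composes such a name; pod_fqdn is a hypothetical helper, and cluster.local is assumed as the cluster domain because that is what the output above shows.

```shell
#!/bin/sh
# Hypothetical helper: compose the fully qualified DNS name of a pod that
# sits behind a headless service.
# Arguments: pod name, service name, namespace, optional cluster domain.
pod_fqdn() {
    pod=$1
    service=$2
    namespace=$3
    domain=${4:-cluster.local}   # assumption: default cluster domain
    printf '%s.%s.%s.svc.%s\n' "$pod" "$service" "$namespace" "$domain"
}

# Example with the names from the nslookup session above.
pod_fqdn flink-taskmanager-1 flink-taskmanager flink
```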

 
Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:33 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

the solution is actually simpler than it sounds from the ticket. The only thing you need to do is to set the "taskmanager.host" to the Pod's IP address in the Flink configuration. The easiest way to do this is to pass this config dynamically via a command-line parameter. 

The Deployment spec could look something like this:
containers:
- name: taskmanager
  [...]
  args:
  - "taskmanager.sh"
  - "start-foreground"
  - "-Dtaskmanager.host=$(K8S_POD_IP)"
  [...]
  env:
  - name: K8S_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
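
If the image uses a custom entrypoint that calls taskmanager.sh itself, as the entrypoint quoted in this thread does with a bare "taskmanager.sh start-foreground", the property has to be forwarded by that script. A minimal sketch, assuming the K8S_POD_IP environment variable from the spec above and assuming that the image's taskmanager.sh accepts -D arguments after start-foreground (worth verifying for the Flink version in use); taskmanager_host_arg is a hypothetical helper.

```shell
#!/bin/sh
# Hypothetical helper: print the dynamic property for taskmanager.host when
# K8S_POD_IP is set, and print nothing otherwise.
taskmanager_host_arg() {
    if [ -n "${K8S_POD_IP:-}" ]; then
        printf '%s\n' "-Dtaskmanager.host=${K8S_POD_IP}"
    fi
}

# Intended use inside an entrypoint (left commented out so that the sketch
# has no side effects; the unquoted substitution is deliberate, an IP
# address contains no whitespace):
# exec "$FLINK_HOME/bin/taskmanager.sh" start-foreground $(taskmanager_host_arg) "$@"
```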

Hope this helps and let me know if this works. 

Best, 

Konstantin

On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <[hidden email]> wrote:
Apparently there is a workaround for it.
Is it possible to provide the complete Helm chart for it?
Bits and pieces are in the ticket, but it would be nice to see the full chart.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/




Re: Starting Flink cluster and running a job

Boris Lublinsky
In reply to this post by Konstantin Knauf-2
The relevant dependencies are:
val flinkScala          = "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided"
val flinkStreamingScala = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
val flinkKafka          = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion exclude("org.slf4j", "slf4j-log4j12")
I am using SBT.
I tried both connector-kafka and connector-kafka-0.11, with the same result.


Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 21, 2019, at 1:38 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

can you share the relevant parts (dependencies) of your pom.xml? Did you also try without fixing the Kafka version, i.e. running with the Kafka client version provided by the Kafka connector of Flink? Gordon (cc) dealt with FLINK-8741.

@Gordon: have you seen this issue with 1.6/1.7 before?

Cheers,

Konstantin

On Thu, Feb 21, 2019 at 2:19 AM Boris Lublinsky <[hidden email]> wrote:
I found some more details on this

It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both the latest connector-kafka and connector-kafka-0.11.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 7:02 PM, Ken Krugler <[hidden email]> wrote:

Hi Boris,

I haven’t seen this exact error, but I have seen similar errors caused by multiple versions of jars on the classpath.

When I’ve run into this particular "XXX is not an instance of YYY" problem, it often seems to be caused by a jar that I should have marked as provided in my pom.

Though I’m typically running on a YARN cluster, not w/K8s, so maybe this doesn’t apply.

— Ken



On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <[hidden email]> wrote:

Konstantin,
After experimenting with this for a while, I got to the root cause of the problem.
I am running a version of a Taxi ride travel prediction as my sample.
It works fine in IntelliJ,
but when I try to run it in Docker (standard Debian 1.7 image),
it fails with the following error:


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
... 17 more

Which talks about the class loader. (I tried their solution, but it did not help.)
I looked at the class loading and I see that this pair of classes is loaded from my uber jar, but twice.

Have you guys seen this error before?
Any suggestion?
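
One way to narrow down an "XXX is not an instance of YYY" error is to check which jars on the classpath contain the class in question. A minimal sketch, assuming unzip is available in the image; class_to_entry and jars_containing_class are hypothetical helpers, and the uber jar path in the usage comment is a placeholder.

```shell
#!/bin/sh
# Hypothetical helper: turn a dotted class name into its jar entry path.
class_to_entry() {
    printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Hypothetical helper: print every jar (arguments after the class name)
# whose listing contains the class. The match is a substring match, so
# relocated (shaded) copies of the class are reported as well.
jars_containing_class() {
    entry=$(class_to_entry "$1")
    shift
    for jar in "$@"; do
        if unzip -l "$jar" 2>/dev/null | grep -q -F "$entry"; then
            printf '%s\n' "$jar"
        fi
    done
}

# Usage (paths are placeholders):
# jars_containing_class org.apache.kafka.common.serialization.Serializer \
#     /opt/flink/lib/*.jar /path/to/uber.jar
```

More than one hit means the class can be loaded by different class loaders, which is the situation in which such an error typically appears.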

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris, 

without looking at the entrypoint in much detail, generally there should not be a race condition there: 

* if the taskmanagers cannot connect to the resourcemanager they will retry (by default the timeout is 5 mins)
* if the JobManager does not get enough resources from the ResourceManager it will also wait for the resources/slots to be provided. The timeout there is also 5 minutes, I think. 

So, this should actually be pretty robust as long as the Taskmanager containers can reach the Jobmanager eventually.
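
The retry-until-timeout behaviour described above can be pictured as the loop below. This is only an illustration in shell, not Flink's implementation; retry_until and its parameters are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: run a command repeatedly until it succeeds or the
# timeout (in seconds) has been used up.
# Usage: retry_until TIMEOUT_SECONDS INTERVAL_SECONDS command [args...]
# Returns 0 as soon as the command succeeds, 1 on timeout.
retry_until() {
    timeout=$1
    interval=$2
    shift 2
    waited=0
    while ! "$@"; do
        if [ "$waited" -ge "$timeout" ]; then
            return 1
        fi
        sleep "$interval"
        waited=$((waited + interval))
    done
    return 0
}
```

With a timeout of 300 seconds this mirrors the five-minute default mentioned above: a component that comes up late is still picked up, as long as it becomes reachable within the window.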

Could you provide the Taskmanager/JobManager logs for such a failure case?

Cheers, 

Konstantin



Re: Jira issue Flink-11127

Boris Lublinsky
In reply to this post by Konstantin Knauf-2
Konstantin, it still does not quite work.
The IP is still in place, but…

Here is the Job Manager log:
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
Starting Job Manager
config file: 
jobmanager.rest.address: crabby-kudu-fdp-flink-jobmanager-service
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
rest.port: 8081
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
blob.server.port: 6124
query.server.port: 6125
Starting standalonesession as a console application on host crabby-kudu-fdp-flink-jobmanager-85c8d799db-46rj2.
2019-02-21 21:00:37,803 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2019-02-21 21:00:37,804 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Starting StandaloneSessionClusterEntrypoint (Version: 1.7.1, Rev:89eafb4, Date:14.12.2018 @ 15:48:34 GMT)
2019-02-21 21:00:37,804 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  OS current user: ?
2019-02-21 21:00:37,805 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2019-02-21 21:00:37,805 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
2019-02-21 21:00:37,805 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Maximum heap size: 981 MiBytes
2019-02-21 21:00:37,805 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JAVA_HOME: /docker-java-home/jre
2019-02-21 21:00:37,805 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  No Hadoop Dependency available
2019-02-21 21:00:37,805 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM Options:
2019-02-21 21:00:37,805 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xms1024m
2019-02-21 21:00:37,805 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xmx1024m
2019-02-21 21:00:37,805 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2019-02-21 21:00:37,806 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2019-02-21 21:00:37,806 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Program Arguments:
2019-02-21 21:00:37,806 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --configDir
2019-02-21 21:00:37,806 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     /opt/flink/conf
2019-02-21 21:00:37,806 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --executionMode
2019-02-21 21:00:37,806 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     cluster
2019-02-21 21:00:37,806 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Classpath: /opt/flink/lib/flink-metrics-prometheus-1.7.1.jar:/opt/flink/lib/flink-python_2.11-1.7.1.jar:/opt/flink/lib/flink-queryable-state-runtime_2.11-1.7.1.jar:/opt/flink/lib/flink-table_2.11-1.7.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.7.1.jar:::
2019-02-21 21:00:37,806 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2019-02-21 21:00:37,808 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Registered UNIX signal handlers for [TERM, HUP, INT]
2019-02-21 21:00:37,822 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rest.address, crabby-kudu-fdp-flink-jobmanager-service
2019-02-21 21:00:37,822 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-02-21 21:00:37,823 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-02-21 21:00:37,823 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-02-21 21:00:37,823 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-02-21 21:00:37,823 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-02-21 21:00:37,824 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2019-02-21 21:00:37,824 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporters, prom
2019-02-21 21:00:37,825 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
2019-02-21 21:00:37,825 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.prom.port, 9249
2019-02-21 21:00:37,825 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 6124
2019-02-21 21:00:37,825 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: query.server.port, 6125
2019-02-21 21:00:38,010 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Starting StandaloneSessionClusterEntrypoint.
2019-02-21 21:00:38,011 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install default filesystem.
2019-02-21 21:00:38,016 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2019-02-21 21:00:38,023 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install security context.
2019-02-21 21:00:38,031 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2019-02-21 21:00:38,043 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2019-02-21 21:00:38,044 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Initializing cluster services.
2019-02-21 21:00:38,513 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Trying to start actor system at 127.0.0.1:6123
2019-02-21 21:00:39,304 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2019-02-21 21:00:39,411 INFO  akka.remote.Remoting                                          - Starting remoting
2019-02-21 21:00:39,570 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:6123]
2019-02-21 21:00:39,602 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://flink@127.0.0.1:6123
2019-02-21 21:00:39,617 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
2019-02-21 21:00:39,626 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-12db5847-9543-43ad-a7fa-19de8e907ed6
2019-02-21 21:00:39,629 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:6124 - max concurrent requests: 50 - max backlog: 1000
2019-02-21 21:00:39,649 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - Configuring prom with {port=9249, class=org.apache.flink.metrics.prometheus.PrometheusReporter}.
2019-02-21 21:00:39,658 INFO  org.apache.flink.metrics.prometheus.PrometheusReporter        - Started PrometheusReporter HTTP server on port 9249.
2019-02-21 21:00:39,658 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - Reporting metrics for reporter prom of type org.apache.flink.metrics.prometheus.PrometheusReporter.
2019-02-21 21:00:39,659 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Trying to start actor system at 127.0.0.1:0
2019-02-21 21:00:39,714 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2019-02-21 21:00:39,720 INFO  akka.remote.Remoting                                          - Starting remoting
2019-02-21 21:00:39,727 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink-metrics@127.0.0.1:34006]
2019-02-21 21:00:39,728 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink-metrics@127.0.0.1:34006
2019-02-21 21:00:39,797 INFO  org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore  - Initializing FileArchivedExecutionGraphStore: Storage directory /tmp/executionGraphStore-757ae8c1-c839-4666-9d27-697c34214187, expiration time 3600000, maximum cache size 52428800 bytes.
2019-02-21 21:00:39,821 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory /tmp/blobStore-71959baf-25bb-4182-864a-5f4873ea9988
2019-02-21 21:00:39,838 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
2019-02-21 21:00:39,839 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Upload directory /tmp/flink-web-8dfc9112-0fc2-439f-aac5-2bbe5a003835/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available.
2019-02-21 21:00:39,840 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Created directory /tmp/flink-web-8dfc9112-0fc2-439f-aac5-2bbe5a003835/flink-web-upload for file uploads.
2019-02-21 21:00:39,896 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Starting rest endpoint.
2019-02-21 21:00:40,611 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Log file environment variable 'log.file' is not set.
2019-02-21 21:00:40,611 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'.
2019-02-21 21:00:41,098 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/]] arriving at [akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 21:00:41,301 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at 127.0.0.1:8081
2019-02-21 21:00:41,301 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://127.0.0.1:8081  was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
2019-02-21 21:00:41,301 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://127.0.0.1:8081 .
2019-02-21 21:00:41,598 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2019-02-21 21:00:41,616 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
2019-02-21 21:00:41,711 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka.tcp://flink@127.0.0.1:6123/user/resourcemanager was granted leadership with fencing token 00000000000000000000000000000000
2019-02-21 21:00:41,712 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
2019-02-21 21:00:41,807 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://flink@127.0.0.1:6123/user/dispatcher was granted leadership with fencing token 00000000-0000-0000-0000-000000000000
2019-02-21 21:00:41,898 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Recovering all persisted jobs.
2019-02-21 21:00:44,420 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/]] arriving at [akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 21:01:00,434 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/]] arriving at [akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 21:01:04,353 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/]] arriving at [akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 21:01:20,474 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/]] arriving at [akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 21:01:24,393 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/]] arriving at [akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 21:01:40,514 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/]] arriving at [akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 21:01:44,433 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/]] arriving at [akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 21:02:00,554 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/]] arriving at [akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 21:02:04,473 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[<a href="akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/" class="">akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/]] arriving at [<a href="akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123" class="">akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123] 
inbound addresses are [<a href="akka.tcp://flink@127.0.0.1:6123" class="">akka.tcp://flink@127.0.0.1:6123]

And here is the task manager:

metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
Starting Task Manager
taskmanager.host : 10.131.2.148
config file: 
jobmanager.rpc.address: crabby-kudu-fdp-flink-jobmanager-service
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 16
parallelism.default: 1
rest.port: 8081
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
taskmanager.host : 10.131.2.148
blob.server.port: 6124
query.server.port: 6125
Starting taskexecutor as a console application on host crabby-kudu-fdp-flink-taskmanager-9f548f744-xlfqg.
2019-02-21 21:00:38,013 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - --------------------------------------------------------------------------------
2019-02-21 21:00:38,014 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  Starting TaskManager (Version: 1.7.1, Rev:89eafb4, Date:14.12.2018 @ 15:48:34 GMT)
2019-02-21 21:00:38,014 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  OS current user: ?
2019-02-21 21:00:38,014 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2019-02-21 21:00:38,015 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
2019-02-21 21:00:38,015 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  Maximum heap size: 922 MiBytes
2019-02-21 21:00:38,015 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  JAVA_HOME: /docker-java-home/jre
2019-02-21 21:00:38,015 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  No Hadoop Dependency available
2019-02-21 21:00:38,015 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  JVM Options:
2019-02-21 21:00:38,015 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -XX:+UseG1GC
2019-02-21 21:00:38,015 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -Xms922M
2019-02-21 21:00:38,015 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -Xmx922M
2019-02-21 21:00:38,015 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -XX:MaxDirectMemorySize=8388607T
2019-02-21 21:00:38,016 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2019-02-21 21:00:38,016 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2019-02-21 21:00:38,016 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  Program Arguments:
2019-02-21 21:00:38,016 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     --configDir
2019-02-21 21:00:38,016 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -     /opt/flink/conf
2019-02-21 21:00:38,016 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       -  Classpath: /opt/flink/lib/flink-metrics-prometheus-1.7.1.jar:/opt/flink/lib/flink-python_2.11-1.7.1.jar:/opt/flink/lib/flink-queryable-state-runtime_2.11-1.7.1.jar:/opt/flink/lib/flink-table_2.11-1.7.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.7.1.jar:::
2019-02-21 21:00:38,016 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - --------------------------------------------------------------------------------
2019-02-21 21:00:38,018 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Registered UNIX signal handlers for [TERM, HUP, INT]
2019-02-21 21:00:38,021 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Maximum number of open file descriptors is 1048576.
2019-02-21 21:00:38,032 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, crabby-kudu-fdp-flink-jobmanager-service
2019-02-21 21:00:38,032 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-02-21 21:00:38,032 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-02-21 21:00:38,032 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-02-21 21:00:38,033 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 16
2019-02-21 21:00:38,033 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-02-21 21:00:38,033 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2019-02-21 21:00:38,034 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporters, prom
2019-02-21 21:00:38,034 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
2019-02-21 21:00:38,035 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.prom.port, 9249
2019-02-21 21:00:38,035 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.host, 10.131.2.148
2019-02-21 21:00:38,035 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 6124
2019-02-21 21:00:38,035 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: query.server.port, 6125
2019-02-21 21:00:38,041 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2019-02-21 21:00:38,060 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2019-02-21 21:00:38,082 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2019-02-21 21:00:43,278 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
2019-02-21 21:00:43,281 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Using configured hostname/address for TaskManager: 10.131.2.148.
2019-02-21 21:00:43,283 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Trying to start actor system at 10.131.2.148:0
2019-02-21 21:00:43,686 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2019-02-21 21:00:43,736 INFO  akka.remote.Remoting                                          - Starting remoting
2019-02-21 21:00:43,850 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink@10.131.2.148:38454]
2019-02-21 21:00:43,857 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://flink@10.131.2.148:38454
2019-02-21 21:00:43,864 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Trying to start actor system at 10.131.2.148:0
2019-02-21 21:00:43,881 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2019-02-21 21:00:43,888 INFO  akka.remote.Remoting                                          - Starting remoting
2019-02-21 21:00:43,897 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink-metrics@10.131.2.148:34162]
2019-02-21 21:00:43,898 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Actor system started at akka.tcp://flink-metrics@10.131.2.148:34162
2019-02-21 21:00:43,916 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - Configuring prom with {port=9249, class=org.apache.flink.metrics.prometheus.PrometheusReporter}.
2019-02-21 21:00:43,925 INFO  org.apache.flink.metrics.prometheus.PrometheusReporter        - Started PrometheusReporter HTTP server on port 9249.
2019-02-21 21:00:43,926 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - Reporting metrics for reporter prom of type org.apache.flink.metrics.prometheus.PrometheusReporter.
2019-02-21 21:00:43,932 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Created BLOB cache storage directory /tmp/blobStore-da779bfd-52ab-4e50-ae69-37cc363f0880
2019-02-21 21:00:43,934 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory /tmp/blobStore-9f8aacaf-dede-45c6-9dba-34969b4adcba
2019-02-21 21:00:43,935 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Starting TaskManager with ResourceID: 24acb543dbb8a7dd0b3f4f92bce93a8f
2019-02-21 21:00:43,939 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig [server address: /10.131.2.148, server port: 0, ssl enabled: false, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 16 (manual), number of client threads: 16 (manual), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
2019-02-21 21:00:43,978 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Temporary file directory '/tmp': total 79 GB, usable 19 GB (24.05% usable)
2019-02-21 21:00:44,050 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 102 MB for network buffer pool (number of memory segments: 3278, bytes per segment: 32768).
2019-02-21 21:00:44,105 INFO  org.apache.flink.runtime.io.network.NetworkEnvironment        - Starting the network environment and its components.
2019-02-21 21:00:44,141 INFO  org.apache.flink.runtime.io.network.netty.NettyClient         - Successful initialization (took 34 ms).
2019-02-21 21:00:44,187 INFO  org.apache.flink.runtime.io.network.netty.NettyServer         - Successful initialization (took 46 ms). Listening on SocketAddress /10.131.2.148:46191.
2019-02-21 21:00:44,194 INFO  org.apache.flink.queryablestate.server.KvStateServerImpl      - Started Queryable State Server @ /10.131.2.148:9067.
2019-02-21 21:00:44,206 INFO  org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl  - Started Queryable State Proxy Server @ /10.131.2.148:9069.
2019-02-21 21:00:44,207 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Limiting managed memory to 0.7 of the currently free heap space (639 MB), memory will be allocated lazily.
2019-02-21 21:00:44,210 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /tmp/flink-io-d1a33d1b-838f-4082-86b7-1ade59bdda8a for spill files.
2019-02-21 21:00:44,280 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
2019-02-21 21:00:44,291 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/taskmanager_0 .
2019-02-21 21:00:44,305 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Start job leader service.
2019-02-21 21:00:44,305 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to ResourceManager akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/user/resourcemanager(00000000000000000000000000000000).
2019-02-21 21:00:44,306 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /tmp/flink-dist-cache-807b9b28-6656-4bf9-b5ee-4ce41f3b4513
2019-02-21 21:00:54,330 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-21 21:01:14,370 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-21 21:01:34,409 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-21 21:01:54,449 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-21 21:02:14,490 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-21 21:02:34,529 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-21 21:02:54,569 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-21 21:03:14,610 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-21 21:03:34,649 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@crabby-kudu-fdp-flink-jobmanager-service:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..

Something is still not connected
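
One way to read the JobManager errors: each dropped message was addressed to the service name, while the actor system lists 127.0.0.1 as its only inbound address. A minimal shell sketch to pull both addresses out of such lines, assuming the EndpointWriter log format shown in this message (the function name akka_address_mismatch is made up for illustration):

```shell
#!/bin/sh
# Hypothetical helper: for every "dropping message" line read from stdin,
# print the address the message was sent to and the address the receiving
# actor system says it is bound to. Identical results are printed once.
akka_address_mismatch() {
  grep 'dropping message' | while IFS= read -r line; do
    # Address inside "non-local recipient [Actor[...]]"
    recipient=$(printf '%s\n' "$line" |
      sed -n 's/.*for non-local recipient \[Actor\[\(.*\)\]\] arriving at.*/\1/p' |
      grep -o 'akka\.tcp://[^]/"<> ]*' | tail -n 1)
    # Address inside "inbound addresses are [...]"
    inbound=$(printf '%s\n' "$line" |
      sed -n 's/.*inbound addresses are \[\(.*\)\].*/\1/p' |
      grep -o 'akka\.tcp://[^]/"<> ]*' | tail -n 1)
    echo "sent to ${recipient}, bound to ${inbound}"
  done | sort -u
}
```

It could be fed from the pod log, for example `kubectl logs <jobmanager-pod> | akka_address_mismatch`. If the two addresses differ, as they do here, that is at least consistent with the JobManager not answering under the name the TaskManager uses.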

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 21, 2019, at 2:05 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

the exact command depends on the docker-entrypoint.sh script and the image you are using. For the example contained in the Flink repository it is "task-manager", I think. The important thing is to pass "taskmanager.host" to the Taskmanager process. You can verify by checking the Taskmanager logs. These should contain lines like below:

2019-02-21 08:03:00,004 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Program Arguments:
2019-02-21 08:03:00,008 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dtaskmanager.host=10.12.10.173
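
To check this without reading the whole log, something like the following could work. It is only a sketch: the function name is hypothetical, and it assumes the two log formats seen in this thread (the -Dtaskmanager.host program argument and the "Loading configuration property: taskmanager.host" line).

```shell
#!/bin/sh
# Hypothetical helper: print the first taskmanager.host value found in a
# TaskManager log read from stdin; returns non-zero if none is found.
taskmanager_host_from_log() {
  sed -n \
    -e 's/.*-Dtaskmanager\.host=\([^ ]*\).*/\1/p' \
    -e 's/.*Loading configuration property: taskmanager\.host, \([^ ]*\).*/\1/p' |
    head -n 1 | grep .
}
```

For example: `kubectl logs <taskmanager-pod> | taskmanager_host_from_log`. No output means the setting never reached the TaskManager process.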

In the Jobmanager logs you should see that the Taskmanager is registered under the IP above in a line similar to:

2019-02-21 08:03:26,874 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID a0513ba2c472d2d1efc07626da9c1bda (akka.tcp://flink@10.12.10.173:46531/user/taskmanager_0) at ResourceManager

A service per Taskmanager is not required. The purpose of the config parameter is that the Jobmanager addresses the taskmanagers by IP instead of hostname.
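
The same check can be scripted on the JobManager side. A rough sketch, assuming the registration line looks like the one quoted here; both function names are hypothetical:

```shell
#!/bin/sh
# Hypothetical helper: list the host part of every TaskManager address the
# ResourceManager registered, reading JobManager log lines from stdin.
registered_taskmanager_hosts() {
  sed -n 's|.*Registering TaskManager .*(akka\.tcp://flink@\([^:]*\):[0-9]*/user/taskmanager_[0-9]*).*|\1|p'
}

# Succeeds if the argument looks like a dotted IPv4 address, not a hostname.
looks_like_ipv4() {
  printf '%s\n' "$1" | grep -Eq '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$'
}
```

Each host printed by registered_taskmanager_hosts can then be passed to looks_like_ipv4; a hostname such as flink-taskmanager-1 would indicate that taskmanager.host was not picked up.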

Hope this helps!

Cheers,

Konstantin



On Wed, Feb 20, 2019 at 4:37 PM Boris Lublinsky <[hidden email]> wrote:
Also, the suggested workaround does not quite work.
2019-02-20 15:27:43,928 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by: [flink-taskmanager-1: No address associated with hostname]
2019-02-20 15:27:48,750 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler - Caught exception

I think the problem is that it's trying to connect to flink-taskmanager-1, using the short name.

Using busybox to experiment with nslookup, I can see
/ # nslookup flink-taskmanager-1.flink-taskmanager
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:      flink-taskmanager-1.flink-taskmanager
Address 1: 10.131.2.136 flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-1
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-1'
/ # nslookup flink-taskmanager-0.flink-taskmanager
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:      flink-taskmanager-0.flink-taskmanager
Address 1: 10.131.0.111 flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-0
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-0'
/ # 

So the name should be postfixed with the service name. How do I force it? I suspect I am missing a config parameter.

 
Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:33 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

the solution is actually simpler than it sounds from the ticket. The only thing you need to do is to set the "taskmanager.host" to the Pod's IP address in the Flink configuration. The easiest way to do this is to pass this config dynamically via a command-line parameter. 

The Deployment spec could look something like this:
containers:
- name: taskmanager
  [...]
  args:
  - "taskmanager.sh"
  - "start-foreground"
  - "-Dtaskmanager.host=$(K8S_POD_IP)"
  [...]
  env:
  - name: K8S_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
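
If the image uses a custom docker-entrypoint.sh instead of passing the -D argument, the same setting could be written into flink-conf.yaml before the TaskManager starts. A minimal sketch, assuming K8S_POD_IP is exported to the container as in the spec; set_taskmanager_host is a hypothetical helper, not part of the Flink scripts:

```shell
#!/bin/sh
# Hypothetical helper for a custom entrypoint: write taskmanager.host into the
# given flink-conf.yaml, replacing any earlier value.
#   $1 = path to flink-conf.yaml
#   $2 = address to advertise (for example the Pod IP)
set_taskmanager_host() {
  conf_file=$1
  host=$2
  [ -n "$host" ] || { echo "set_taskmanager_host: no host given" >&2; return 1; }
  tmp=$(mktemp) || return 1
  # Keep every line except a previous taskmanager.host entry.
  # grep exits non-zero when it prints nothing, which is fine here.
  grep -v '^taskmanager\.host[ ]*:' "$conf_file" > "$tmp" || true
  echo "taskmanager.host: ${host}" >> "$tmp"
  # Overwrite in place so the file keeps its owner and permissions.
  cat "$tmp" > "$conf_file" && rm -f "$tmp"
}
```

In the entrypoint this would be called as `set_taskmanager_host "$FLINK_HOME/conf/flink-conf.yaml" "$K8S_POD_IP"` before the TaskManager is started.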

Hope this helps and let me know if this works. 

Best, 

Konstantin

On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <[hidden email]> wrote:
Apparently there is a workaround for it.
Is it possible to provide the complete Helm chart for it?
Bits and pieces are in the ticket, but it would be nice to see the full chart.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/



--
Konstantin Knauf | Solutions Architect
+49 160 91394525


Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen   



Re: Jira issue Flink-11127

Boris Lublinsky
In reply to this post by Konstantin Knauf-2

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/



Re: Jira issue Flink-11127

Boris Lublinsky
Adding metric-query port makes it a bit better, but there is still an error


2019-02-22 00:03:56,173 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-22 00:04:16,213 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-22 00:04:36,253 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-22 00:04:56,293 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..

in the task manager log, and

2019-02-21 23:59:46,479 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 23:59:57,808 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:06,519 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:17,849 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:26,558 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:37,888 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]

in the job manager log.

Port 6123 is open in both the Job Manager deployment

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: {{ template "fullname" . }}-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      annotations:
        prometheus.io/scrape: 'true'
        prometheus.io/port: '9249'
      labels:
        server: flink
        app: {{ template "fullname" . }}
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: {{ .Values.image }}:{{ .Values.imageTag }}
        imagePullPolicy: {{ .Values.imagePullPolicy }}
        args:
        - jobmanager
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        env:
        - name: CONTAINER_METRIC_PORT
          value: '{{ .Values.flink.metric_query_port }}'
        - name: JOB_MANAGER_RPC_ADDRESS
          value: {{ template "fullname" . }}-jobmanager
        livenessProbe:
          httpGet:
            path: /overview
            port: 8081
          initialDelaySeconds: 30
          periodSeconds: 10
        resources:
          limits:
            cpu: {{ .Values.resources.jobmanager.limits.cpu }}
            memory: {{ .Values.resources.jobmanager.limits.memory }}
          requests:
            cpu: {{ .Values.resources.jobmanager.requests.cpu }}
            memory: {{ .Values.resources.jobmanager.requests.memory }}

and the Job Manager service

apiVersion: v1
kind: Service
metadata:
  name: {{ template "fullname" . }}-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: {{ template "fullname" . }}
    component: jobmanager
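
The job manager errors above name two different hosts: the address the message was sent to and the address the actor system reports as inbound. A minimal sketch for pulling both out of such log lines and comparing them is below. The function name `check_inbound_address` is made up for illustration, and the sample line is copied from the log above with the HTML residue removed. A mismatch would be consistent with the job manager binding to localhost rather than to its service name, but that reading is an assumption, not something the log states outright.

```shell
#!/bin/sh
# Read job manager log text from stdin. For every akka "dropping message"
# line, print the host the message was addressed to next to the host the
# actor system lists as its inbound address.
# check_inbound_address is a hypothetical helper, not part of Flink.
check_inbound_address() {
    sed -n 's|.*arriving at \[akka\.tcp://[^@]*@\([^:]*\):.*inbound addresses are \[akka\.tcp://[^@]*@\([^:]*\):.*|\1 \2|p' |
    while read -r target inbound; do
        if [ "$target" = "$inbound" ]; then
            echo "match: $target"
        else
            echo "mismatch: addressed to $target, listening on $inbound"
        fi
    done
}

# Sample line from the job manager log quoted in this post.
sample='2019-02-21 23:59:46,479 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]'
printf '%s\n' "$sample" | check_inbound_address
```

Against a running cluster the input would come from the job manager pod's log, for example piped from `kubectl logs` with whatever pod name the chart produces.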




Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 21, 2019, at 6:13 PM, Boris Lublinsky <[hidden email]> wrote:


Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 21, 2019, at 2:05 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

the exact command depends on the docker-entrypoint.sh script and the image you are using. For the example contained in the Flink repository it is "task-manager", I think. The important thing is to pass "taskmanager.host" to the Taskmanager process. You can verify by checking the Taskmanager logs. These should contain lines like below:

2019-02-21 08:03:00,004 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Program Arguments:
2019-02-21 08:03:00,008 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dtaskmanager.host=10.12.10.173

In the Jobmanager logs you should see that the Taskmanager is registered under the IP above in a line similar to:

2019-02-21 08:03:26,874 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID a0513ba2c472d2d1efc07626da9c1bda (akka.tcp://flink@10.12.10.173:46531/user/taskmanager_0) at ResourceManager

A service per Taskmanager is not required. The purpose of the config parameter is that the Jobmanager addresses the taskmanagers by IP instead of hostname.
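
The log check described above can be scripted. The sketch below reads Taskmanager log text from stdin and prints the value of each `-Dtaskmanager.host=` program argument it finds. The function name `extract_taskmanager_host` is an assumption made for illustration; the sample line is the one quoted above.

```shell
#!/bin/sh
# Print the value of every -Dtaskmanager.host=... program argument found
# in Taskmanager log text read from stdin, one value per line.
# extract_taskmanager_host is a hypothetical helper name.
extract_taskmanager_host() {
    sed -n 's/.*-Dtaskmanager\.host=\([^[:space:]]*\).*/\1/p'
}

# Sample line taken from the Taskmanager log excerpt above.
sample='2019-02-21 08:03:00,008 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dtaskmanager.host=10.12.10.173'
printf '%s\n' "$sample" | extract_taskmanager_host   # prints 10.12.10.173
```

If nothing is printed for a real Taskmanager log, the argument most likely never reached the process, which points back at the entrypoint script or the `args` section of the Deployment.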

Hope this helps!

Cheers,

Konstantin



On Wed, Feb 20, 2019 at 4:37 PM Boris Lublinsky <[hidden email]> wrote:
Also, the suggested workaround does not quite work.
2019-02-20 15:27:43,928 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by: [flink-taskmanager-1: No address associated with hostname]
2019-02-20 15:27:48,750 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler - Caught exception

I think the problem is that it's trying to connect to flink-taskmanager-1

Using busybox to experiment with nslookup, I can see
/ # nslookup flink-taskmanager-1.flink-taskmanager
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:      flink-taskmanager-1.flink-taskmanager
Address 1: 10.131.2.136 flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-1
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-1'
/ # nslookup flink-taskmanager-0.flink-taskmanager
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:      flink-taskmanager-0.flink-taskmanager
Address 1: 10.131.0.111 flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-0
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-0'
/ # 

So the name should be suffixed with the service name. How do I force it? I suspect I am missing a config parameter.
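
To make the naming pattern from the nslookup output explicit, here is a small sketch that composes the name that did resolve: pod, then service, then namespace, then the cluster suffix. The function name `pod_fqdn` is hypothetical, and the `svc.cluster.local` suffix is simply copied from the output above; other clusters may use a different domain.

```shell
#!/bin/sh
# Build the cluster-internal DNS name of a pod that sits behind a service:
#   <pod>.<service>.<namespace>.svc.cluster.local
# pod_fqdn is a hypothetical helper; the suffix is taken from the nslookup
# output in this post and is an assumption for any other cluster.
pod_fqdn() {
    pod=$1
    service=$2
    namespace=$3
    printf '%s.%s.%s.svc.cluster.local\n' "$pod" "$service" "$namespace"
}

pod_fqdn flink-taskmanager-1 flink-taskmanager flink
# prints flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
```

This only shows which name is resolvable; it does not by itself make Flink register the Taskmanager under that name.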

 
Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:33 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

the solution is actually simpler than it sounds from the ticket. The only thing you need to do is to set the "taskmanager.host" to the Pod's IP address in the Flink configuration. The easiest way to do this is to pass this config dynamically via a command-line parameter. 

The Deployment spec could look something like this:
containers:
- name: taskmanager
  [...]
  args:
  - "taskmanager.sh"
  - "start-foreground"
  - "-Dtaskmanager.host=$(K8S_POD_IP)"
  [...]
  env:
  - name: K8S_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP

Hope this helps and let me know if this works. 

Best, 

Konstantin

On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <[hidden email]> wrote:
Apparently there is a workaround for it.
Is it possible to provide the complete Helm chart for it?
Bits and pieces are in the ticket, but it would be nice to see the full chart.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/






Re: Starting Flink cluster and running a job

Boris Lublinsky
In reply to this post by Boris Lublinsky
Gordon, I double checked it several times. Here is the list

Columns are
[info]  - Jar-Size including dependencies
[info]  - Jar-Size
[info]  - Number of transitive dependencies
[info]  - Number of direct dependencies
[info]  - ModuleID
[info] Done updating.
[info]    TotSize    JarSize #TDe #Dep Module
[info]  27.988 MB ------- MB   33    2 lightbend:fdp-flink-taxiride_2.11:2.0.0
[info]  19.632 MB ------- MB   24    6 lightbend:support_2.11:2.0.0
[info]  17.675 MB   0.000 MB   16    9 org.typelevel:cats_2.11:0.9.0
[info]  17.389 MB   1.182 MB   13    9 org.typelevel:cats-laws_2.11:0.9.0
[info]  13.115 MB   0.286 MB    7    4 org.typelevel:cats-free_2.11:0.9.0
[info]  12.830 MB   0.000 MB    7    4 org.typelevel:cats-jvm_2.11:0.9.0
[info]  12.830 MB   3.403 MB    6    4 org.typelevel:cats-core_2.11:0.9.0
[info]   8.397 MB   0.068 MB    8    5 org.apache.flink:flink-connector-kafka_2.11:1.7.1
[info]   8.175 MB   1.889 MB    4    4 org.apache.kafka:kafka-clients:2.1.0
[info]   7.973 MB   0.212 MB    7    4 org.typelevel:cats-kernel-laws_2.11:0.9.0
[info]   4.834 MB   0.007 MB    4    2 org.typelevel:cats-macros_2.11:0.9.0
[info]   4.746 MB   0.082 MB    2    2 com.typesafe.scala-logging:scala-logging_2.11:3.9.0
[info]   4.658 MB   0.035 MB    1    1 org.typelevel:machinist_2.11:0.6.1
[info]   4.623 MB   4.623 MB    0    0 org.scala-lang:scala-reflect:2.11.12
[info]   4.592 MB   4.592 MB    0    0 org.typelevel:cats-kernel_2.11:0.9.0
[info]   3.714 MB   3.714 MB    0    0 com.github.luben:zstd-jni:1.3.5-4
[info]   3.152 MB   0.043 MB    2    1 org.typelevel:discipline_2.11:0.7.2
[info]   3.109 MB   3.094 MB    1    1 org.scalacheck:scalacheck_2.11:1.13.4
[info]   2.019 MB   2.019 MB    0    0 org.xerial.snappy:snappy-java:1.1.7.2
[info]   0.803 MB   0.290 MB    2    2 ch.qos.logback:logback-classic:1.2.3
[info]   0.641 MB   0.641 MB    0    0 joda-time:joda-time:2.10.1
[info]   0.512 MB   0.512 MB    0    0 org.lz4:lz4-java:1.5.0
[info]   0.472 MB   0.472 MB    0    0 ch.qos.logback:logback-core:1.2.3
[info]   0.286 MB   0.286 MB    0    0 com.typesafe:config:1.3.3
[info]   0.195 MB   0.114 MB    3    3 org.apache.flink:flink-connector-kafka-base_2.11:1.7.1
[info]   0.170 MB   0.167 MB    1    1 com.github.mpilquist:simulacrum_2.11:0.10.0
[info]   0.145 MB   0.145 MB    0    0 org.joda:joda-convert:2.1.2
[info]   0.041 MB   0.041 MB    0    0 org.slf4j:slf4j-api:1.7.25
[info]   0.033 MB   0.033 MB    0    0 com.google.code.findbugs:jsr305:1.3.9
[info]   0.016 MB   0.002 MB    2    1 org.typelevel:catalysts-platform_2.11:0.0.5
[info]   0.015 MB   0.012 MB    1    1 org.typelevel:catalysts-macros_2.11:0.0.5
[info]   0.015 MB   0.015 MB    0    0 org.scala-sbt:test-interface:1.0
[info]   0.007 MB   0.007 MB    0    0 org.apache.flink:force-shading:1.7.1
[info]   0.003 MB   0.003 MB    0    0 org.typelevel:macro-compat_2.11:1.1.1


And here is another view

[info] lightbend:fdp-flink-taxiride_2.11:2.0.0 [S]
[info]   +-lightbend:support_2.11:2.0.0 [S]
[info]   | +-ch.qos.logback:logback-classic:1.2.3
[info]   | | +-ch.qos.logback:logback-core:1.2.3
[info]   | | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]   | | +-org.slf4j:slf4j-api:1.7.25
[info]   | | 
[info]   | +-com.typesafe.scala-logging:scala-logging_2.11:3.9.0 [S]
[info]   | | +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   | | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]   | | +-org.slf4j:slf4j-api:1.7.25
[info]   | | 
[info]   | +-com.typesafe:config:1.3.3
[info]   | +-joda-time:joda-time:2.10.1
[info]   | +-org.joda:joda-convert:2.1.2
[info]   | +-org.typelevel:cats_2.11:0.9.0 [S]
[info]   |   +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | 
[info]   |   +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-free_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | | 
[info]   |   | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-jvm_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | | 
[info]   |   | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]   |   | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | | +-org.scala-sbt:test-interface:1.0
[info]   |   | | 
[info]   |   | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]   |   | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]   |   | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]   |   |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   |     +-org.scala-sbt:test-interface:1.0
[info]   |   |     
[info]   |   +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   +-org.typelevel:cats-laws_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | | +-org.scala-sbt:test-interface:1.0
[info]   |   | | 
[info]   |   | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]   |   | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]   |   | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | | 
[info]   |   | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]   |   | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | | | +-org.scala-sbt:test-interface:1.0
[info]   |   | | | 
[info]   |   | | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]   |   | | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]   |   | | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]   |   | |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | |     +-org.scala-sbt:test-interface:1.0
[info]   |   | |     
[info]   |   | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]   |   | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | |   +-org.scala-sbt:test-interface:1.0
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |     +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |     
[info]   +-org.apache.flink:flink-connector-kafka_2.11:1.7.1
[info]     +-com.google.code.findbugs:jsr305:1.3.9
[info]     +-org.apache.flink:flink-connector-kafka-base_2.11:1.7.1
[info]     | +-com.google.code.findbugs:jsr305:1.3.9
[info]     | +-org.apache.flink:force-shading:1.7.1
[info]     | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]     | +-org.slf4j:slf4j-api:1.7.25
[info]     | 
[info]     +-org.apache.flink:force-shading:1.7.1
[info]     +-org.apache.kafka:kafka-clients:2.1.0
[info]     | +-com.github.luben:zstd-jni:1.3.5-4
[info]     | +-org.lz4:lz4-java:1.5.0
[info]     | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]     | +-org.slf4j:slf4j-api:1.7.25
[info]     | +-org.xerial.snappy:snappy-java:1.1.7.2
[info]     | 
[info]     +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]     +-org.slf4j:slf4j-api:1.7.25
[info]     
[info] Done updating.
[info] lightbend:support_2.11:2.0.0 [S]
[info]   +-ch.qos.logback:logback-classic:1.2.3
[info]   | +-ch.qos.logback:logback-core:1.2.3
[info]   | +-org.slf4j:slf4j-api:1.7.25
[info]   | 
[info]   +-com.typesafe.scala-logging:scala-logging_2.11:3.9.0 [S]
[info]   | +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   | +-org.slf4j:slf4j-api:1.7.25
[info]   | 
[info]   +-com.typesafe:config:1.3.3
[info]   +-joda-time:joda-time:2.10.1
[info]   +-org.joda:joda-convert:2.1.2
[info]   +-org.typelevel:cats_2.11:0.9.0 [S]
[info]     +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | 
[info]     +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-free_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | | 
[info]     | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | | |   
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-jvm_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | | 
[info]     | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | | |   
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]     | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | | +-org.scala-sbt:test-interface:1.0
[info]     | | 
[info]     | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]     | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]     | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]     |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     |     +-org.scala-sbt:test-interface:1.0
[info]     |     
[info]     +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     +-org.typelevel:cats-laws_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | | +-org.scala-sbt:test-interface:1.0
[info]     | | 
[info]     | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]     | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]     | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | | 
[info]     | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | | |   
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]     | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | | | +-org.scala-sbt:test-interface:1.0
[info]     | | | 
[info]     | | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]     | | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]     | | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | |   
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]     | |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | |     +-org.scala-sbt:test-interface:1.0
[info]     | |     
[info]     | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]     | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | |   +-org.scala-sbt:test-interface:1.0
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]       +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]       





Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 22, 2019, at 12:33 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Hi,

I haven't seen this problem with Flink 1.6.x / 1.7.x, so it shouldn't be a recurrence of FLINK-8741.
I've double-checked the classloaders used in the Kafka connector, and they seem to be correct.

The fact that it works correctly in IntelliJ, but not when packaged, suggests that there could be some conflicting dependencies in the packaged jar.

Could you check the actual resolved dependency tree of the project, and see if there are multiple versions of some dependency related to Kafka being pulled in? For Maven for example, that would be "mvn dependency:tree".
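
A minimal sketch of how such a check could be automated for sbt-style output, where each line contains a `group:artifact:version` coordinate as in the trees in this thread. The function name `multi_version_artifacts` is made up for illustration, and the parsing would not work on Maven's `group:artifact:jar:version:scope` lines.

```shell
# Hypothetical helper: read dependency-tree text on stdin and print every
# group:artifact that shows up with more than one version.
# Assumes sbt-style "group:artifact:version" coordinates.
multi_version_artifacts() {
  # Pull out the coordinates, drop exact repeats, then group by group:artifact.
  grep -oE '[A-Za-z0-9][A-Za-z0-9_.-]*:[A-Za-z0-9][A-Za-z0-9_.-]*:[0-9][A-Za-z0-9_.-]*' |
    sort -u |
    awk -F: '{ key = $1 ":" $2; versions[key] = versions[key] " " $3; count[key]++ }
             END { for (k in count) if (count[k] > 1) print k versions[k] }' |
    sort
}
```

It could be fed a saved tree, for example `multi_version_artifacts < tree.txt`, where `tree.txt` is a placeholder file name. Evicted versions are reported too, so a hit only means the artifact is worth a closer look.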

Cheers,
Gordon

On Thu, Feb 21, 2019 at 10:49 PM Boris Lublinsky <[hidden email]> wrote:
The relevant dependencies are:
val flinkScala            =      "org.apache.flink"             %%   "flink-scala"                    % flinkVersion % "provided"
val flinkStreamingScala   =      "org.apache.flink"             %%   "flink-streaming-scala"          % flinkVersion % "provided"
val flinkKafka            =      "org.apache.flink"             %%   "flink-connector-kafka"          % flinkVersion exclude("org.slf4j", "slf4j-log4j12")
I am using SBT.
I tried both flink-connector-kafka and flink-connector-kafka-0.11, with the same result.


Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 21, 2019, at 1:38 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

can you share the relevant parts (dependencies) of your pom.xml? Did you also try without fixing the Kafka version, i.e. running with the Kafka client version provided by the Kafka connector of Flink? Gordon (cc) dealt with FLINK-8741.

@Gordon: have you seen this issue with 1.6/1.7 before?

Cheers,

Konstantin

On Thu, Feb 21, 2019 at 2:19 AM Boris Lublinsky <[hidden email]> wrote:
I found some more details on this.

It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both the latest flink-connector-kafka
and flink-connector-kafka-0.11.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 7:02 PM, Ken Krugler <[hidden email]> wrote:

Hi Boris,

I haven’t seen this exact error, but I have seen similar errors caused by multiple versions of jars on the classpath.

When I’ve run into this particular "XXX is not an instance of YYY" problem, it often seems to be caused by a jar that I should have marked as provided in my pom.
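
One way to look for such duplicates is to check which jars actually ship the class named in the error. The sketch below is only an illustration: `jars_containing_class` and the `JAR_LIST_CMD` variable are names invented here, and the default lister assumes `unzip` is installed (`jar tf` from a JDK would do as well).

```shell
# Hypothetical helper: print every jar (given as arguments) that contains
# the fully qualified class passed as the first argument.
# JAR_LIST_CMD is an assumption of this sketch: any command that prints one
# archive entry per line for a jar path, e.g. "unzip -Z1" or "jar tf".
jars_containing_class() {
  class_name="$1"; shift
  # org.example.Foo -> org/example/Foo.class
  entry="$(printf '%s' "$class_name" | tr '.' '/').class"
  for jar in "$@"; do
    if ${JAR_LIST_CMD:-unzip -Z1} "$jar" 2>/dev/null | grep -qxF -- "$entry"; then
      printf '%s\n' "$jar"
    fi
  done
}
```

For example, `jars_containing_class org.apache.kafka.common.serialization.Serializer "$FLINK_HOME"/lib/*.jar my-job.jar` (with `my-job.jar` standing in for the user jar) would list every jar that bundles the class. More than one hit suggests the class may be visible to more than one classloader.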

Though I’m typically running on a YARN cluster, not w/K8s, so maybe this doesn’t apply.

— Ken



On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <[hidden email]> wrote:

Konstantin,
After experimenting with this for a while, I got to the root cause of the problem.
I am running a version of the taxi ride travel prediction as my sample.
It works fine in IntelliJ,
but when I try to run it in Docker (the standard Debian 1.7 image),
it fails with the following error:


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
... 17 more

Which talks about class loading. (I tried their solution, but it did not help.)
I looked at the class loading and I see that both of these classes are loaded from my uber jar, but twice.

Have you guys seen this error before?
Any suggestion?

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris, 

without looking at the entrypoint in much detail, generally there should not be a race condition there: 

* if the TaskManagers cannot connect to the ResourceManager, they will retry (by default the timeout is 5 minutes)
* if the JobManager does not get enough resources from the ResourceManager, it will also wait for the resources/slots to be provided. The timeout there is also 5 minutes, I think.

So, this should actually be pretty robust as long as the Taskmanager containers can reach the Jobmanager eventually.

Could you provide the Taskmanager/JobManager logs for such a failure case?

Cheers, 

Konstantin


On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <[hidden email]> wrote:
Following https://github.com/apache/flink/tree/release-1.7/flink-container/docker
I have created an entry point, which looks as follows:
#!/bin/sh

################################################################################
# from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
# and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
################################################################################

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec flink
    else
        # Others
        echo gosu flink
    fi
}

JOB_MANAGER="jobmanager"
TASK_MANAGER="taskmanager"

CMD="$1"
shift

if [ "${CMD}" = "help" ]; then
    echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
    exit 0
elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
    if [ "${CMD}" = "${TASK_MANAGER}" ]; then
        TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}

        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"

        echo "Starting Task Manager"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
    else
        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"

        if [ -z "$1" ]; then
            exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
        else
            exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
        fi
    fi
fi

exec "$@"
It works in all cases except when running a standalone job.
The problem, as I understand it, is a race condition.
In Kubernetes it takes several attempts to establish a connection between the JobManager and the TaskManager, while standalone-job.sh
tries to start the job immediately once the cluster is created (before the connection is established).
Is there a better way to start a job on container startup?
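
For reference, an explicit wait in an entrypoint could look like the sketch below. It may well be unnecessary, since the reply quoted above says Flink retries on its own. `retry_until` is a hypothetical helper, not part of the Flink scripts, and its variables are global because plain `sh` has no `local`.

```shell
# Hypothetical helper: run a command until it succeeds or attempts run out.
# usage: retry_until MAX_ATTEMPTS DELAY_SECONDS command [args...]
retry_until() {
  max_attempts="$1"; delay="$2"; shift 2
  attempt=1
  while ! "$@"; do
    if [ "$attempt" -ge "$max_attempts" ]; then
      echo "giving up after $attempt attempts: $*" >&2
      return 1
    fi
    attempt=$((attempt + 1))
    sleep "$delay"
  done
}
```

A call such as `retry_until 30 2 nc -z "$JOB_MANAGER_RPC_ADDRESS" 6123` would poll the JobManager RPC port before going on, assuming `nc` is available in the image and the default port 6123 is in use.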
 


-- 
Konstantin Knauf | Solutions Architect
+49 160 91394525

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



--
Konstantin Knauf | Solutions Architect
+49 160 91394525



Re: Starting Flink cluster and running a job

Dawid Wysakowicz-2

Hi,

One additional question: how do you actually build the Docker image? How do you put the user jar into the container? Maybe you added an extra Kafka connector to the cluster classpath? Have you checked what is on the classpath of a running TaskManager?
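
A small sketch of that last check, assuming the classpath string has already been obtained, for instance from the `-classpath` argument on the TaskManager JVM's command line. The function name and the paths in the example are illustrative only.

```shell
# Hypothetical helper: split a colon-separated classpath and print the
# entries that match a pattern (case-insensitive).
# usage: classpath_entries_matching PATTERN CLASSPATH
classpath_entries_matching() {
  pattern="$1"
  classpath="$2"
  printf '%s\n' "$classpath" | tr ':' '\n' | grep -i -- "$pattern"
}
```

For example, `classpath_entries_matching kafka "/opt/flink/lib/a.jar:/opt/flink/lib/flink-connector-kafka_2.11-1.7.1.jar"` prints only the second entry. Any Kafka jar reported under the cluster's lib directory, in addition to the one inside the user jar, would be a candidate for the conflict.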

Best,

Dawid

On 22/02/2019 15:44, Boris Lublinsky wrote:
Gordon, I double-checked it several times. Here is the list.

Columns are
[info]  - Jar-Size including dependencies
[info]  - Jar-Size
[info]  - Number of transitive dependencies
[info]  - Number of direct dependencies
[info]  - ModuleID
[info] Done updating.
[info]    TotSize    JarSize #TDe #Dep Module
[info]  27.988 MB ------- MB   33    2 lightbend:fdp-flink-taxiride_2.11:2.0.0
[info]  19.632 MB ------- MB   24    6 lightbend:support_2.11:2.0.0
[info]  17.675 MB   0.000 MB   16    9 org.typelevel:cats_2.11:0.9.0
[info]  17.389 MB   1.182 MB   13    9 org.typelevel:cats-laws_2.11:0.9.0
[info]  13.115 MB   0.286 MB    7    4 org.typelevel:cats-free_2.11:0.9.0
[info]  12.830 MB   0.000 MB    7    4 org.typelevel:cats-jvm_2.11:0.9.0
[info]  12.830 MB   3.403 MB    6    4 org.typelevel:cats-core_2.11:0.9.0
[info]   8.397 MB   0.068 MB    8    5 org.apache.flink:flink-connector-kafka_2.11:1.7.1
[info]   8.175 MB   1.889 MB    4    4 org.apache.kafka:kafka-clients:2.1.0
[info]   7.973 MB   0.212 MB    7    4 org.typelevel:cats-kernel-laws_2.11:0.9.0
[info]   4.834 MB   0.007 MB    4    2 org.typelevel:cats-macros_2.11:0.9.0
[info]   4.746 MB   0.082 MB    2    2 com.typesafe.scala-logging:scala-logging_2.11:3.9.0
[info]   4.658 MB   0.035 MB    1    1 org.typelevel:machinist_2.11:0.6.1
[info]   4.623 MB   4.623 MB    0    0 org.scala-lang:scala-reflect:2.11.12
[info]   4.592 MB   4.592 MB    0    0 org.typelevel:cats-kernel_2.11:0.9.0
[info]   3.714 MB   3.714 MB    0    0 com.github.luben:zstd-jni:1.3.5-4
[info]   3.152 MB   0.043 MB    2    1 org.typelevel:discipline_2.11:0.7.2
[info]   3.109 MB   3.094 MB    1    1 org.scalacheck:scalacheck_2.11:1.13.4
[info]   2.019 MB   2.019 MB    0    0 org.xerial.snappy:snappy-java:1.1.7.2
[info]   0.803 MB   0.290 MB    2    2 ch.qos.logback:logback-classic:1.2.3
[info]   0.641 MB   0.641 MB    0    0 joda-time:joda-time:2.10.1
[info]   0.512 MB   0.512 MB    0    0 org.lz4:lz4-java:1.5.0
[info]   0.472 MB   0.472 MB    0    0 ch.qos.logback:logback-core:1.2.3
[info]   0.286 MB   0.286 MB    0    0 com.typesafe:config:1.3.3
[info]   0.195 MB   0.114 MB    3    3 org.apache.flink:flink-connector-kafka-base_2.11:1.7.1
[info]   0.170 MB   0.167 MB    1    1 com.github.mpilquist:simulacrum_2.11:0.10.0
[info]   0.145 MB   0.145 MB    0    0 org.joda:joda-convert:2.1.2
[info]   0.041 MB   0.041 MB    0    0 org.slf4j:slf4j-api:1.7.25
[info]   0.033 MB   0.033 MB    0    0 com.google.code.findbugs:jsr305:1.3.9
[info]   0.016 MB   0.002 MB    2    1 org.typelevel:catalysts-platform_2.11:0.0.5
[info]   0.015 MB   0.012 MB    1    1 org.typelevel:catalysts-macros_2.11:0.0.5
[info]   0.015 MB   0.015 MB    0    0 org.scala-sbt:test-interface:1.0
[info]   0.007 MB   0.007 MB    0    0 org.apache.flink:force-shading:1.7.1
[info]   0.003 MB   0.003 MB    0    0 org.typelevel:macro-compat_2.11:1.1.1


And here is another view

[info] lightbend:fdp-flink-taxiride_2.11:2.0.0 [S]
[info]   +-lightbend:support_2.11:2.0.0 [S]
[info]   | +-ch.qos.logback:logback-classic:1.2.3
[info]   | | +-ch.qos.logback:logback-core:1.2.3
[info]   | | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]   | | +-org.slf4j:slf4j-api:1.7.25
[info]   | | 
[info]   | +-com.typesafe.scala-logging:scala-logging_2.11:3.9.0 [S]
[info]   | | +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   | | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]   | | +-org.slf4j:slf4j-api:1.7.25
[info]   | | 
[info]   | +-com.typesafe:config:1.3.3
[info]   | +-joda-time:joda-time:2.10.1
[info]   | +-org.joda:joda-convert:2.1.2
[info]   | +-org.typelevel:cats_2.11:0.9.0 [S]
[info]   |   +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | 
[info]   |   +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-free_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | | 
[info]   |   | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-jvm_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | | 
[info]   |   | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]   |   | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | | +-org.scala-sbt:test-interface:1.0
[info]   |   | | 
[info]   |   | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]   |   | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]   |   | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]   |   |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   |     +-org.scala-sbt:test-interface:1.0
[info]   |   |     
[info]   |   +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   +-org.typelevel:cats-laws_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | | +-org.scala-sbt:test-interface:1.0
[info]   |   | | 
[info]   |   | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]   |   | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]   |   | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | | 
[info]   |   | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]   |   | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | | | +-org.scala-sbt:test-interface:1.0
[info]   |   | | | 
[info]   |   | | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]   |   | | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]   |   | | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]   |   | |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | |     +-org.scala-sbt:test-interface:1.0
[info]   |   | |     
[info]   |   | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]   |   | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | |   +-org.scala-sbt:test-interface:1.0
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |     +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |     
[info]   +-org.apache.flink:flink-connector-kafka_2.11:1.7.1
[info]     +-com.google.code.findbugs:jsr305:1.3.9
[info]     +-org.apache.flink:flink-connector-kafka-base_2.11:1.7.1
[info]     | +-com.google.code.findbugs:jsr305:1.3.9
[info]     | +-org.apache.flink:force-shading:1.7.1
[info]     | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]     | +-org.slf4j:slf4j-api:1.7.25
[info]     | 
[info]     +-org.apache.flink:force-shading:1.7.1
[info]     +-org.apache.kafka:kafka-clients:2.1.0
[info]     | +-com.github.luben:zstd-jni:1.3.5-4
[info]     | +-org.lz4:lz4-java:1.5.0
[info]     | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]     | +-org.slf4j:slf4j-api:1.7.25
[info]     | +-org.xerial.snappy:snappy-java:1.1.7.2
[info]     | 
[info]     +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]     +-org.slf4j:slf4j-api:1.7.25
[info]     
[info] Done updating.
[info] lightbend:support_2.11:2.0.0 [S]
[info]   +-ch.qos.logback:logback-classic:1.2.3
[info]   | +-ch.qos.logback:logback-core:1.2.3
[info]   | +-org.slf4j:slf4j-api:1.7.25
[info]   | 
[info]   +-com.typesafe.scala-logging:scala-logging_2.11:3.9.0 [S]
[info]   | +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   | +-org.slf4j:slf4j-api:1.7.25
[info]   | 
[info]   +-com.typesafe:config:1.3.3
[info]   +-joda-time:joda-time:2.10.1
[info]   +-org.joda:joda-convert:2.1.2
[info]   +-org.typelevel:cats_2.11:0.9.0 [S]
[info]     +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | 
[info]     +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-free_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | | 
[info]     | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | | |   
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-jvm_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | | 
[info]     | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | | |   
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]     | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | | +-org.scala-sbt:test-interface:1.0
[info]     | | 
[info]     | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]     | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]     | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]     |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     |     +-org.scala-sbt:test-interface:1.0
[info]     |     
[info]     +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     +-org.typelevel:cats-laws_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | | +-org.scala-sbt:test-interface:1.0
[info]     | | 
[info]     | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]     | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]     | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | | 
[info]     | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | | |   
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]     | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | | | +-org.scala-sbt:test-interface:1.0
[info]     | | | 
[info]     | | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]     | | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]     | | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | |   
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]     | |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | |     +-org.scala-sbt:test-interface:1.0
[info]     | |     
[info]     | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]     | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | |   +-org.scala-sbt:test-interface:1.0
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]       +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]       





Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 22, 2019, at 12:33 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Hi,

I haven't seen this problem for Flink 1.6.x / 1.7.x, so it shouldn't be a reoccurrence of FLINK-8741.
I've double-checked the classloaders used in the Kafka connector, and they seem to be correct.

The fact that it works correctly in IntelliJ, but not when packaged, suggests that there could be some conflicting dependencies in the packaged jar.

Could you check the actual resolved dependency tree of the project, and see if there are multiple versions of some dependency related to Kafka being pulled in? For Maven for example, that would be "mvn dependency:tree".
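
For an sbt build like the one in this thread, the same check can be scripted against a saved dependency tree. The following is only a minimal sketch, assuming the tree uses plain `group:artifact:version` coordinates as in the sbt output quoted in this thread (Maven's `group:artifact:jar:version:scope` form would need a different pattern). `multi_version_artifacts` is a hypothetical helper name, not part of sbt or Maven.

```shell
#!/bin/sh
# Reads a dependency tree on stdin and prints every group:artifact
# that occurs with more than one version, followed by those versions.
multi_version_artifacts() {
    # Pull out group:artifact:version tokens; the group must start with a letter
    # so that tree-drawing characters such as "+-" are not captured.
    grep -oE '[A-Za-z][A-Za-z0-9_.-]*:[A-Za-z0-9_.-]+:[0-9][A-Za-z0-9_.-]*' |
    sort -u |
    awk -F: '{ key = $1 ":" $2; count[key]++; versions[key] = versions[key] " " $3 }
             END { for (k in count) if (count[k] > 1) print k ":" versions[k] }' |
    sort
}
```

Given a tree saved to a file, `multi_version_artifacts < tree.txt` lists the candidates (the file name is just an example). An artifact that sbt marks as evicted will also show up here; eviction means only one version ends up in the build, so such entries are not necessarily the culprit.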

Cheers,
Gordon

On Thu, Feb 21, 2019 at 10:49 PM Boris Lublinsky <[hidden email]> wrote:
The relevant dependencies are 
val flinkScala            =      "org.apache.flink"             %%   "flink-scala"                    % flinkVersion % "provided"
val flinkStreamingScala   =      "org.apache.flink"             %%   "flink-streaming-scala"          % flinkVersion % "provided"
val flinkKafka            =      "org.apache.flink"             %%   "flink-connector-kafka"          % flinkVersion exclude("org.slf4j", "slf4j-log4j12") 
I am using SBT.
I tried both flink-connector-kafka and flink-connector-kafka-0.11, with the same result.


Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 21, 2019, at 1:38 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

Can you share the relevant parts (dependencies) of your pom.xml? Did you also try without fixing the Kafka version, i.e. running with the Kafka client version provided by the Kafka connector of Flink? Gordon (cc) dealt with FLINK-8741.

@Gordon: have you seen this issue with 1.6/1.7 before?

Cheers,

Konstantin

On Thu, Feb 21, 2019 at 2:19 AM Boris Lublinsky <[hidden email]> wrote:
I found some more details on this

It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both the latest flink-connector-kafka and flink-connector-kafka-0.11.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 7:02 PM, Ken Krugler <[hidden email]> wrote:

Hi Boris,

I haven’t seen this exact error, but I have seen similar errors caused by multiple versions of jars on the classpath.

When I’ve run into this particular "XXX is not an instance of YYY" problem, it often seems to be caused by a jar that I should have marked as provided in my pom.

Though I’m typically running on a YARN cluster, not w/K8s, so maybe this doesn’t apply.

— Ken



On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <[hidden email]> wrote:

Konstantin,
After experimenting with this for a while, I got to the root cause of the problem.
I am running a version of a taxi ride travel time prediction as my sample.
It works fine in IntelliJ, but when I put it in Docker (the standard Debian 1.7 image), it fails with the following error:


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
... 17 more

The error appears to be a class loader issue. (I tried their solution, but it did not help.)
I looked at the class loading and I see that both of these classes are loaded from my uber jar, but twice.

Have you guys seen this error before?
Any suggestion?

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris, 

without looking at the entrypoint in much detail, generally there should not be a race condition there: 

* if the TaskManagers cannot connect to the ResourceManager, they will retry (by default the timeout is 5 minutes)
* if the JobManager does not get enough resources from the ResourceManager, it will also wait for the resources/slots to be provided. The timeout there is also 5 minutes, I think.

So, this should actually be pretty robust as long as the Taskmanager containers can reach the Jobmanager eventually.
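
The retry-until-timeout behaviour described here can be pictured as a loop with a deadline. This is only an illustration in shell, not Flink's actual implementation; `retry_until` and its parameters are hypothetical names.

```shell
#!/bin/sh
# retry_until TIMEOUT_SECONDS INTERVAL_SECONDS COMMAND [ARGS...]
# Runs COMMAND repeatedly until it succeeds or roughly TIMEOUT_SECONDS
# have been spent sleeping between attempts.
# INTERVAL_SECONDS should be at least 1 in real use; with 0 the elapsed
# time never advances and a failing command is retried forever.
retry_until() {
    timeout="$1"
    interval="$2"
    shift 2
    elapsed=0
    until "$@"; do
        if [ "$elapsed" -ge "$timeout" ]; then
            return 1    # give up: deadline reached
        fi
        sleep "$interval"
        elapsed=$((elapsed + interval))
    done
    return 0
}
```

If an entrypoint script wanted to wait explicitly, it could call something like `retry_until 300 5 nc -z "$JOB_MANAGER_RPC_ADDRESS" 6123` before starting. That assumes `nc` is present in the image and that the JobManager RPC port is 6123, neither of which is stated in this thread.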

Could you provide the Taskmanager/JobManager logs for such a failure case?

Cheers, 

Konstantin


On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <[hidden email]> wrote:
Following https://github.com/apache/flink/tree/release-1.7/flink-container/docker
I have created an entry point, which looks as follows:
#!/bin/sh

################################################################################
#   from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
#   and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
################################################################################

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec flink
    else
        # Others
        echo gosu flink
    fi
}

JOB_MANAGER="jobmanager"
TASK_MANAGER="taskmanager"

CMD="$1"
shift

if [ "${CMD}" = "help" ]; then
    echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
    exit 0
elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
    if [ "${CMD}" = "${TASK_MANAGER}" ]; then
        TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}

        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"

        echo "Starting Task Manager"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
    else
        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"

        if [ -z "$1" ]; then
            exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
        else
            exec "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"
        fi
    fi
fi

exec "$@"
It works for all cases except running a standalone job.
The problem, as I understand it, is a race condition.
In Kubernetes it takes several attempts to establish a connection between the Job and Task managers, while standalone-job.sh tries to start the job immediately once the cluster is created (before the connection is established).
Is there a better way to start a job on container startup?
 


-- 
Konstantin Knauf | Solutions Architect
+49 160 91394525

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra







Re: Starting Flink cluster and running a job

Boris Lublinsky
I am doing this using sbt native packager and sbt docker.


def sbtdockerFlinkAppBase(id: String)(base: String = id) = projectBase(id)(base)
  .enablePlugins(sbtdocker.DockerPlugin)
  .settings(
    dockerfile in docker := {

      val artifact: File = assembly.value
      // This location is fixed. The jar has to go there so that Flink can load it
      val artifactTargetPath = s"/opt/flink/lib/${artifact.name}"

      new Dockerfile {
        from("lightbend/flink:1.7.1-scala_2.11")
        add(artifact, artifactTargetPath)
      }
    },

    // Set name for the image
    imageNames in docker := Seq(
      ImageName(namespace = Some(organization.value),
        repository = name.value.toLowerCase,
        tag = Some(version.value))
    ),

    buildOptions in docker := BuildOptions(cache = false)
  )



It produces a Dockerfile as follows:

FROM lightbend/flink:1.7.1-scala_2.11
ADD 0/fdp-flink-taxiride-assembly-2.0.0.jar /opt/flink/lib/fdp-flink-taxiride-assembly-2.0.0.jar




Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 22, 2019, at 9:13 AM, Dawid Wysakowicz <[hidden email]> wrote:

Hi,

One additional question: how do you actually build the Docker image? How do you put the user jar into the container? Maybe you added an extra Kafka connector to the cluster classpath? Have you checked what is on the classpath of a running TaskManager?
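
One way to run such a check inside the container is to look for the class from the error message in every jar of a directory. The following is a rough sketch, assuming `unzip` is installed in the image; `class_to_entry` and `jars_containing` are hypothetical helper names.

```shell
#!/bin/sh
# Convert a fully qualified class name into its entry path inside a jar,
# e.g. a.b.C -> a/b/C.class
class_to_entry() {
    printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# jars_containing CLASS DIR
# Prints every jar directly under DIR whose listing contains CLASS.
# The match is a fixed-string substring match, so relocated (shaded)
# copies whose path ends with the same entry are reported as well.
jars_containing() {
    entry=$(class_to_entry "$1")
    for jar in "$2"/*.jar; do
        [ -f "$jar" ] || continue    # skip the unexpanded glob in an empty directory
        if unzip -l "$jar" 2>/dev/null | grep -qF -- "$entry"; then
            printf '%s\n' "$jar"
        fi
    done
}
```

For the error in this thread, `jars_containing org.apache.kafka.common.serialization.Serializer /opt/flink/lib` would show which jars in that directory ship the class. More than one hit would be a hint that the class can be loaded from several places.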

Best,

Dawid

On 22/02/2019 15:44, Boris Lublinsky wrote:
Gordon, I double checked it several times. Here is the list

Columns are
[info]  - Jar-Size including dependencies
[info]  - Jar-Size
[info]  - Number of transitive dependencies
[info]  - Number of direct dependencies
[info]  - ModuleID
[info] Done updating.
[info]    TotSize    JarSize #TDe #Dep Module
[info]  27.988 MB ------- MB   33    2 lightbend:fdp-flink-taxiride_2.11:2.0.0
[info]  19.632 MB ------- MB   24    6 lightbend:support_2.11:2.0.0
[info]  17.675 MB   0.000 MB   16    9 org.typelevel:cats_2.11:0.9.0
[info]  17.389 MB   1.182 MB   13    9 org.typelevel:cats-laws_2.11:0.9.0
[info]  13.115 MB   0.286 MB    7    4 org.typelevel:cats-free_2.11:0.9.0
[info]  12.830 MB   0.000 MB    7    4 org.typelevel:cats-jvm_2.11:0.9.0
[info]  12.830 MB   3.403 MB    6    4 org.typelevel:cats-core_2.11:0.9.0
[info]   8.397 MB   0.068 MB    8    5 org.apache.flink:flink-connector-kafka_2.11:1.7.1
[info]   8.175 MB   1.889 MB    4    4 org.apache.kafka:kafka-clients:2.1.0
[info]   7.973 MB   0.212 MB    7    4 org.typelevel:cats-kernel-laws_2.11:0.9.0
[info]   4.834 MB   0.007 MB    4    2 org.typelevel:cats-macros_2.11:0.9.0
[info]   4.746 MB   0.082 MB    2    2 com.typesafe.scala-logging:scala-logging_2.11:3.9.0
[info]   4.658 MB   0.035 MB    1    1 org.typelevel:machinist_2.11:0.6.1
[info]   4.623 MB   4.623 MB    0    0 org.scala-lang:scala-reflect:2.11.12
[info]   4.592 MB   4.592 MB    0    0 org.typelevel:cats-kernel_2.11:0.9.0
[info]   3.714 MB   3.714 MB    0    0 com.github.luben:zstd-jni:1.3.5-4
[info]   3.152 MB   0.043 MB    2    1 org.typelevel:discipline_2.11:0.7.2
[info]   3.109 MB   3.094 MB    1    1 org.scalacheck:scalacheck_2.11:1.13.4
[info]   2.019 MB   2.019 MB    0    0 org.xerial.snappy:snappy-java:1.1.7.2
[info]   0.803 MB   0.290 MB    2    2 ch.qos.logback:logback-classic:1.2.3
[info]   0.641 MB   0.641 MB    0    0 joda-time:joda-time:2.10.1
[info]   0.512 MB   0.512 MB    0    0 org.lz4:lz4-java:1.5.0
[info]   0.472 MB   0.472 MB    0    0 ch.qos.logback:logback-core:1.2.3
[info]   0.286 MB   0.286 MB    0    0 com.typesafe:config:1.3.3
[info]   0.195 MB   0.114 MB    3    3 org.apache.flink:flink-connector-kafka-base_2.11:1.7.1
[info]   0.170 MB   0.167 MB    1    1 com.github.mpilquist:simulacrum_2.11:0.10.0
[info]   0.145 MB   0.145 MB    0    0 org.joda:joda-convert:2.1.2
[info]   0.041 MB   0.041 MB    0    0 org.slf4j:slf4j-api:1.7.25
[info]   0.033 MB   0.033 MB    0    0 com.google.code.findbugs:jsr305:1.3.9
[info]   0.016 MB   0.002 MB    2    1 org.typelevel:catalysts-platform_2.11:0.0.5
[info]   0.015 MB   0.012 MB    1    1 org.typelevel:catalysts-macros_2.11:0.0.5
[info]   0.015 MB   0.015 MB    0    0 org.scala-sbt:test-interface:1.0
[info]   0.007 MB   0.007 MB    0    0 org.apache.flink:force-shading:1.7.1
[info]   0.003 MB   0.003 MB    0    0 org.typelevel:macro-compat_2.11:1.1.1


And here is another view

[info] lightbend:fdp-flink-taxiride_2.11:2.0.0 [S]
[info]   +-lightbend:support_2.11:2.0.0 [S]
[info]   | +-ch.qos.logback:logback-classic:1.2.3
[info]   | | +-ch.qos.logback:logback-core:1.2.3
[info]   | | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]   | | +-org.slf4j:slf4j-api:1.7.25
[info]   | | 
[info]   | +-com.typesafe.scala-logging:scala-logging_2.11:3.9.0 [S]
[info]   | | +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   | | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]   | | +-org.slf4j:slf4j-api:1.7.25
[info]   | | 
[info]   | +-com.typesafe:config:1.3.3
[info]   | +-joda-time:joda-time:2.10.1
[info]   | +-org.joda:joda-convert:2.1.2
[info]   | +-org.typelevel:cats_2.11:0.9.0 [S]
[info]   |   +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | 
[info]   |   +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-free_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | | 
[info]   |   | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-jvm_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | | 
[info]   |   | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]   |   | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | | +-org.scala-sbt:test-interface:1.0
[info]   |   | | 
[info]   |   | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]   |   | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]   |   | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]   |   |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   |     +-org.scala-sbt:test-interface:1.0
[info]   |   |     
[info]   |   +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   +-org.typelevel:cats-laws_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | | +-org.scala-sbt:test-interface:1.0
[info]   |   | | 
[info]   |   | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]   |   | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]   |   | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | | 
[info]   |   | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]   |   | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | | | +-org.scala-sbt:test-interface:1.0
[info]   |   | | | 
[info]   |   | | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]   |   | | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]   |   | | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | |   
[info]   |   | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]   |   | |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | |     +-org.scala-sbt:test-interface:1.0
[info]   |   | |     
[info]   |   | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]   |   | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | | 
[info]   |   | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   | |   
[info]   |   | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]   |   | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]   |   | |   +-org.scala-sbt:test-interface:1.0
[info]   |   | |   
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]   |   | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]   |   | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]   |   | | 
[info]   |   | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |   |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |   |   
[info]   |   +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]   |     +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   |     
[info]   +-org.apache.flink:flink-connector-kafka_2.11:1.7.1
[info]     +-com.google.code.findbugs:jsr305:1.3.9
[info]     +-org.apache.flink:flink-connector-kafka-base_2.11:1.7.1
[info]     | +-com.google.code.findbugs:jsr305:1.3.9
[info]     | +-org.apache.flink:force-shading:1.7.1
[info]     | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]     | +-org.slf4j:slf4j-api:1.7.25
[info]     | 
[info]     +-org.apache.flink:force-shading:1.7.1
[info]     +-org.apache.kafka:kafka-clients:2.1.0
[info]     | +-com.github.luben:zstd-jni:1.3.5-4
[info]     | +-org.lz4:lz4-java:1.5.0
[info]     | +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]     | +-org.slf4j:slf4j-api:1.7.25
[info]     | +-org.xerial.snappy:snappy-java:1.1.7.2
[info]     | 
[info]     +-org.slf4j:slf4j-api:1.7.15 (evicted by: 1.7.25)
[info]     +-org.slf4j:slf4j-api:1.7.25
[info]     
[info] Done updating.
[info] lightbend:support_2.11:2.0.0 [S]
[info]   +-ch.qos.logback:logback-classic:1.2.3
[info]   | +-ch.qos.logback:logback-core:1.2.3
[info]   | +-org.slf4j:slf4j-api:1.7.25
[info]   | 
[info]   +-com.typesafe.scala-logging:scala-logging_2.11:3.9.0 [S]
[info]   | +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]   | +-org.slf4j:slf4j-api:1.7.25
[info]   | 
[info]   +-com.typesafe:config:1.3.3
[info]   +-joda-time:joda-time:2.10.1
[info]   +-org.joda:joda-convert:2.1.2
[info]   +-org.typelevel:cats_2.11:0.9.0 [S]
[info]     +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | 
[info]     +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-free_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | | 
[info]     | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | | |   
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-jvm_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | | 
[info]     | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | | |   
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]     | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | | +-org.scala-sbt:test-interface:1.0
[info]     | | 
[info]     | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]     | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]     | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]     |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     |     +-org.scala-sbt:test-interface:1.0
[info]     |     
[info]     +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     +-org.typelevel:cats-laws_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | | +-org.scala-sbt:test-interface:1.0
[info]     | | 
[info]     | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]     | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]     | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-core_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | | 
[info]     | | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | | |   
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:cats-kernel-laws_2.11:0.9.0 [S]
[info]     | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | | | +-org.scala-sbt:test-interface:1.0
[info]     | | | 
[info]     | | +-org.typelevel:catalysts-platform_2.11:0.0.5 [S]
[info]     | | | +-org.typelevel:catalysts-macros_2.11:0.0.5 [S]
[info]     | | |   +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | |   
[info]     | | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]     | |   +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | |     +-org.scala-sbt:test-interface:1.0
[info]     | |     
[info]     | +-org.typelevel:cats-kernel_2.11:0.9.0 [S]
[info]     | +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | | 
[info]     | | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     | |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     | |   
[info]     | +-org.typelevel:discipline_2.11:0.7.2 [S]
[info]     | | +-org.scalacheck:scalacheck_2.11:1.13.4 [S]
[info]     | |   +-org.scala-sbt:test-interface:1.0
[info]     | |   
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:cats-macros_2.11:0.9.0 [S]
[info]     | +-com.github.mpilquist:simulacrum_2.11:0.10.0 [S]
[info]     | | +-org.typelevel:macro-compat_2.11:1.1.1 [S]
[info]     | | 
[info]     | +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]     |   +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]     |   
[info]     +-org.typelevel:machinist_2.11:0.6.1 [S]
[info]       +-org.scala-lang:scala-reflect:2.11.12 [S]
[info]       





Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 22, 2019, at 12:33 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Hi,

I haven't seen this problem with Flink 1.6.x / 1.7.x, so it shouldn't be a recurrence of FLINK-8741.
I've double-checked the classloaders used in the Kafka connector, and they seem to be correct.

The fact that it works correctly in IntelliJ, but not when packaged, suggests that there could be some conflicting dependencies in the packaged jar.

Could you check the actual resolved dependency tree of the project, and see if there are multiple versions of some dependency related to Kafka being pulled in? For Maven for example, that would be "mvn dependency:tree".

Cheers,
Gordon
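
For an sbt build, the check described above can be roughed out in shell. This is only a sketch: find_version_conflicts is a hypothetical helper name, and it assumes the tree is printed as group:artifact:version, as in the sbt output quoted earlier in this thread (the output of "mvn dependency:tree" is laid out differently and would need a different pattern). It lists every artifact that shows up with more than one version.

```shell
#!/bin/sh
# Hypothetical helper (not part of sbt, Maven, or Flink).
# Reads a dependency-tree dump from stdin and prints each artifact that
# appears with more than one version, as "group:artifact: v1 v2 ...".
# Assumes coordinates are written as group:artifact:version.
find_version_conflicts() {
    grep -oE '[A-Za-z0-9][A-Za-z0-9_.-]*:[A-Za-z0-9][A-Za-z0-9_.-]*:[0-9][A-Za-z0-9_.-]*' |
        sort -u |
        awk -F: '{ key = $1 ":" $2; n[key]++; v[key] = v[key] " " $3 }
                 END { for (k in n) if (n[k] > 1) print k ":" v[k] }' |
        sort
}
```

With the tree saved to a file (deps.txt is a placeholder name), "find_version_conflicts < deps.txt" prints nothing when every artifact resolves to a single version.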

On Thu, Feb 21, 2019 at 10:49 PM Boris Lublinsky <[hidden email]> wrote:
The relevant dependencies are:
val flinkScala          = "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided"
val flinkStreamingScala = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
val flinkKafka          = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion exclude("org.slf4j", "slf4j-log4j12")
I am using SBT.
I tried both flink-connector-kafka and flink-connector-kafka-0.11, with the same result.


Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 21, 2019, at 1:38 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

can you share the relevant parts (dependencies) of your pom.xml? Did you also try without fixing the Kafka version, i.e. running with the Kafka client version provided by the Kafka connector of Flink? Gordon (cc) dealt with FLINK-8741.

@Gordon: have you seen this issue with 1.6/1.7 before?

Cheers,

Konstantin

On Thu, Feb 21, 2019 at 2:19 AM Boris Lublinsky <[hidden email]> wrote:
I found some more details on this.

It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both the latest flink-connector-kafka and flink-connector-kafka-0.11.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 7:02 PM, Ken Krugler <[hidden email]> wrote:

Hi Boris,

I haven’t seen this exact error, but I have seen similar errors caused by multiple versions of jars on the classpath.

When I’ve run into this particular "XXX is not an instance of YYY" problem, it often seems to be caused by a jar that I should have marked as provided in my pom.

Though I’m typically running on a YARN cluster, not w/K8s, so maybe this doesn’t apply.

— Ken



On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <[hidden email]> wrote:

Konstantin,
After experimenting with this for a while, I got to the root cause of the problem.
I am running a version of the taxi ride travel time prediction as my sample.
It works fine in IntelliJ,
but when I try to run it in Docker (the standard Debian-based Flink 1.7 image),
it fails with the following error:


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
... 17 more

This points to a class loader issue. (I tried their solution, but it did not help.)
I looked at the class loading and I see that this pair of classes is loaded from my uber jar, but twice.

Have you guys seen this error before?
Any suggestion?

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris, 

without looking at the entrypoint in much detail, generally there should not be a race condition there: 

* if the taskmanagers cannot connect to the resourcemanager, they will retry (by default the timeout is 5 minutes)
* if the JobManager does not get enough resources from the ResourceManager, it will also wait for the resources/slots to be provided. The timeout there is also 5 minutes, I think.

So, this should actually be pretty robust as long as the Taskmanager containers can reach the Jobmanager eventually.

Could you provide the Taskmanager/JobManager logs for such a failure case?

Cheers, 

Konstantin


On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <[hidden email]> wrote:
Following https://github.com/apache/flink/tree/release-1.7/flink-container/docker
I have created an entry point, which looks as follows:
#!/bin/sh

################################################################################
#   from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
#   and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
################################################################################

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec flink
    else
        # Others
        echo gosu flink
    fi
}

JOB_MANAGER="jobmanager"
TASK_MANAGER="taskmanager"

CMD="$1"
shift

if [ "${CMD}" = "help" ]; then
    echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
    exit 0
elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
    if [ "${CMD}" = "${TASK_MANAGER}" ]; then
        TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}

        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"

        echo "Starting Task Manager"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
    else
        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"

        if [ -z "$1" ]; then
            exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
        else
            exec "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"
        fi
    fi
fi

exec "$@"
It works for all cases except running a standalone job.
The problem, as I understand it, is a race condition.
In Kubernetes it takes several attempts to establish a connection between the Job Manager and the Task Manager, while standalone-job.sh
tries to start the job immediately once the cluster is created (before the connection is established).
Is there a better way to start a job on container startup?
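
If an explicit wait does turn out to be needed in the entry point, one possible shape is a small retry wrapper. This is a sketch under assumptions, not something taken from the Flink scripts: retry is a hypothetical helper, and the attempt count and delay are arbitrary values to be tuned.

```shell
#!/bin/sh
# Hypothetical helper (not part of the Flink scripts).
# Usage: retry ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Runs COMMAND until it succeeds, at most ATTEMPTS times, sleeping
# DELAY_SECONDS between tries. Returns 1 if every attempt fails.
retry() {
    attempts=$1
    delay=$2
    shift 2
    i=1
    while ! "$@"; do
        [ "$i" -ge "$attempts" ] && return 1
        i=$((i + 1))
        sleep "$delay"
    done
}
```

For example, "retry 30 2 some_check" would call some_check (a placeholder for whatever readiness probe fits the setup) up to 30 times, two seconds apart, before giving up.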
 


-- 
Konstantin Knauf | Solutions Architect
+49 160 91394525

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



--
Konstantin Knauf | Solutions Architect
+49 160 91394525

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Data Artisans GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen   



Re: Jira issue Flink-11127

Andrey Zagrebin-3
In reply to this post by Boris Lublinsky

On Fri, Feb 22, 2019 at 1:28 AM Boris Lublinsky <[hidden email]> wrote:
Adding metric-query port makes it a bit better, but there is still an error


2019-02-22 00:03:56,173 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-22 00:04:16,213 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-22 00:04:36,253 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-02-22 00:04:56,293 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..

in the task manager, and

2019-02-21 23:59:46,479 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-21 23:59:57,808 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:06,519 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:17,849 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:26,558 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:37,888 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]

in the job manager.

Port 6123 is opened in both the Job Manager deployment

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: {{ template "fullname" . }}-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      annotations:
        prometheus.io/scrape: 'true'
        prometheus.io/port: '9249'
      labels:
        server: flink
        app: {{ template "fullname" . }}
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: {{ .Values.image }}:{{ .Values.imageTag }}
        imagePullPolicy: {{ .Values.imagePullPolicy }}
        args:
        - jobmanager
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        env:
        - name: CONTAINER_METRIC_PORT
          value: '{{ .Values.flink.metric_query_port }}'
        - name: JOB_MANAGER_RPC_ADDRESS
          value: {{ template "fullname" . }}-jobmanager
        livenessProbe:
          httpGet:
            path: /overview
            port: 8081
          initialDelaySeconds: 30
          periodSeconds: 10
        resources:
          limits:
            cpu: {{ .Values.resources.jobmanager.limits.cpu }}
            memory: {{ .Values.resources.jobmanager.limits.memory }}
          requests:
            cpu: {{ .Values.resources.jobmanager.requests.cpu }}
            memory: {{ .Values.resources.jobmanager.requests.memory }}

and the Job Manager service

apiVersion: v1
kind: Service
metadata:
  name: {{ template "fullname" . }}-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: {{ template "fullname" . }}
    component: jobmanager




Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 21, 2019, at 6:13 PM, Boris Lublinsky <[hidden email]> wrote:


Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 21, 2019, at 2:05 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

the exact command depends on the docker-entrypoint.sh script and the image you are using. For the example contained in the Flink repository it is "task-manager", I think. The important thing is to pass "taskmanager.host" to the Taskmanager process. You can verify by checking the Taskmanager logs. These should contain lines like below:

2019-02-21 08:03:00,004 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Program Arguments:
2019-02-21 08:03:00,008 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dtaskmanager.host=10.12.10.173

In the Jobmanager logs you should see that the Taskmanager is registered under the IP above in a line similar to:

2019-02-21 08:03:26,874 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID a0513ba2c472d2d1efc07626da9c1bda (akka.tcp://flink@10.12.10.173:46531/user/taskmanager_0) at ResourceManager

A service per Taskmanager is not required. The purpose of the config parameter is that the Jobmanager addresses the taskmanagers by IP instead of hostname.

Hope this helps!

Cheers,

Konstantin
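
The log check described above can be scripted roughly as follows. This is a minimal sketch: taskmanager_host_from_log is a hypothetical helper, and it assumes the TaskManager log prints the dynamic property as "-Dtaskmanager.host=<value>", as in the sample lines above.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flink).
# Reads a TaskManager log from stdin and prints the first value passed as
# -Dtaskmanager.host=...; returns non-zero if the property never appears.
taskmanager_host_from_log() {
    host=$(sed -n 's/.*-Dtaskmanager\.host=\([^[:space:]]*\).*/\1/p' | head -n 1)
    [ -n "$host" ] && printf '%s\n' "$host"
}
```

The log can come from wherever it is stored, for example "kubectl logs <taskmanager-pod> | taskmanager_host_from_log", with the pod name filled in for the cluster at hand. The printed address should match the IP the Jobmanager reports when it registers the Taskmanager.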



On Wed, Feb 20, 2019 at 4:37 PM Boris Lublinsky <[hidden email]> wrote:
Also, the suggested workaround does not quite work.
2019-02-20 15:27:43,928 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by: [flink-taskmanager-1: No address associated with hostname]
2019-02-20 15:27:48,750 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler - Caught exception

I think the problem is that it's trying to connect to flink-taskmanager-1.

Using busybox to experiment with nslookup, I can see
/ # nslookup flink-taskmanager-1.flink-taskmanager
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:      flink-taskmanager-1.flink-taskmanager
Address 1: 10.131.2.136 flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-1
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-1'
/ # nslookup flink-taskmanager-0.flink-taskmanager
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:      flink-taskmanager-0.flink-taskmanager
Address 1: 10.131.0.111 flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-0
Server:    10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-0'
/ # 

So the name should be postfixed with the service name. How do I force it? I suspect I am missing a config parameter.

 
Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:33 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

the solution is actually simpler than it sounds from the ticket. The only thing you need to do is to set the "taskmanager.host" to the Pod's IP address in the Flink configuration. The easiest way to do this is to pass this config dynamically via a command-line parameter. 

The Deployment spec could look something like this:
containers:
- name: taskmanager
  [...]
  args:
  - "taskmanager.sh"
  - "start-foreground"
  - "-Dtaskmanager.host=$(K8S_POD_IP)"
  [...]
  env:
  - name: K8S_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP

Hope this helps and let me know if this works. 

Best, 

Konstantin
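
When the image starts the Taskmanager through a custom entry point instead of calling taskmanager.sh directly, the same argument could be built inside the script. This is a sketch only: taskmanager_host_arg is a hypothetical helper, and it assumes K8S_POD_IP is injected through the env section shown above.

```shell
#!/bin/sh
# Hypothetical helper for a custom entry point (not part of the Flink image).
# Prints the dynamic property that sets taskmanager.host to the Pod IP.
# Aborts with an error message if K8S_POD_IP is unset or empty.
taskmanager_host_arg() {
    : "${K8S_POD_IP:?K8S_POD_IP must be set, e.g. from status.podIP}"
    printf '%s%s\n' '-Dtaskmanager.host=' "$K8S_POD_IP"
}
```

In the entry point, the result would be appended to the start command, e.g. "$FLINK_HOME/bin/taskmanager.sh" start-foreground "$(taskmanager_host_arg)".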

On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <[hidden email]> wrote:
Apparently there is a workaround for it.
Is it possible to provide the complete Helm chart for it?
Bits and pieces are in the ticket, but it would be nice to see the full chart

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/



--
Konstantin Knauf | Solutions Architect
+49 160 91394525


Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen   




