Re: Starting Flink cluster and running a job

Posted by Boris Lublinsky
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Jira-issue-Flink-11127-tp26180p26271.html

The relevant dependencies are:

val flinkScala          = "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided"
val flinkStreamingScala = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
val flinkKafka          = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion exclude("org.slf4j", "slf4j-log4j12")

I am using SBT. I tried both flink-connector-kafka and flink-connector-kafka-0.11 - same result.
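
A sanity check I'd suggest (a minimal sketch; the assembly jar path below is just an example, adjust it to your build): check whether kafka-clients classes end up inside the uber jar at all. If they are bundled there and also reachable on the cluster's classpath, the child-first user-code classloader can end up with two copies, which produces exactly this kind of error.

# look for the Kafka serializer classes inside the assembled jar
unzip -l target/scala-2.11/my-job-assembly.jar \
  | grep 'org/apache/kafka/common/serialization'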


Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 21, 2019, at 1:38 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris,

can you share the relevant parts (dependencies) of your pom.xml? Did you also try without pinning the Kafka version, i.e. running with the Kafka client version provided by Flink's Kafka connector? Gordon (cc) dealt with FLINK-8741.

@Gordon: have you seen this issue with 1.6/1.7 before?

Cheers,

Konstantin

On Thu, Feb 21, 2019 at 2:19 AM Boris Lublinsky <[hidden email]> wrote:
I found some more details on this.

It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both the latest Kafka connector and the Kafka-connector-0.11.

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 7:02 PM, Ken Krugler <[hidden email]> wrote:

Hi Boris,

I haven’t seen this exact error, but I have seen similar errors caused by multiple versions of jars on the classpath.

When I’ve run into this particular "XXX is not an instance of YYY" problem, it often seems to be caused by a jar that I should have marked as provided in my pom.

Though I’m typically running on a YARN cluster, not w/K8s, so maybe this doesn’t apply.
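
If it helps, one quick way to decide what to mark as provided (a sketch, assuming the standard Flink image layout) is to list what the distribution already ships; anything under lib/ generally shouldn't be bundled again in the uber jar:

# everything here is already on the framework classpath;
# these artifacts are candidates for "provided" scope in the build
ls -1 "$FLINK_HOME/lib"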

— Ken



On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <[hidden email]> wrote:

Konstantin,
After experimenting with this for a while, I got to the root cause of the problem.
I am running a version of a taxi-ride travel-time prediction as my sample.
It works fine in IntelliJ, but when I try to put it in Docker (the standard Debian 1.7 image), it fails with the following error:


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
at com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:384)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:375)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:847)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
... 17 more

The message points to a classloader problem. (I tried the suggested solution, but it did not help.)
I looked at the class loading and I see that both of these classes are loaded from my uber jar, but twice (presumably by two different classloaders).
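
One thing I am considering trying (a sketch; the classloader.* options exist in Flink 1.5+, but note this only helps if kafka-clients is also available on the framework classpath, so please verify against the docs for your version):

# force the Kafka classes to be resolved parent-first,
# so only one copy is ever loaded
echo "classloader.parent-first-patterns.additional: org.apache.kafka" >> "$FLINK_HOME/conf/flink-conf.yaml"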

Have you guys seen this error before?
Any suggestions?

Boris Lublinsky
FDP Architect
[hidden email]
https://www.lightbend.com/

On Feb 19, 2019, at 4:50 AM, Konstantin Knauf <[hidden email]> wrote:

Hi Boris, 

without looking at the entrypoint in much detail, generally there should not be a race condition there: 

* if the TaskManagers cannot connect to the ResourceManager, they will retry (by default the timeout is 5 mins)
* if the JobManager does not get enough resources from the ResourceManager, it will also wait for the resources/slots to be provided. The timeout there is also 5 minutes, I think.

So, this should actually be pretty robust as long as the TaskManager containers can eventually reach the JobManager.
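
Both timeouts are configurable if your cluster needs longer to come up - a minimal sketch (key names as of Flink 1.6/1.7; please double-check them against the configuration docs for your version):

# give TaskManager registration and slot allocation more time
echo "taskmanager.registration.timeout: 10 min" >> "$FLINK_HOME/conf/flink-conf.yaml"
echo "slot.request.timeout: 600000" >> "$FLINK_HOME/conf/flink-conf.yaml"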

Could you provide the Taskmanager/JobManager logs for such a failure case?

Cheers, 

Konstantin


On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <[hidden email]> wrote:
Following https://github.com/apache/flink/tree/release-1.7/flink-container/docker
I have created an entrypoint, which looks as follows:
#!/bin/sh

################################################################################
# from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
# and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
################################################################################

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec flink
    else
        # Others
        echo gosu flink
    fi
}

JOB_MANAGER="jobmanager"
TASK_MANAGER="taskmanager"

CMD="$1"
shift

if [ "${CMD}" = "help" ]; then
    echo "Usage: $(basename "$0") (${JOB_MANAGER}|${TASK_MANAGER}|help)"
    exit 0
elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
    if [ "${CMD}" = "${TASK_MANAGER}" ]; then
        # Default the number of task slots to the number of CPUs
        TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}

        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"

        echo "Starting Task Manager"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
    else
        sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
        echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
        echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"

        # With no further arguments, start a plain JobManager;
        # otherwise run the standalone job entrypoint with the given job arguments
        if [ -z "$1" ]; then
            exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
        else
            exec "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"
        fi
    fi
fi

exec "$@"
It works for all the cases except running a standalone job.
The problem, the way I understand it, is a race condition: in Kubernetes it takes several attempts to establish the connection between the JobManager and the TaskManagers, while standalone-job.sh tries to start the job immediately once the cluster is created (before the connection is established).
Is there a better option for starting a job on container startup?
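
The best I have come up with so far is a retry wrapper (just a sketch, not battle-tested) instead of exec'ing standalone-job.sh once:

# retry the standalone job startup, so an early failure (TaskManagers not
# yet registered) restarts the attempt instead of killing the container
while true; do
    "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@" && break
    echo "standalone job exited; retrying in 5 seconds..."
    sleep 5
done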
 


-- 
Konstantin Knauf | Solutions Architect
+49 160 91394525

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



--
Konstantin Knauf | Solutions Architect
+49 160 91394525
