Re: Starting Flink cluster and running a job

Posted by Konstantin Knauf-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Jira-issue-Flink-11127-tp26180p26224.html

Hi Boris,

without looking at the entrypoint in much detail, generally there should not be a race condition there:

* if the taskmanagers can not connect to the resourcemanager they will retry (per default the timeout is 5 mins)
* if the JobManager does not get enough resources from the ResourceManager it will also wait for the resources/slots to provided. The timeout there is also 5 minutes, I think.

So, this should actually be pretty robust as long as the Taskmanager containers can reach the Jobmanager eventually.

Could you provide the Taskmanager/JobManager logs for such a failure case?

Cheers,

Konstantin


On Mon, Feb 18, 2019 at 1:07 AM Boris Lublinsky <[hidden email]> wrote:
Following https://github.com/apache/flink/tree/release-1.7/flink-container/docker
I have created an entry point, which looks like follows:
#!/bin/sh

################################################################################
# from https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
# and https://github.com/docker-flink/docker-flink/blob/63b19a904fa8bfd1322f1d59fdb226c82b9186c7/1.7/scala_2.11-alpine/docker-entrypoint.sh
################################################################################

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}

drop_privs_cmd() {
if [ $(id -u) != 0 ]; then
# Don't need to drop privs if EUID != 0
return
elif [ -x /sbin/su-exec ]; then
# Alpine
echo su-exec flink
else
# Others
echo gosu flink
fi
}

JOB_MANAGER="jobmanager"
TASK_MANAGER="taskmanager"

CMD="$1"
shift

if [ "${CMD}" = "help" ]; then
echo "Usage: $(basename $0) (${JOB_MANAGER}|${TASK_MANAGER}|help)"
exit 0
elif [ "${CMD}" = "${JOB_MANAGER}" -o "${CMD}" = "${TASK_MANAGER}" ]; then
if [ "${CMD}" = "${TASK_MANAGER}" ]; then
TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}

sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_HOME/conf/flink-conf.yaml"
echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"

echo "Starting Task Manager"
echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"
exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground
else
sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_HOME/conf/flink-conf.yaml"
echo "blob.server.port: 6124" >> "$FLINK_HOME/conf/flink-conf.yaml"
echo "query.server.port: 6125" >> "$FLINK_HOME/conf/flink-conf.yaml"
echo "config file: " && grep '^[^\n#]' "$FLINK_HOME/conf/flink-conf.yaml"

if [ -z "$1" ]; then
exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
else
exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
fi
fi
fi

exec "$@"
It does work for all the cases, except running standalone job.
The problem, the way I understand it, is a racing condition.
In kubernetes it takes several attempts for establish connection between Job and Task manager, while standalone-job.sh
 tries to start a job immediately once the cluster is created (before connection is established).
Is there a better option to implement it starting a job on container startup?
 


--

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen