Hi Igal,
thanks for these pointers!
I currently deploy a Flink jar via docker copy. But this is a
spike setup anyway. I will now discard it and switch directly to
working in Kubernetes.
So, just so I understand this right, the recommended production
setup would be:
* Build a docker image containing the job and a custom
flink-conf.yaml (sketched below), based on this: https://github.com/apache/flink-statefun/tree/master/tools/docker
* Deploy the job image to Kubernetes via Helm into a standalone
StateFun job cluster: https://github.com/apache/flink-statefun/tree/master/tools/k8s
* Repeat the above steps for each StateFun job separately
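The custom flink-conf.yaml baked into that image would then
roughly combine my current settings with the checkpointing keys
you mentioned, e.g. (the interval is just an example value):

  execution.checkpointing.interval: 10s
  state.backend: rocksdb
  state.checkpoints.dir: file:///checkpoint-dir
  state.savepoints.dir: file:///checkpoint-dir
  classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf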
Two questions left:
1) Does the recommendation of one cluster per job in Kubernetes
setups also hold for "regular" Flink jobs?
2) Do you (Ververica) offer developer training with a special
focus on Stateful Functions? I would probably feel a bit safer
moving into production with that as a background... :-)
Thanks again for your quick and comprehensive replies!
Best regards and a nice evening!
Jan
On 05.11.20 17:55, Igal Shilman wrote:
How do you deploy the job currently?
Are you using the data stream integration, or a Flink jar [1]?
(Also please note that the directories might be created, but
without a checkpoint interval set they will stay empty.)
Regarding your two questions:
It is true that you could theoretically share the same cluster
and submit additional jobs besides StateFun, but StateFun
requires a specific set of configurations that might not apply
to your other jobs.
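(For example, a setting like the
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
entry from your flink-conf.yaml is required by StateFun, but
since it is cluster-wide it would affect every other job on a
shared cluster as well.)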
Considering your end goal of eventually using Kubernetes, the
recommended way is actually a cluster per job, and the StateFun
docker images are a convenient way to package your modules.
Hi Igal,
thanks for your quick and detailed reply! For me, this is the
really great defining feature of Stateful Functions: separating
stream-processing "infrastructure" from business-logic code,
possibly maintained by a different team.
Regarding your points: I did add the checkpoint interval to the
flink-conf.yaml, to no avail. state.checkpoints.dir was already
set, and all the necessary subfolders get created on job
startup. They just stay empty...
Thanks for the pointer to the helm charts! Just what
I was looking for!
A question regarding the StateFun docker images: I would
actually prefer using them, but my fear is that they would take
away my options to:
1) deploy a new release of my StateFun job without killing the
cluster, because...
2) ... I would like to schedule regular Flink jobs or additional
StateFun jobs on the same cluster alongside my original job.
Could you give a quick opinion on whether these fears are
justified and, if so, what a recommended setup to satisfy these
use cases would be?
Best regards
Jan
On 05.11.20 17:02, Igal Shilman wrote:
Hi Jan,
The architecture you outlined sounds good, and we have
successfully run mixed architectures like this.
Let me try to address your questions:
1)
To enable checkpointing you need to set the
relevant values in your flink-conf.yaml file.
execution.checkpointing.interval: <duration> (see [1])
state.checkpoints.dir: <path> (see [2])
You can take a look here for an example [3].
The easiest way to incorporate the changes would
be to add your custom flink-conf.yaml into your
docker image (here is an example [4]).
When you move to Kubernetes, you can mount a ConfigMap as the
flink-conf.yaml; check out the helm charts here: [5]
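For illustration, such a mount could look roughly like this (the
names are placeholders; the charts in [5] show the real setup):

  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: flink-config
  data:
    flink-conf.yaml: |
      execution.checkpointing.interval: 10s
      state.checkpoints.dir: file:///checkpoint-dir

  # and in the pod template, mounted over Flink's conf directory:
  spec:
    containers:
      - name: jobmanager
        volumeMounts:
          - name: flink-config
            mountPath: /opt/flink/conf
    volumes:
      - name: flink-config
        configMap:
          name: flink-config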
2)
When the remote function is unavailable, StateFun will buffer
the messages addressed to it, up to the specified timeout (the
default is 1 minute; you can set it here [6]), before the job is
considered failed and restarted.
It seems like in your example you are waiting
for 10 seconds, so the messages should be
delivered.
Do you set function.spec.timeout or
.withMaxRequestDuration() to something else?
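For a remote module, the timeout goes under the function spec in
module.yaml. A minimal sketch, assuming an HTTP endpoint like in
the python greeter example (the type and endpoint are
placeholders):

  version: "2.0"
  module:
    meta:
      type: remote
    spec:
      functions:
        - function:
            meta:
              kind: http
              type: example/greeter
            spec:
              endpoint: http://python-worker:8000/statefun
              timeout: 2min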
Good luck!
Igal.
P.S. Consider using the StateFun docker images [7]; see any of
the examples in the statefun repository.
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-checkpoints-dir
Hi,
I'm currently trying to set up a Flink
Stateful Functions Job with the
following architecture:
* Kinesis Ingress (embedded)
* Stateful Function (embedded) that calls out to and receives
responses from an external business-logic function (a python
worker similar to the one in the python greeter example)
* Kinesis Egress (embedded)
For the time being I am working with a local docker-compose
cluster, but the goal would be to move this to Kubernetes for
production. The stream processing itself is working fine, but I
can't solve two problems with respect to fault tolerance:
1) The app is not writing checkpoints or savepoints at all
(RocksDB, local filesystem). A checkpoint dir is created on
startup but stays empty the whole time. When stopping the job, a
savepoint dir is created, but the stop ultimately fails with a
java.util.concurrent.TimeoutException and the job continues to
run.
2) When I try to simulate failure in the external function
("docker-compose stop python-worker && sleep 10 &&
docker-compose start python-worker"), I lose all messages in
between restarts, although the documentation states that "For
both state and messaging, Stateful Functions is able to provide
the exactly-once guarantees users expect from a modern data
processing framework".
See the relevant parts of my configs below.
Any input or help would be greatly
appreciated.
Best regards
Jan
------ flink-conf.yaml ------
jobmanager.rpc.address: jobmanager
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.checkpoints.dir: file:///checkpoint-dir
state.savepoints.dir: file:///checkpoint-dir
jobmanager.execution.failover-strategy: region
blob.server.port: 6124
query.server.port: 6125
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
------ docker-compose.yaml ------
jobmanager:
  image: flink:1.11.2-scala_2.12-java8
  expose:
    - "6123"
  ports:
    - "8082:8081"
  volumes:
    - ./streamProcessor/checkpoint-dir:/checkpoint-dir
    - ./streamProcessor/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml:ro
  command: jobmanager
  environment:
    - JOB_MANAGER_RPC_ADDRESS=jobmanager
    - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
taskmanager:
  image: flink:1.11.2-scala_2.12-java8
  expose:
    - "6121"
    - "6122"
  depends_on:
    - jobmanager
  command: taskmanager
  links:
    - "jobmanager:jobmanager"
  environment:
    - JOB_MANAGER_RPC_ADDRESS=jobmanager
    - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
--
neuland – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen
Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de
https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi
Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501