Triggering Savepoint for the Flink Job

Triggering Savepoint for the Flink Job

Anil
CONTENTS DELETED
The author has deleted this message.

Re: Triggering Savepoint for the Flink Job

Rong Rong
Hi Anil,

I am one of the engineers maintaining Uber's open-source AthenaX platform. For now, AthenaX still runs on Flink 1.3.2, so following the 1.4.2 release documentation might not line up. (We are working on upgrading to the latest 1.5 release.)

As for your question regarding savepoints, AthenaX does not support savepoints natively at the moment. However, we have a separate API, WatchdogPolicy, that you can customize. It supports monitoring and management APIs that are called periodically, and you can use it to trigger your daily savepoints. As for how to take the savepoint, I think the REST API [1] might be a good starting point in AthenaX's case, because we launch the job in detached mode.
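As a rough illustration of the REST approach, here is a minimal sketch of building and sending a savepoint trigger request. It assumes the `POST /jobs/:jobid/savepoints` endpoint and its `target-directory` / `cancel-job` JSON fields as found in Flink 1.5 and later; earlier releases may not offer this endpoint. The class name `SavepointRestRequest`, the method names, and the host and paths in the usage note are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

// Hypothetical helper, not part of Flink or AthenaX.
final class SavepointRestRequest {
    private SavepointRestRequest() {}

    // Builds <base>/jobs/<jobId>/savepoints, tolerating a trailing slash on the base URL.
    static String triggerUrl(String restBaseUrl, String jobId) {
        String base = restBaseUrl.endsWith("/")
                ? restBaseUrl.substring(0, restBaseUrl.length() - 1)
                : restBaseUrl;
        return base + "/jobs/" + jobId + "/savepoints";
    }

    // Builds the JSON request body (field names assumed from the Flink 1.5+ REST API).
    static String requestBody(String targetDirectory, boolean cancelJob) {
        String escaped = targetDirectory.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"target-directory\":\"" + escaped + "\",\"cancel-job\":" + cancelJob + "}";
    }

    // Sends the trigger request and returns the raw response body.
    static String trigger(String restBaseUrl, String jobId, String targetDirectory)
            throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(triggerUrl(restBaseUrl, jobId)).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(requestBody(targetDirectory, false).getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = conn.getInputStream();
             Scanner sc = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return sc.hasNext() ? sc.next() : "";
        }
    }
}
```

A periodic hook such as a customized WatchdogPolicy could call something like `SavepointRestRequest.trigger("http://jobmanager:8081", jobId, "hdfs:///savepoints")` once a day.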

Please let me know if this is helpful.

Thanks,
Rong


On Thu, May 31, 2018 at 1:01 PM, Anil <[hidden email]> wrote:
I am using Flink 1.4.2. I have forked Uber's AthenaX project
<https://github.com/uber/AthenaX>.

The Flink jobs are deployed in a YARN cluster. I need to take a savepoint
for all the jobs every day.

ClusterClient
<https://github.com/apache/flink/blob/release-1.4.2/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java#L672>
provides an implementation for triggering a savepoint using the Flink ID.
YarnClusterClient
<https://github.com/apache/flink/blob/release-1.4.2/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java>
is an implementation of ClusterClient.

My initial thought was to use a YarnClusterClient instance with the Flink ID (I save
this when the Flink job is deployed to the YARN cluster) to trigger the savepoint.
So I created an instance of YarnClusterClient once and saved it so that I
could use it anytime in the application. But this doesn't seem to work. It
doesn't seem able to cancel a job or trigger a savepoint even with a valid Flink
ID. When I try to cancel a valid Flink job, it throws an error for an invalid
ID.

I would appreciate it if someone could help me out here.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Triggering Savepoint for the Flink Job

Anil
CONTENTS DELETED
The author has deleted this message.

Re: Triggering Savepoint for the Flink Job

Rong Rong
Hi Anil,

Glad to know that you upgraded the system to 1.4. From our experience, quite a few changes are required to adapt to the new deployment model in 1.4, if I remember correctly.
The "run detach" deployment model in AthenaX does not support reattaching to the job; we use the REST API for all subsequent life-cycle management.

There are a couple of workarounds I can think of if upgrading to 1.5 is not an option:
- Try to use the CLI API [1] instead of the REST API by replacing the life-cycle management component in WatchdogPolicy, so that you can trigger savepoints.
- Try to modify the deployment model of AthenaX so that it does not use "run detach" mode, by modifying the "YarnClusterDescriptor".
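
For the first option, a minimal sketch of invoking the Flink CLI from Java might look like the following. It assumes the `flink savepoint <jobId> [targetDirectory]` action and the `-yid <yarnAppId>` option are available in the installed release; the class name `SavepointCli`, its method names, and the paths and IDs in the usage note are hypothetical.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink or AthenaX.
final class SavepointCli {
    private SavepointCli() {}

    // Assembles: <flinkBin> savepoint <jobId> [targetDirectory] [-yid <yarnApplicationId>]
    static List<String> savepointCommand(String flinkBin, String jobId,
                                         String targetDirectory, String yarnApplicationId) {
        List<String> cmd = new ArrayList<>();
        cmd.add(flinkBin);
        cmd.add("savepoint");
        cmd.add(jobId);
        if (targetDirectory != null) {
            cmd.add(targetDirectory);
        }
        if (yarnApplicationId != null) {
            cmd.add("-yid");
            cmd.add(yarnApplicationId);
        }
        return cmd;
    }

    // Runs the command, forwarding its output to this process, and returns the exit code.
    static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }
}
```

For example, `SavepointCli.run(SavepointCli.savepointCommand("bin/flink", jobId, "hdfs:///savepoints", yarnAppId))` could be called from the periodic life-cycle check, treating a non-zero exit code as a failed savepoint.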


Hope this can help your use case.

Thanks,
Rong

On Thu, May 31, 2018 at 8:38 PM, Anil <[hidden email]> wrote:
Thanks for the reply, Rong. We had updated AthenaX to Flink 1.4.

I checked Flink 1.4: its REST endpoint does not support only creating a
savepoint. It has cancel-with-savepoint. I think creating a savepoint is
supported in 1.5. Since we can't upgrade to 1.5 at the moment, I would like
to find a workaround for now.

Can you tell me how to reattach to a running job in the cluster?


Re: Triggering Savepoint for the Flink Job

Anil
CONTENTS DELETED
The author has deleted this message.

Re: Triggering Savepoint for the Flink Job

Anil
CONTENTS DELETED
The author has deleted this message.

Re: Triggering Savepoint for the Flink Job

Rong Rong
Glad to know you found a solution. Would you mind sharing your workaround for Flink 1.4.2? I am pretty sure other users would benefit from your effort too :-)

Regarding the checkpoint / savepoint backend, we use HDFS as our state backend instead of RocksDB. We are working on adding that logic to open-source AthenaX soon.
We will definitely consider integrating with RocksDB as well in our next AthenaX release, and would love to have your input considered in the final design.

Thanks,
Rong

On Mon, Jun 4, 2018 at 11:12 AM, Anil <[hidden email]> wrote:
Just out of curiosity, how do you save your checkpoints? Currently I'm using
the filesystem, but I'm migrating to RocksDB, which allows for async operations
to avoid latency at higher scale as we grow.


Re: Triggering Savepoint for the Flink Job

Anil
CONTENTS DELETED
The author has deleted this message.