Issues in recovering state from last crash using custom sink

vipul singh
Hi all,

I am working on a Flink archiver application. In short, it reads schematized messages from Kafka and archives them to S3. Because of the way the output files have to be named, I had to write a custom sink. So far, the application is generally able to archive files to S3 correctly.
I am having some issues during the recovery phase. A sample of the code can be found at this link. On recovery, when initializeState is called, it does not recover the last checkpointed ListState (i.e. checkpointedMessages has size 0). I suspect this is because of the way I am retrieving the checkpointed messages. Could someone point out what is wrong, or direct me to some examples that do something similar? (Please note that the Message class is our own implementation.)

Thanks,
Vipul

Re: Issues in recovering state from last crash using custom sink

Aljoscha Krettek
Hi,

How are you testing the recovery behaviour? Are you taking a savepoint, then shutting down, and then restarting the job from the savepoint?

Best,
Aljoscha

On 28. Aug 2017, at 00:28, vipul singh wrote:

Re: Issues in recovering state from last crash using custom sink

vipul singh
Hi Aljoscha,

Yes.
I run the application until a few checkpoints have completed, then stop it between two checkpoints, so there should be messages in the list state that were checkpointed when snapshotState was called. I can see a checkpoint file on S3 (I am saving the checkpoints to S3 using the RocksDB state backend). After restarting the application, I set a breakpoint here to see whether there are any messages in checkpointedMessages, but, as shown below, the list is empty.

<Screen Shot 2017-08-28 at 10.30.10 AM.png>
Do you think there might be an error in the way I am trying to retrieve messages?


override def snapshotState(context: FunctionSnapshotContext): Unit = {
  // Replace the operator state with the messages currently buffered in memory.
  checkpointedMessages.clear()
  bufferredMessages.foreach(checkpointedMessages.add)
  pendingFiles synchronized {
    if (pendingFiles.nonEmpty) {
      // We have a list of pending files:
      // move all of them to S3 (the sink in our case)
      // and afterwards delete the local files.
    }
    pendingFiles.clear()
  }
}

override def initializeState(context: FunctionInitializationContext): Unit = {
  // Check whether files already exist in /tmp. This can happen if the program
  // crashed before these files were uploaded to S3, so we need to recover them
  // (upload these files to S3) and clear the directory.
  handlePreviousPendingFiles()
  checkpointedMessages = context.getOperatorStateStore.getListState(
    new ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new TypeHint[Message]() {})))
  import scala.collection.JavaConversions._
  for (message <- checkpointedMessages.get) {
    bufferredMessages.add(message)
  }
}

As I understand the code above, on each checkpoint the messages in checkpointedMessages become part of the snapshot, and when initializeState is called it recovers these messages from the last checkpoint. Is that correct?
Or do you think the error is in the way I retrieve the elements of the last checkpoint, i.e. this call:
checkpointedMessages = context.getOperatorStateStore.getListState(new ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new TypeHint[Message]() {})))
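To separate the two possibilities (the retrieval code is wrong, or no state is handed back on startup at all), here is a dependency-free toy model that mirrors the shape of the snapshotState/initializeState pair above. It is a sketch under stated assumptions, not Flink's API: ToyListState, ToyBufferingSink and the Message case class are hypothetical stand-ins, and the Option argument to initializeState stands in for whatever state the framework restores (None models a start with nothing restored).

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for an operator ListState (NOT Flink's class).
final class ToyListState[T] {
  private val elements = ListBuffer.empty[T]
  def clear(): Unit = elements.clear()
  def add(value: T): Unit = elements += value
  def get: List[T] = elements.toList // immutable copy of the current contents
}

// Hypothetical message type; the real Message class is the poster's own.
final case class Message(id: Int)

// Toy sink following the snapshotState/initializeState pattern from the thread.
final class ToyBufferingSink {
  val bufferredMessages: ListBuffer[Message] = ListBuffer.empty[Message]
  private val checkpointedMessages = new ToyListState[Message]

  def invoke(m: Message): Unit = bufferredMessages += m

  // Replace the state's contents with the current buffer (clear, then add)
  // and return an immutable copy that plays the role of the checkpoint.
  def snapshotState(): List[Message] = {
    checkpointedMessages.clear()
    bufferredMessages.foreach(checkpointedMessages.add)
    checkpointedMessages.get
  }

  // `restored` models what the framework hands back on startup:
  // Some(snapshot) after a restore, None when nothing is restored.
  def initializeState(restored: Option[List[Message]]): Unit = {
    checkpointedMessages.clear()
    restored.getOrElse(Nil).foreach(checkpointedMessages.add)
    // Same copy-back loop as in the thread's initializeState.
    for (message <- checkpointedMessages.get) bufferredMessages += message
  }
}
```

In this model, initializeState(Some(snapshot)) puts the snapshotted messages back into bufferredMessages, while initializeState(None) leaves the buffer empty even though the copy-back loop is identical. So an empty list at the breakpoint is also consistent with the job simply not having been restored from any checkpoint or savepoint, rather than with a bug in the retrieval code.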

Please let me know!

Thanks,
Vipul

On Mon, Aug 28, 2017 at 2:52 AM, Aljoscha Krettek wrote:

Re: Issues in recovering state from last crash using custom sink

Aljoscha Krettek
Hi,

If you do not manually take a savepoint and then restore from that savepoint, no state will be restored. Simply stopping a job and then restarting it does not restore state. Regular checkpoints are only used for recovery when a job fails, not after a user-initiated shutdown.
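The rule above can be summarized with a small, dependency-free model of where a new run of a job gets its initial state. It is a simplification, not Flink code: StartMode, ToyJob and the List[String] state are hypothetical, and the model leaves out details such as retained (externalized) checkpoints, which Flink can also resume from depending on version and configuration.

```scala
// Hypothetical model: how a new run of a job obtains its initial operator state.
// State is simplified to the list of buffered messages (here just strings).
sealed trait StartMode
// The user stops the job and submits it again without a savepoint.
case object ManualRestart extends StartMode
// The framework restarts the job after a failure.
case object FailureRecovery extends StartMode
// The user took a savepoint before stopping and restores from it explicitly.
final case class FromSavepoint(savepointState: List[String]) extends StartMode

final class ToyJob {
  // Latest checkpoint that completed while this job was running.
  private var latestCompletedCheckpoint: Option[List[String]] = None

  def completeCheckpoint(state: List[String]): Unit =
    latestCompletedCheckpoint = Some(state)

  def initialState(mode: StartMode): List[String] = mode match {
    case ManualRestart           => Nil // regular checkpoints are not used
    case FailureRecovery         => latestCompletedCheckpoint.getOrElse(Nil)
    case FromSavepoint(snapshot) => snapshot
  }
}
```

For example, after completeCheckpoint(List("m1", "m2")), initialState(FailureRecovery) yields List("m1", "m2"), while initialState(ManualRestart) yields an empty list, which matches the empty checkpointedMessages seen in the debugger after a manual stop and restart.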

Best,
Aljoscha

On 28. Aug 2017, at 20:14, vipul singh wrote: