Custom Sink Checkpointing errors

Custom Sink Checkpointing errors

vipul singh
Hello all,

I am working on a custom sink implementation, but am having weird issues with checkpointing.

I am using a custom ListState to checkpoint, and it looks like this:
private var checkpointMessages: ListState[Bucket] = _

My snapshot function looks like:

@throws[IOException]
def snapshotState(context: FunctionSnapshotContext): Unit = {
  checkpointMessages.clear()
  for ((bucketName, bucket) <- bufferedMessages) {
    // cloning to avoid any concurrent modification issues
    val new_buffer = new ListBuffer[GenericRecord]()
    bucket.buffer.foreach(f => new_buffer += f)

    val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)

    if (shouldUpload(bucketName)) uploadFile(bucketName)
    else checkpointMessages.add(new_bucket)
  }
}
where the Bucket class is:
@SerialVersionUID(1L)
class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var timestamp: Long) extends Serializable {
  def this(name: String) = {
    this(name, ListBuffer[GenericRecord](), new Date().getTime)
  }
}

The bufferedMessages signature is:
private val bufferedMessages = collection.mutable.Map[String, Bucket]()

The basic idea behind this implementation is that I maintain multiple buffers and push messages (org.apache.avro.generic.GenericRecord) into them in the invoke method of the sink; upon reaching certain thresholds I archive these on S3.
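To make that flow concrete, here is a minimal, self-contained sketch of the buffering pattern described above. Record, BufferingSketch, and maxRecords are illustrative stand-ins for the real sink, not the actual code:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the Avro GenericRecord messages.
case class Record(topic: String, payload: String)

class BufferingSketch(maxRecords: Int) {
  // One buffer per topic, mirroring the bufferedMessages map above.
  private val buffers = mutable.Map[String, mutable.ListBuffer[Record]]()

  // Called once per incoming record, like SinkFunction#invoke.
  def invoke(r: Record): Unit = {
    val buf = buffers.getOrElseUpdate(r.topic, mutable.ListBuffer[Record]())
    buf += r
  }

  // A bucket crosses the flush threshold once it holds maxRecords records;
  // the real sink would upload it to S3 instead of checkpointing it.
  def shouldUpload(topic: String): Boolean =
    buffers.get(topic).exists(_.size >= maxRecords)
}
```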

I run this both locally in IntelliJ and on a cluster:

In IntelliJ the process runs for a bit (checkpoints 3-4 times) and then errors out with the exception below:

# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
#
# JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode bsd-amd64 compressed oops)
# Problematic frame:
# V  [libjvm.dylib+0x46440c]
#
# Core dump written. Default location: /cores/core or core.25232
#
# An error report file with more information is saved as:
# hs_err_pid25232.log
#
# If you would like to submit a bug report, please visit:
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.
#
Disconnected from the target VM, address: '127.0.0.1:60979', transport: 'socket'

Process finished with exit code 134 (interrupted by signal 6: SIGABRT)

I managed to collect a core dump: https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a

On a cluster I start to see concurrent serialization issues: https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47

My initial guess was that this is happening due to the size of the ListState, but I checked and the buffer only holds around ~10k records. Due to the nature of the application, we have to implement this in a custom sink.

Could someone please help guide me in troubleshooting this further?

--
Thanks in advance,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Custom Sink Checkpointing errors

Stefan Richter
Hi,

judging from the dump's trace, the crash looks unrelated to Flink code. Since it happens somewhere while managing a jar file, it might be related to this: https://bugs.openjdk.java.net/browse/JDK-8142508 , point (2). Maybe your jar gets overwritten while the job is running, e.g. by your IDE?

The serialization exception looks like the custom sink is using the same serializer concurrently in different threads. I don't have the full custom code, but this would be my guess. Make sure to duplicate serializers whenever different threads could work on them, e.g. processing vs. checkpointing.
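As a toy illustration of that advice (StatefulSerializer is a made-up stand-in, not Flink's actual TypeSerializer): a serializer with internal mutable scratch state is unsafe to share across threads, so the processing thread and the checkpointing thread should each keep their own duplicate().

```scala
// Toy model of the duplicate() contract: stateful, hence not thread-safe.
class StatefulSerializer {
  // Shared mutable scratch buffer => concurrent use would corrupt output.
  private val scratch = new StringBuilder

  def serialize(s: String): String = {
    scratch.setLength(0)
    scratch.append(s.length).append(':').append(s)
    scratch.toString
  }

  // Each thread should call duplicate() and use its own instance.
  def duplicate(): StatefulSerializer = new StatefulSerializer
}
```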

Best,
Stefan


 
On 20.10.2017 at 14:24, vipul singh <[hidden email]> wrote:



Re: Custom Sink Checkpointing errors

vipul singh
Thanks Stefan for the answers. The serialization is happening during the creation of the snapshot state. I have added a gist with a larger stacktrace (https://gist.github.com/neoeahit/aee5562bf0b8d8d02e2a012f6735d850). I am not using any custom serializer in the sink.

We have:
src.keyBy(m => (m.topic, m.partition))
  .map(message => updateMessage(message, config))
  .addSink(new CustomSink(config)).uid(FLINK_JOB_ID).setParallelism(src.parallelism)
  .name(FLINK_JOB_ID)
So I am assuming there should be a 1-1 source-to-sink mapping.
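As a rough sketch of why records with the same key always land on the same sink subtask: keyBy hashes the key and maps the hash to a subtask index. The subtaskFor helper below is hypothetical and greatly simplifies Flink's actual key-group assignment:

```scala
// Simplified model of hash-based key routing: every record with the same
// (topic, partition) key maps to the same subtask index in [0, parallelism).
def subtaskFor(topic: String, partition: Int, parallelism: Int): Int =
  Math.floorMod((topic, partition).hashCode, parallelism)
```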

If possible, could you please give some more pointers to help troubleshoot?

Thanks,
Vipul


On Fri, Oct 20, 2017 at 2:58 AM, Stefan Richter <[hidden email]> wrote:



--
Thanks,
Vipul

Re: Custom Sink Checkpointing errors

vipul singh
Thanks Stefan. I found the issue in my application. Everything is working as expected now.
Once again thanks for the help and advice.

On Fri, Oct 20, 2017 at 4:51 AM, vipul singh <[hidden email]> wrote:



--
Thanks,
Vipul