Hello all,
I am working on a custom sink implementation, but am having weird issues with checkpointing. I am using a custom ListState to checkpoint, declared as:

    private var checkpointMessages: ListState[Bucket] = _

My snapshot function is annotated with @throws[IOException], the Bucket class is annotated with @SerialVersionUID(1L), and the buffer of messages is declared as:

    private val bufferedMessages = collection.mutable.Map[String, Bucket]()

The basic idea behind this implementation is that I maintain multiple buffers and push messages (org.apache.avro.generic.GenericRecord) into them during the sink's invoke() method; upon reaching certain thresholds I archive these buffers on S3.

I have tried to run this both locally in IntelliJ and on a cluster:

- In IntelliJ the process runs for a bit (checkpoints 3-4 times) and then errors out with the exception below. I managed to collect a core dump: https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a
- On a cluster I start to see concurrent serialization issues: https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47

My initial guess was that this is happening due to the size of the ListState, but I checked and there are only around ~10k records in the buffer. Due to the nature of the application, we have to implement this in a custom sink.

Could someone please help me or guide me to troubleshoot this further?

Thanking in advance,
Vipul
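For context, the buffering logic described above (one bucket per key, flushed once a threshold is reached) could be sketched roughly as follows. This is a minimal, self-contained model in plain Scala without the Flink or S3 dependencies; the Bucket shape, the threshold, and the flush callback are illustrative assumptions, not the original code.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the Bucket class from the thread:
// it simply accumulates messages.
final case class Bucket(messages: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String])

class BufferingSink(threshold: Int, flush: (String, Seq[String]) => Unit) {
  // One buffer per key, mirroring the mutable.Map[String, Bucket] in the thread.
  private val bufferedMessages = mutable.Map[String, Bucket]()

  // Called once per incoming record (the sink's invoke() would delegate here).
  def add(key: String, message: String): Unit = {
    val bucket = bufferedMessages.getOrElseUpdate(key, Bucket())
    bucket.messages += message
    if (bucket.messages.size >= threshold) {
      flush(key, bucket.messages.toSeq) // e.g. archive the bucket to S3
      bufferedMessages.remove(key)
    }
  }

  // Number of messages still buffered (i.e. what a checkpoint would have to capture).
  def pendingCount: Int = bufferedMessages.valuesIterator.map(_.messages.size).sum
}
```

Everything still in `bufferedMessages` at checkpoint time is exactly what would need to go into the ListState, which is why checkpointing and flushing interact here.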
Hi,
judging from the dump's trace, the crash looks unrelated to Flink code. Since it happens somewhere in the management of a jar file, it might be related to this: https://bugs.openjdk.java.net/browse/JDK-8142508 , point (2). Maybe your jar gets overwritten while the job is running, e.g. by your IDE? The serialization exception looks like the custom sink is using the same serializer instance from different threads concurrently. I don't have the full custom code, but that would be my guess. Make sure to duplicate serializers whenever different threads could work on them, e.g. processing vs. checkpointing. Best, Stefan
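The duplication advice above can be illustrated with a small, self-contained model. Many real serializers (Avro, Kryo) reuse internal buffers and are therefore not thread-safe; Flink's TypeSerializer exposes a duplicate() method so each thread can get its own instance. The StatefulSerializer below is a hypothetical stand-in, not Flink's actual class:

```scala
// Model of a serializer with internal mutable state, like many real
// serializers that reuse an internal buffer between calls.
class StatefulSerializer(private val buffer: StringBuilder = new StringBuilder) {
  def serialize(value: String): String = {
    buffer.setLength(0)  // not thread-safe: two threads sharing `buffer`
    buffer.append(value) // would corrupt each other's in-flight data
    buffer.toString
  }
  // Analogous to Flink's TypeSerializer.duplicate(): returns an instance
  // with independent internal state, safe to hand to another thread.
  def duplicate(): StatefulSerializer = new StatefulSerializer(new StringBuilder)
}
```

The pattern is: whenever a processing thread and the checkpointing thread might both serialize state, each should work on its own duplicate rather than a shared instance.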
|
Thanks Stefan for the answers. The serialization is happening during the creation of the snapshot state. I have added a gist with a larger stack trace (https://gist.github.com/neoeahit/aee5562bf0b8d8d02e2a012f6735d850). I am not using any custom serializer in the sink.
We have src.keyBy(m => (m.topic, m.partition)), so there should be a 1-1 source and sink mapping, I am assuming. If possible, could you please give some more pointers to help troubleshoot? Thanks, Vipul On Fri, Oct 20, 2017 at 2:58 AM, Stefan Richter <[hidden email]> wrote:
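For reference, keyBy routes each record by the hash of its key, so all records with the same (topic, partition) are processed by the same parallel subtask; note that one subtask may still handle several keys, so the mapping is not strictly 1-1. A minimal sketch of the invariant, assuming a simple hash-modulo assignment rather than Flink's actual key-group mapping:

```scala
final case class Msg(topic: String, partition: Int, payload: String)

// Simplified stand-in for keyBy routing: Flink actually maps keys to
// key groups via murmur hashing, but the invariant is the same --
// every record with the same key goes to the same subtask index.
def subtaskFor(m: Msg, parallelism: Int): Int =
  Math.floorMod((m.topic, m.partition).hashCode, parallelism)
```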
Thanks, Vipul
Thanks Stefan. I found the issue in my application. Everything is working as expected now. Once again, thanks for the help and advice. On Fri, Oct 20, 2017 at 4:51 AM, vipul singh <[hidden email]> wrote:
Thanks, Vipul