DynamoDB as Sink

Vivek

Hello,

I am currently using DynamoDB as a persistent store in my application, and I am in the process of transforming the application into a streaming pipeline with Flink. The pipeline is simple and stateless: consume data from Kafka, apply some transformations, and write the records to DynamoDB.
I want to collect records in a buffer and batch-write them instead of writing one at a time. I also want retry with exponential backoff for any unprocessed records in a batch.

My first question: is DynamoDB a suitable sink? I googled and searched the mailing list archive, but couldn't find anyone with this requirement.

What are some guidelines for implementing a fault-tolerant sink with at-least-once guarantees? I am looking at the Elasticsearch sink implementation, and it has the capabilities I want. Could I implement a DynamoDB sink the same way?

Basically: implement the CheckpointedFunction interface; collect records in a buffer in the invoke method, then flush the buffer and check for errors in the snapshot method. The flush method will call DynamoDB's BatchWriteItem and, if there are any unprocessed records, add them back to the buffer. A failure handler will attempt to flush the buffer again, do nothing, or fail the sink, depending on the failure-handling policy. A sketch of that flush-and-retry loop is below.
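
For the retry with exponential backoff, this is roughly what I have in mind (a minimal sketch in Scala against the AWS Java SDK v1 client; the helper name and retry limits are placeholders of mine, not from any existing sink):

import java.util.{Collections, List => JList, Map => JMap}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemRequest, WriteRequest}

// Submit one batch, re-submitting DynamoDB's UnprocessedItems with
// exponential backoff until everything lands or the retry budget runs out.
def flushWithBackoff(client: AmazonDynamoDB,
                     tableName: String,
                     batch: JList[WriteRequest],
                     maxRetries: Int = 5,
                     baseDelayMs: Long = 100L): Unit = {
  var pending: JMap[String, JList[WriteRequest]] =
    Collections.singletonMap(tableName, batch)
  var attempt = 0
  while (!pending.isEmpty) {
    val result = client.batchWriteItem(new BatchWriteItemRequest(pending))
    pending = result.getUnprocessedItems // items DynamoDB throttled this round
    if (!pending.isEmpty) {
      if (attempt >= maxRetries)
        throw new RuntimeException(s"unprocessed items left after $maxRetries retries")
      Thread.sleep(baseDelayMs << attempt) // exponential backoff: 100ms, 200ms, 400ms, ...
      attempt += 1
    }
  }
}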

Please let me know if I am on the right track.

Thank You

Re: DynamoDB as Sink

Addison Higham
Hi there,

We have implemented a DynamoDB sink and have had no real issues. It is, of course, only at-least-once, so we work around that by structuring our transformations so that they produce idempotent writes.
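
For example (a hypothetical sketch, the event type and field names are made up): derive each item's primary key deterministically from the event, so replaying the same event after a recovery overwrites the same row instead of creating a duplicate.

import java.util.{HashMap => JHashMap}
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutRequest, WriteRequest}

// Hypothetical event type, just for illustration.
case class OrderEvent(orderId: String, status: String, updatedAt: Long)

// Key the item off the event itself, so a replayed write is a harmless overwrite.
def toIdempotentWrite(e: OrderEvent): WriteRequest = {
  val item = new JHashMap[String, AttributeValue]()
  item.put("pk", new AttributeValue().withS(e.orderId)) // deterministic key
  item.put("status", new AttributeValue().withS(e.status))
  item.put("updatedAt", new AttributeValue().withN(e.updatedAt.toString))
  new WriteRequest(new PutRequest(item))
}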

What we do is pretty similar to what you suggest: we collect the records in a buffer (which needs to be checkpointed). However, instead of using the CheckpointedFunction interface to trigger batch writes, we use a RichSinkFunction with a processing-time callback. For each record, we make sure a processing-time callback is registered for some configurable time in the future, and add the record to the buffer. DynamoDB has a max batch size of 25 records, so if the batch is full, we trigger a write immediately.

The processing-time callback mainly covers situations with low message volume, so that we can bound the maximum latency a record will see before it is written.

We initially looked at using CheckpointedFunction to trigger the writes, but due to DynamoDB's relatively small batch write size we knew we would need to flush very often anyway; batches never get very big, so the state to checkpoint stays minimal.

Hopefully that helps!

Re: DynamoDB as Sink

Addison Higham
Our implementation has quite a bit more going on just to deal with serialization of types, but here is pretty much the core of what we do, in (pseudo) Scala:

import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext
import org.apache.flink.streaming.runtime.tasks.{ProcessingTimeCallback, ProcessingTimeService}
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult
import scala.collection.mutable.ListBuffer

class DynamoSink[T](maxDelayTime: Time) extends RichSinkFunction[T]
    with CheckpointedFunction with ProcessingTimeCallback {

  // DynamoDB's BatchWriteItem accepts at most 25 items per request.
  private val MaxBatchSize = 25

  // Timestamp of the currently pending processing-time callback, if any.
  private var curTimer: Option[Long] = None
  // Records buffered since the last flush; snapshotted on checkpoint.
  private var toWrite: ListBuffer[T] = new ListBuffer[T]

  private def timerService: ProcessingTimeService =
    getRuntimeContext.asInstanceOf[StreamingRuntimeContext].getProcessingTimeService

  override def invoke(element: T, context: SinkFunction.Context[_]): Unit = {
    // Ensure a callback is pending so a low-volume stream still gets
    // flushed within maxDelayTime.
    if (curTimer.isEmpty) {
      val timer = context.currentProcessingTime() + maxDelayTime.toMilliseconds
      timerService.registerTimer(timer, this)
      curTimer = Some(timer)
    }

    toWrite += element
    if (toWrite.length >= MaxBatchSize) {
      flushBufferedRecordsToDynamo()
    }
  }

  override def onProcessingTime(timestamp: Long): Unit = {
    if (toWrite.nonEmpty) {
      flushBufferedRecordsToDynamo()
    }
    curTimer = None
  }

  private def flushBufferedRecordsToDynamo(): Unit = {
    val result      = batchWrite(toWrite)
    val unprocessed = getUnprocessedItems(result)
    toWrite.clear()
    toWrite ++= unprocessed // throttled items stay buffered for the next flush
  }

  // snapshot and restore the state of the toWrite buffer
  override def snapshotState(context: FunctionSnapshotContext): Unit = ???

  override def initializeState(context: FunctionInitializationContext): Unit = ???

  // write to dynamo
  def batchWrite(toWrite: ListBuffer[T]): List[BatchWriteItemResult] = ???

  // capture any items in the BatchWriteItemResult that didn't get processed
  def getUnprocessedItems(result: List[BatchWriteItemResult]): ListBuffer[T] = ???
}
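
Attaching it to the job is then just the usual addSink call (sketch; MyEvent, kafkaSource, and the flush interval are placeholders):

val events: DataStream[MyEvent] = env.addSource(kafkaSource) // hypothetical source
events
  .map(transform(_))                                 // your stateless transformation
  .addSink(new DynamoSink[MyEvent](Time.seconds(5))) // flush at least every 5 seconds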

Re: DynamoDB as Sink

Vivek
Thank you Addison, this is very helpful.
