Approaches for external state for Flink

Approaches for external state for Flink

Oğuzhan Mangır

I'm trying to design a streaming flow that de-duplicates events and sends them to a Kafka topic.

Basically, the flow looks like this:

kafka (multiple topics) => flink (de-duplication and event enrichment) => kafka (single topic)

For de-duplication, I'm thinking of using Cassandra as an external state store. The details of my job:

I have an event payload with a uuid field. If an event with the same uuid arrives again, it should be discarded. In my case, I read from two Kafka topics. The first topic's events have many fields, but the second topic's events carry only a uuid, so I have to enrich events coming from the second topic using the state stored under the same uuid.

Stream1: Messages read from the first topic. Look up state in Cassandra by uuid. If state exists, ignore the event and do not emit it to Kafka. If state does not exist, save the event to Cassandra, then emit it to Kafka.

Stream2: Messages read from the second topic. Look up state in Cassandra by uuid. If state exists, check a column that records whether an event for this uuid has already arrived from topic2. If that column is false, enrich the event using the state and update the column to true. If it is true, ignore the event as a duplicate.

def checkDeDuplication(event: Event): Option[Event] = {
  val state = readFromCassandra(event.uuid)
  if (state.isDefined) None // duplicate: ignore this event
  else {
    saveEventToCassandra(event)
    Some(event)
  }
}

def checkDeDuplicationAndEnrichEvent(event: Event): Option[Event] = {
  val state = readFromCassandra(event.uuid)
  if (state.isEmpty) None // no state from topic1 yet: ignore this event
  else if (state.get.flag) None // flag already true: ignore this duplicate
  else {
    updateFlagAsTrueInCassandra(event)
    Some(event) // enrichment of the event with the stored state happens here
  }
}


val stream1 = readKafkaTopic1().flatMap(e => checkDeDuplication(e).toSeq)
val stream2 = readKafkaTopic2().flatMap(e => checkDeDuplicationAndEnrichEvent(e).toSeq)
stream1.union(stream2).addSink(kafkaSink)

1- Is that a good approach?

2- Is Cassandra the right choice here? Note that the state size is very large and I have to feed the state from a batch flow first, so I cannot use internal state like RocksDB.

3- Can I improve this logic?

4- Could there be any bottlenecks in this flow? I'm thinking of using async functions for the state read/write operations.
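
For (4), something like the sketch below is what I have in mind, using Flink's AsyncDataStream (the timeout and capacity values are arbitrary; readFromCassandra and saveEventToCassandra are the placeholder helpers above, and of course the read-then-write is not atomic):

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

class AsyncDedupCheck extends RichAsyncFunction[Event, Event] {
  @transient implicit lazy val ec: ExecutionContext = ExecutionContext.global

  override def asyncInvoke(event: Event, resultFuture: ResultFuture[Event]): Unit =
    Future(readFromCassandra(event.uuid)).onComplete {
      case Success(None) =>
        // no state yet: persist and emit
        saveEventToCassandra(event)
        resultFuture.complete(Seq(event))
      case _ =>
        resultFuture.complete(Seq.empty) // duplicate (or failed lookup): drop
    }
}

val deduped = AsyncDataStream.unorderedWait(
  readKafkaTopic1(), new AsyncDedupCheck, 1000, TimeUnit.MILLISECONDS, 100)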


Re: Approaches for external state for Flink

raghav280392


--
Raghavendar T S

Re: Approaches for external state for Flink

David Anderson-4
Oguzhan,

Note that the state size is very large and I have to feed the state from a batch flow first, so I cannot use internal state like RocksDB.

How large is "very large"? Using RocksDB, several users have reported working with jobs using many TBs of state.

And there are techniques for bootstrapping the state. That doesn't have to be a showstopper.

Could there be any bottlenecks in this flow? I'm thinking of using async functions for the state read/write operations.

That's a good reason to reconsider using Flink state.
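
For reference, a dedup-and-enrich operator on Flink keyed state might look roughly like the sketch below. This is only an illustration, not a drop-in implementation: Event, uuid, and the enrichment step are placeholders from your pseudocode. The two streams are connected and keyed by uuid, so both functions see the same state.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

class DedupAndEnrich extends KeyedCoProcessFunction[String, Event, Event, Event] {
  private var stored: ValueState[Event] = _
  private var enriched: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    stored = getRuntimeContext.getState(new ValueStateDescriptor("event", classOf[Event]))
    enriched = getRuntimeContext.getState(new ValueStateDescriptor("enriched", classOf[java.lang.Boolean]))
  }

  // topic1 events: store and emit only the first occurrence per uuid
  override def processElement1(e: Event,
      ctx: KeyedCoProcessFunction[String, Event, Event, Event]#Context,
      out: Collector[Event]): Unit =
    if (stored.value() == null) {
      stored.update(e)
      out.collect(e)
    }

  // topic2 events: enrich from the stored state, at most once per uuid
  override def processElement2(e: Event,
      ctx: KeyedCoProcessFunction[String, Event, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    val s = stored.value()
    if (s != null && enriched.value() == null) {
      enriched.update(true)
      out.collect(e) // enrichment of e with fields from s would happen here
    }
  }
}

val result = readKafkaTopic1()
  .connect(readKafkaTopic2())
  .keyBy(_.uuid, _.uuid)
  .process(new DedupAndEnrich)
result.addSink(kafkaSink)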

Regards,
David





Re: Approaches for external state for Flink

David Anderson-4
What are the other techniques for bootstrapping RocksDB state?

Bootstrapping state involves somehow creating a snapshot (typically a savepoint, but a retained checkpoint can be a better choice in some cases) containing the necessary state -- meaning that the state has the same operator uid and state descriptor used by the real streaming job.

You can do this by either: (1) running a variant of the live streaming job against the data used for bootstrapping and taking a snapshot when the data has been fully ingested, or (2) by using the State Processor API [1]. You'll find a trivial example of the second approach in [2]. Once you have a suitable snapshot, you can run your real job against it.
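
For a flavor of approach (2), a bootstrap job might look roughly like the sketch below. This assumes the Flink 1.13-era, DataSet-based State Processor API; Event, uuid, the paths, and loadHistoricalEvents are placeholders, and the operator uid and max parallelism must match the real streaming job:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction

// Writes one ValueState entry per uuid, mirroring the streaming job's dedup state.
class DedupBootstrap extends KeyedStateBootstrapFunction[String, Event] {
  private var stored: ValueState[Event] = _
  override def open(parameters: Configuration): Unit =
    stored = getRuntimeContext.getState(new ValueStateDescriptor("event", classOf[Event]))
  override def processElement(e: Event,
      ctx: KeyedStateBootstrapFunction[String, Event]#Context): Unit =
    stored.update(e)
}

val bEnv = ExecutionEnvironment.getExecutionEnvironment
val historical = bEnv.fromCollection(loadHistoricalEvents()) // hypothetical batch loader

val transformation = OperatorTransformation
  .bootstrapWith(historical)
  .keyBy(new KeySelector[Event, String] { override def getKey(e: Event): String = e.uuid })
  .transform(new DedupBootstrap)

Savepoint
  .create(new EmbeddedRocksDBStateBackend(), 128) // max parallelism must match the real job
  .withOperator("dedup-uid", transformation)      // same uid as the streaming dedup operator
  .write("s3://bucket/bootstrap-savepoint")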


Regards,
David

On Sat, Apr 24, 2021 at 3:01 PM Omngr <[hidden email]> wrote:
Hi David, first of all, thank you for your response!

The state size is about 1 TB for now, but it will grow quickly, and I cannot use TTL for the state; it will grow indefinitely.
What are the other techniques for bootstrapping RocksDB state?



Re: Approaches for external state for Flink

Swagat Mishra
Why not use upserts? Wouldn't that solve the duplicates issue, with no need to query the database?



Re: Approaches for external state for Flink

David Anderson-4
Now, I'm just worried about the state size. State size will grow forever. There is no TTL.

The potential for unbounded state is certainly a problem, and it's going to be a problem no matter how you implement the deduplication. Standard techniques for mitigating this include (1) limiting the timeframe for deduplication, and/or (2) using bloom filters to reduce the storage needed in exchange for some (bounded percentage of) false positives.  But since you must store data from stream1 to use later for enrichment, I think bloom filters are only potentially relevant for deduplicating stream2. 
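
As a concrete illustration of (1): if you end up keeping the dedup state in Flink, you can bound its retention with state TTL rather than keeping entries forever. A minimal sketch (the 30-day window is an arbitrary placeholder):

import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.days(30)) // entries not written for 30 days are dropped
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build()

val descriptor = new ValueStateDescriptor("event", classOf[Event])
descriptor.enableTimeToLive(ttlConfig)

With the RocksDB backend, expired entries are then cleaned up in the background during compaction.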

Do you have any temporal constraints on how the enrichment of stream2 is done? For example, if an event from stream2 arrives before the corresponding event from stream1 has been processed, can you simply ignore the event from stream2? Or should it be buffered, and enriched later? I ask this because checkpointing can become challenging at scale when joining two streams, if there's a requirement to buffer one of the streams so the other can catch up.

Flink may or may not be the best choice for your application. The devil is in the details.

Regards,
David

On Sun, Apr 25, 2021 at 12:25 PM Omngr <[hidden email]> wrote:
Thank you David. That's perfect. 

Now, I'm just worried about the state size. State size will grow forever. There is no TTL. 
