Hi,
In generic terms, if a keyed operator outputs its state into a sink, and the state of that operator after a restart can be derived from the sink's contents, can we do just that, as opposed to using checkpointing mechanics? Would that reduce latency (even if only theoretically)? An example: A word counting app with a Kafka source and a Kafka sink (compacted topic). The input is an unbounded stream of single words and the output is a <word, word_count> tuple stream that goes into a Kafka compacted topic. AFAIK Flink can guarantee exactly-once semantics by updating the keyed state of its counting operator on _every_ event - after a restart it can then resume from the last checkpoint. But in this case, given the sink contains exactly the same relevant information as a checkpoint (namely the <key, key_count> tuples), could we load the state of an operator from our sink and avoid the latency added by our state backend? If so, how can this be achieved? If we replaced the Kafka sink with a database sink, could we on startup know which keys a Flink task has been allocated, perform a _single_ query to the database to load the key_counts and load those into the operator? How can this be achieved? Instead of a single query you may want to do a batched query, as long as you're not querying the database once per key. Thanks, Eduardo |
Is it possible someone could comment on this question in either direction please? Thanks, Eduardo On Sat, 8 Jun 2019, 14:10 Eduardo Winpenny Tejedor, <[hidden email]> wrote:
|
Hi, Eduardo Currently, we can't load state from the outside(there is an ongoing jira[1] to do this), in the other word, if you disable checkpoint, and use the Kafka/database as your state storage, you should do the deduplication things by yourself. Just curious, which state backend do you use, and how about the latency? Eduardo Winpenny Tejedor <[hidden email]> 于2019年6月13日周四 下午11:31写道:
|
Hi Congixian, I don't use Flink at the moment, I am trying to evaluate its suitability for my company's purposes by re-writing one of our apps with Flink. We have apps with similar business logic but different code, despite they do essentially the same thing. I am new to the streaming paradigms and concepts so any guidance is appreciated. These apps consume a Kafka stream of "delta" messages, update the total sum of the property relevant to them and then send "update" messages unto another Kafka topic. Furthermore, they also produce an hourly "snapshot" (i.e. the value of the property at exactly 09:00, 10:00, 11:00...). On a restart they fully read the output topic, at which point they'll be in the same state they were before the shutdown, and they then continue reading from the "delta" topic - this is how we guarantee exactly-once processing. Please point out if this a "code smell" in the streaming with Flink paradigm. Going back to my question, great to hear there's a jira for this! I hope you can see why it's an attractive idea to avoid the latency incurred by the checkpointing mechanics, given we're already publishing the meaningful state of the app. In the meantime I guess I'll have to use a backend with checkpointing, any guidelines as to what state backend to use? Any other option I should consider? Thanks, Eduardo On Fri, 14 Jun 2019, 03:11 Congxian Qiu, <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |