Deduplicating record amplification

Deduplicating record amplification

Rex Fenley
Hello,

We have a job from CDC to a large unbounded Flink plan to Elasticsearch.

Currently, we have been relentlessly trying to reduce our record amplification which, when our Elasticsearch index is near fully populated, completely bottlenecks our write performance. We decided recently to try a new job using mini-batch. At first this seemed promising but at some point we began getting huge record amplification in a join operator. It appears that minibatch may only batch on aggregate operators?

So we're now thinking that we should have a window before our ES sink which only takes the last record for any unique document id in the window, since that's all we really want to send anyway. However, while investigating how to turn a table into a keyed window stream for deduping and then back into a table, I read the following:

> Attention: Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last. [1]

which has put a damper on our investigation.

I then found the deduplication SQL doc [2], but I have a hard time parsing what the SQL does and we've never used TemporaryViews or proctime before.
Is this essentially what we want?
Will just using this SQL be safe for a job that is unbounded and just wants to deduplicate a document write to whatever the most current one is (i.e. will restoring from a checkpoint maintain our unbounded consistency and will deletes work)?
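For readers who find the deduplication query hard to parse, here is a minimal sketch of its shape and of what it computes. The query string follows the `ROW_NUMBER()` pattern from the Flink SQL documentation; the table name `user_docs`, the columns `doc_id`, `body`, and `proctime`, and the `Doc` case class are hypothetical. The plain-Scala model only illustrates "keep the latest row per key" and says nothing about checkpoint recovery or deletes.

```scala
object DedupSketch {
  // Shape of the "keep the last row per key" query (hypothetical table and columns).
  // ORDER BY ... DESC with row_num = 1 keeps the most recent row for each doc_id.
  val dedupSql: String =
    """SELECT doc_id, body
      |FROM (
      |  SELECT *,
      |    ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY proctime DESC) AS row_num
      |  FROM user_docs)
      |WHERE row_num = 1""".stripMargin

  final case class Doc(docId: Long, body: String, proctime: Long)

  // Plain-Scala model of the query: for each docId keep the row with the
  // largest proctime; on a tie, the row that arrived later wins.
  def keepLatest(rows: Seq[Doc]): Map[Long, Doc] =
    rows.foldLeft(Map.empty[Long, Doc]) { (latest, row) =>
      latest.get(row.docId) match {
        case Some(seen) if seen.proctime > row.proctime => latest
        case _ => latest.updated(row.docId, row)
      }
    }
}
```

Calling `DedupSketch.keepLatest` on three rows where document 1 appears twice returns two entries, with the later version of document 1.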


Thanks!


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Re: Deduplicating record amplification

Rex Fenley
Going further, if "Flink provides no guarantees about the order of the elements within a window" then with minibatch, which I assume uses a window under the hood, any aggregates that expect rows to arrive in order will fail to keep their consistency. Is this correct?

On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <[hidden email]> wrote:
> [...]

Re: Deduplicating record amplification

Rex Fenley
Our data arrives in order from Kafka, so we are hoping to use that same order for our processing.

On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <[hidden email]> wrote:
> [...]

Re: Deduplicating record amplification

Rex Fenley
Hello,

> Redistributing streams (between map() and keyBy/window, as well as between keyBy/window and sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation. Examples are keyBy() (re-partitions by hash code), broadcast(), or rebalance() (random redistribution). In a redistributing exchange, order among elements is only preserved for each pair of sending- and receiving task (for example subtask[1] of map() and subtask[2] of keyBy/window).

This makes it sound like ordering on the same partition/key is always maintained, which is exactly the ordering guarantee that I need. This seems to slightly contradict the statement "Flink provides no guarantees about the order of the elements within a window" for keyed state. So is it true that ordering _is_ guaranteed for identical keys?

If I'm not mistaken, the state in the Table API is always considered keyed state for a join or aggregate. Or am I missing something?

Thanks!

On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <[hidden email]> wrote:
> [...]

    Re: Deduplicating record amplification

    Arvid Heise-4
    Hi Rex,

    Indeed, these two statements look like they contradict each other, but they are two sides of the same coin.
    Flink simply puts records into windows in FIFO order. That is, there is no ordering on event time if there are late events. So if your elements arrive ordered, the ordering is retained. If your elements arrive unordered, the same unordered order is retained.

    However, note that Flink can only guarantee FIFO according to your topology. Consider a source with parallelism 2, with each subtask reading from one ordered Kafka partition (k1 and k2, respectively). Each partition has records with keys, such that no key appears in both partitions (the default behavior if you set keys but no partition while writing to Kafka).
    1) Let's assume you do a simple transformation and write the records back into Kafka with the same key. Then you can be sure that the order of the records is retained.

    2) Now you add a random shuffle and then apply the transformation. Two successive records may then be processed in parallel, and there is a race condition as to which is written to Kafka first. So order is not retained.

    3) You shuffle both partitions by the Kafka key (keyBy) and do some aggregation. Two successive records with the same key will always be processed by the same aggregation operator instance. So the order is retained for each key (note that this is what you usually want and what Kafka gives you if you don't set the partition explicitly and just provide a key).

    4) You shuffle both partitions by a different key. Then two successive Kafka records could again be processed in parallel, such that there is a race condition.

    Note that windows are a kind of aggregation.

    So Flink is never going to restore an ordering that is not there (because it's too costly and there are too many unknowns). But you can infer the guarantees by analyzing your topology.
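The difference between cases 2) and 3) can be sketched without Flink. The following is a plain-Scala simulation under stated assumptions: `Rec`, `route`, and `subtasksFor` are hypothetical helpers, key-based routing is modelled as `hashCode` modulo the parallelism, and the random shuffle is replaced by a deterministic round-robin so the result is reproducible.

```scala
object OrderingSketch {
  final case class Rec(key: String, seq: Int)

  // Send each record to a subtask chosen by `target`; arrival order is kept
  // inside every subtask (FIFO per sender/receiver pair).
  def route(records: Seq[Rec])(target: (Rec, Int) => Int): Map[Int, Seq[Rec]] =
    records.zipWithIndex
      .map { case (r, i) => (target(r, i), r) }
      .groupBy(_._1)
      .map { case (subtask, rs) => subtask -> rs.map(_._2) }

  // Subtasks that receive at least one record with the given key.
  def subtasksFor(routed: Map[Int, Seq[Rec]], key: String): Set[Int] =
    routed.collect { case (subtask, rs) if rs.exists(_.key == key) => subtask }.toSet

  val parallelism = 2
  val input: Seq[Rec] =
    Seq(Rec("a", 1), Rec("a", 2), Rec("b", 1), Rec("a", 3), Rec("b", 2))

  // Case 3: routing depends only on the key, so one subtask sees all of "a" in order.
  val keyed: Map[Int, Seq[Rec]] =
    route(input)((r, _) => Math.floorMod(r.key.hashCode, parallelism))

  // Case 2 (round-robin stand-in for a shuffle): "a" is spread over both subtasks,
  // so nothing defines the relative order in which a1, a2, a3 are processed.
  val shuffled: Map[Int, Seq[Rec]] =
    route(input)((_, i) => i % parallelism)
}
```

With key-based routing, `subtasksFor(keyed, "a")` contains a single subtask; with the round-robin stand-in, the same key shows up in both subtasks, which is where the race condition comes from.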

    ---

    Please note that there is a common pitfall when you work with Kafka:
    - Ordering of records in Kafka is only guaranteed if you set max.in.flight.requests.per.connection to 1. [1]
    - Often you also want to set enable.idempotence and acks=all.

    That is true for the upstream application, and if you plan to write back to Kafka you also need to set that in Flink.
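As a minimal sketch of the producer settings mentioned above, the properties could be collected like this. The object name `OrderedProducerConfig` and the broker address are placeholders; only the three configuration keys come from the advice in this message, and how the properties are handed to a Flink Kafka sink depends on the connector version and is not shown.

```scala
import java.util.Properties

object OrderedProducerConfig {
  // Builds producer properties with the ordering-related settings named above.
  // `bootstrapServers` is a placeholder supplied by the caller.
  def build(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // At most one unacknowledged request, so a retry cannot overtake a later batch.
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props.setProperty("enable.idempotence", "true")
    props.setProperty("acks", "all")
    props
  }
}
```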


    On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <[hidden email]> wrote:
    > [...]

    Re: Deduplicating record amplification

    Rex Fenley
    This is great info, thanks!

    My question then becomes: what constitutes a random shuffle? Currently we're using the Table API with mini-batch on Flink v1.11.3. Do our joins output a keyed stream of records by join key, or is this random? I imagine that they'd have to have a key for retracts and accumulates to arrive in order at the next downstream operator. Same with aggs, but on the groupBy key.

    Does this sound correct to you?

    Thanks!

    On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <[hidden email]> wrote:
    > [...]

    Re: Deduplicating record amplification

    Rex Fenley
    In addition to those questions, assuming that keyed streams are in order, I've come up with the following solution to compact our records and only pick the most recent one per id before sending to the ES sink.

    The first item in the Row is the document ID / primary key which we want to compact records on.
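    // Imports assumed for Flink 1.11 (Scala API).
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row

    val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.getField(0))
    userDocsStream
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .aggregate(new CompactionAggregate())

    class CompactionAggregate
        extends AggregateFunction[(Boolean, Row), (Boolean, Row), (Boolean, Row)] {
      override def createAccumulator(): (Boolean, Row) = (false, null)

      // Just take the latest value to compact.
      override def add(value: (Boolean, Row), accumulator: (Boolean, Row)): (Boolean, Row) =
        value

      override def getResult(accumulator: (Boolean, Row)): (Boolean, Row) = accumulator

      // This is a required function that we don't use.
      override def merge(a: (Boolean, Row), b: (Boolean, Row)): (Boolean, Row) =
        throw new UnsupportedOperationException("merge is not used with tumbling windows")
    }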
    I'm hoping that if the last record in the window is an insert, it picks that, and if it's a retract, it picks that instead. Then, when we send this to the ES sink, we will simply check true or false in the first element of the tuple to issue an insert or a delete request to ES. Does this seem like it will work?
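The intended behaviour can be modelled in plain Scala without Flink or Elasticsearch. This is only a sketch under assumptions: `Change` stands in for a `(Boolean, Row)` retract-stream element, and `EsAction`, `Index`, and `Delete` are hypothetical types, not part of any Elasticsearch client.

```scala
object CompactionSketch {
  // Stand-in for a (Boolean, Row) element: isInsert is true for an
  // accumulate/insert message and false for a retract message.
  final case class Change(isInsert: Boolean, docId: String, body: String)

  sealed trait EsAction
  final case class Index(docId: String, body: String) extends EsAction
  final case class Delete(docId: String) extends EsAction

  // Same idea as CompactionAggregate.add: each new element replaces the
  // accumulator, so the last element of the window survives.
  def compact(window: Seq[Change]): Option[Change] =
    window.foldLeft(Option.empty[Change])((_, next) => Some(next))

  // Choose the request type from the flag of the surviving element.
  def toAction(change: Change): EsAction =
    if (change.isInsert) Index(change.docId, change.body) else Delete(change.docId)
}
```

One consequence worth checking against the real job: an update in a retract stream arrives as a retract followed by an insert, so if a window happens to close between the two, this model emits a `Delete` for that window and an `Index` in the next one.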

    Thanks!


    On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <[hidden email]> wrote:
    > [...]

    Re: Deduplicating record amplification

    Arvid Heise-4
    Hi Rex,

    Whether your keyBy (and likewise a join/grouping/windowing) is random or not depends on the relationship between the join/grouping key and your Kafka partitioning key.

    Say your partitioning key is document_id. Then any join/grouping key that is composed of (or is exactly) document_id will retain the order. You should always ask yourself the question: can two records coming from the ordered Kafka partition X be processed by two different operator instances? For a join/grouping operator, there is only the strict guarantee that all records with the same key will be shuffled into the same operator instance.

    Your compaction in general looks good but I'm not deep into Table API. I'm quite sure that FIRST_VALUE and LAST_VALUE functions in Table API should already do what you want. [1]


    On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <[hidden email]> wrote:
    In addition to those questions, assuming that keyed streams are in order, I've come up with the following solution to compact our records and only pick the most recent one per id before sending to the ES sink.

    The first item in the Row is the document ID / primary key which we want to compact records on.

    val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
    userDocsStream
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .aggregate(new CompactionAggregate())

    class CompactionAggregate
        extends AggregateFunction[
          (Boolean, Row),
          (Boolean, Row),
          (Boolean, Row)
        ] {
      override def createAccumulator() = (false, null)

      // Just take the latest value to compact.
      override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
        value

      override def getResult(accumulator: (Boolean, Row)) = accumulator

      // This is a required function that we don't use.
      override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
        throw new NotImplementedException()
    }

    I'm hoping that if the last record in the window is an insert, it picks that, and if it's a retract, it picks that instead. Then, when we send this to the ES sink, we simply check the Boolean in the first element of the tuple to decide between an insert and a delete request to ES. Does this seem like it will work?

    Thanks!
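
The compaction idea above can be sketched without any Flink dependency: fold over the records of one window in arrival order and keep only the last change per document id. This is a simplified model under stated assumptions, not the poster's job. The `Change` case class and its field names are hypothetical stand-ins for the `(Boolean, Row)` pairs of a retract stream.

```scala
object Compaction {
  // One changelog record. isInsert mirrors the Boolean flag of a retract stream
  // (true = insert/accumulate, false = retract/delete). Field names are hypothetical.
  final case class Change(isInsert: Boolean, docId: Long, body: String)

  // Keep only the last record seen for each document id, in arrival order.
  def compact(window: Seq[Change]): Map[Long, Change] =
    window.foldLeft(Map.empty[Long, Change]) { (latest, change) =>
      latest.updated(change.docId, change)
    }

  def main(args: Array[String]): Unit = {
    val window = Seq(
      Change(true, 1L, "v1"),
      Change(false, 1L, "v1"),
      Change(true, 1L, "v2"),
      Change(true, 2L, "a"),
      Change(false, 2L, "a")
    )
    // Decide per document whether the sink would index or delete.
    compact(window).toSeq.sortBy(_._1).foreach { case (id, c) =>
      val action = if (c.isInsert) "index" else "delete"
      println(s"doc $id -> $action")
    }
  }
}
```

The result is only meaningful if records for the same id reach the window in their original order, which is the ordering question discussed in this thread.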


    On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <[hidden email]> wrote:
    This is great info, thanks!

    My question then becomes: what constitutes a random shuffle? Currently we're using the Table API with minibatch on Flink v1.11.3. Do our joins output a keyed stream of records by join key, or is this random? I imagine that they'd have to have a key for retracts and accumulates to arrive in order on the next downstream operator. Same with aggs, but on the groupBy key.

    Does this sound correct to you?

    Thanks!

    On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <[hidden email]> wrote:
    Hi Rex,

    Indeed, these two statements look like they contradict each other, but they are two sides of the same coin.
    Flink simply puts records into windows in FIFO order. That is, there is no reordering by event time if there are late events. So if your elements arrive ordered, the ordering is retained. If your elements arrive unordered, the same unordered order is retained.

    However, note that Flink can only guarantee FIFO according to your topology. Consider a source with parallelism 2, where each subtask reads data from one ordered Kafka partition (k1 and k2, respectively). Each partition has records with keys, such that no key appears in both partitions (the default behavior if you set keys but no partition while writing to Kafka).
    1) Let's assume you do a simple transformation and write the records back into Kafka with the same key. Then you can be sure that the order of the records is retained.

    2) Now you add a random shuffle and then do the transformation. Two successive records may now be processed in parallel, and there is a race over which one is written into Kafka first. So order is not retained.

    3) You shuffle both partitions by the Kafka key (keyBy) and do some aggregation. Two successive records with the same key will always be processed by the same aggregation operator instance. So the order is retained for each key (note that this is what you usually want and what Kafka gives you if you don't set the partition explicitly and just provide a key).

    4) You shuffle both partitions by a different key. Then two successive Kafka records could again be processed in parallel, such that there is a race condition.

    Note that windows are a kind of aggregation.

    So Flink is never going to restore an ordering that is not there (because it's too costly and there are too many unknowns). But you can infer the guarantees by analyzing your topology.
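
Cases 3 and 4 above can be illustrated with a small, Flink-free model. It is a sketch under stated assumptions: `subtask` is a plain modulo stand-in for a hash shuffle, not Flink's actual key-group assignment, and the `Rec` fields are hypothetical. Keying by the source key keeps all records of a document on one subtask in arrival order. Keying by a different field can split successive records of the same document across subtasks, where nothing orders them relative to each other.

```scala
object KeyedOrdering {
  // (document id, some other key, sequence number in the source partition); hypothetical fields.
  final case class Rec(docId: Int, otherKey: Int, seq: Int)

  // Stand-in for a hash shuffle: the same key always maps to the same subtask.
  def subtask(key: Int, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Group records by target subtask; each subtask keeps its records in arrival (FIFO) order.
  def shuffle(records: Seq[Rec], parallelism: Int)(key: Rec => Int): Map[Int, Seq[Rec]] =
    records.groupBy(r => subtask(key(r), parallelism))

  def main(args: Array[String]): Unit = {
    val records = Seq(Rec(7, 0, 1), Rec(7, 1, 2), Rec(7, 0, 3))
    println(shuffle(records, 2)(_.docId))    // case 3: one subtask holds all of doc 7
    println(shuffle(records, 2)(_.otherKey)) // case 4: doc 7 is split across subtasks
  }
}
```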

    ---

    Please note that there is a common pitfall when you work with Kafka:
    - Ordering of records in Kafka is only guaranteed if you set max.in.flight.requests.per.connection to 1. [1]
    - Often you also want to set enable.idempotence and acks=all.

    That is true for the upstream application, and if you plan to write back to Kafka, you also need to set that in Flink.
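
As a minimal sketch, the producer settings named above could be collected like this. Only the three property keys come from the post; the broker address is a placeholder, and how the properties are handed to a producer or to a Flink Kafka sink is left out.

```scala
import java.util.Properties

object OrderedProducerConfig {
  // Producer settings named in the post above. The broker address is a placeholder.
  def build(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // At most one unacknowledged request per connection, so retries cannot reorder batches.
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props.setProperty("enable.idempotence", "true")
    props.setProperty("acks", "all")
    props
  }

  def main(args: Array[String]): Unit =
    println(build("localhost:9092"))
}
```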


    On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <[hidden email]> wrote:
    Hello,

    > Redistributing streams (between map() and keyBy/window, as well as between keyBy/window and sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation. Examples are keyBy() (re-partitions by hash code), broadcast(), or rebalance() (random redistribution). In a redistributing exchange, order among elements is only preserved for each pair of sending- and receiving task (for example subtask[1] of map() and subtask[2] of keyBy/window).

    This makes it sound like ordering on the same partition/key is always maintained, which is exactly the ordering guarantee that I need. This seems to slightly contradict the statement "Flink provides no guarantees about the order of the elements within a window" for keyed state. So is it true that ordering _is_ guaranteed for identical keys?

    If I'm not mistaken, the state in the TableAPI is always considered keyed state for a join or aggregate. Or am I missing something?

    Thanks!

    On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <[hidden email]> wrote:
    Our data arrives in order from Kafka, so we are hoping to use that same order for our processing.

    On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <[hidden email]> wrote:
    Going further, if "Flink provides no guarantees about the order of the elements within a window" then with minibatch, which I assume uses a window under the hood, any aggregates that expect rows to arrive in order will fail to keep their consistency. Is this correct?

    On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <[hidden email]> wrote:
    Hello,

    We have a job from CDC to a large unbounded Flink plan to Elasticsearch.

    Currently, we have been relentlessly trying to reduce our record amplification which, when our Elasticsearch index is near fully populated, completely bottlenecks our write performance. We decided recently to try a new job using mini-batch. At first this seemed promising but at some point we began getting huge record amplification in a join operator. It appears that minibatch may only batch on aggregate operators?

    So we're now thinking that we should have a window before our ES sink which only takes the last record for any unique document id in the window, since that's all we really want to send anyway. However, when investigating turning a table, to a keyed window stream for deduping, and then back into a table I read the following:

    >Attention Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last. [1]

    which has put a damper on our investigation.

    I then found the deduplication SQL doc [2], but I have a hard time parsing what the SQL does and we've never used TemporaryViews or proctime before.
    Is this essentially what we want?
    Will just using this SQL be safe for a job that is unbounded and just wants to deduplicate a document write to whatever the most current one is (i.e. will restoring from a checkpoint maintain our unbounded consistency and will deletes work)?


    Thanks!



    Re: Deduplicating record amplification

    Rex Fenley
    Ok, that sounds like it confirms my expectations.

    So I tried running my code above and had to edit it slightly to use the Java Tuple2, because our execution environment code is all in Java.

    class CompactionAggregate
        extends AggregateFunction[
          Tuple2[java.lang.Boolean, Row],
          Tuple2[java.lang.Boolean, Row],
          Tuple2[java.lang.Boolean, Row]
        ] {

      override def createAccumulator() = new Tuple2(false, null)

      // Just take the latest value to compact.
      override def add(
          value: Tuple2[java.lang.Boolean, Row],
          accumulator: Tuple2[java.lang.Boolean, Row]
      ) =
        value

      override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
        accumulator

      // This is a required function that we don't use.
      override def merge(
          a: Tuple2[java.lang.Boolean, Row],
          b: Tuple2[java.lang.Boolean, Row]
      ) =
        throw new NotImplementedException()
    }

    But when running I get the following error:
    >Caused by: java.lang.RuntimeException: Could not extract key from [redacted row]
    >...
    > Caused by: org.apache.flink.table.api.ValidationException: Unsupported kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are supported when converting to an expression.

    I'm googling around and haven't found anything informative about what might be causing this issue. Any ideas?

    I'll also take a look at the SQL functions you suggested and see if I can use those.

    Thanks!



    On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <[hidden email]> wrote:
    Hi Rex,

    Whether your keyBy (and, with it, a join/grouping/windowing) behaves like a random shuffle depends on how the join/grouping key relates to your Kafka partitioning key.

    Say your partitioning key is document_id. Then any join/grouping key that is composed of (or is exactly) document_id will retain the order. You should always ask yourself: can two records coming from the ordered Kafka partition X be processed by two different operator instances? For a join/grouping operator, the only strict guarantee is that all records with the same key are shuffled to the same operator instance.

    Your compaction looks good in general, but I'm not deep into the Table API. I'm quite sure that the FIRST_VALUE and LAST_VALUE functions in the Table API already do what you want. [1]





    Re: Deduplicating record amplification

    Rex Fenley
    I think I may have been affected by some late night programming.

    I slightly revised how I'm using my aggregate:

    val userDocsStream =
      this.tableEnv
        .toRetractStream(userDocsTable, classOf[Row])
        .keyBy(_.f1.getField(0))
    val compactedUserDocsStream = userDocsStream
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .aggregate(new CompactionAggregate())

    but this now gives me the following exception:
    java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
        at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.base/java.lang.Thread.run(Thread.java:829)

    I'm not at all sure how to interpret this.

    On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <[hidden email]> wrote:
    Ok, that sounds like it confirms my expectations.

    So I tried running my code above and had to edit it slightly to use the Java Tuple2, because our execution environment code is all in Java.

    class CompactionAggregate
        extends AggregateFunction[
          Tuple2[java.lang.Boolean, Row],
          Tuple2[java.lang.Boolean, Row],
          Tuple2[java.lang.Boolean, Row]
        ] {

      override def createAccumulator() = new Tuple2(false, null)

      // Just take the latest value to compact.
      override def add(
          value: Tuple2[java.lang.Boolean, Row],
          accumulator: Tuple2[java.lang.Boolean, Row]
      ) =
        value

      override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
        accumulator

      // This is a required function that we don't use.
      override def merge(
          a: Tuple2[java.lang.Boolean, Row],
          b: Tuple2[java.lang.Boolean, Row]
      ) =
        throw new NotImplementedException()
    }

    But when running I get the following error:
    >Caused by: java.lang.RuntimeException: Could not extract key from [redacted row]
    >...
    > Caused by: org.apache.flink.table.api.ValidationException: Unsupported kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are supported when converting to an expression.

    I'm googling around and haven't found anything informative about what might be causing this issue. Any ideas?

    I'll also take a look at the SQL functions you suggested and see if I can use those.

    Thanks!





    --

    Rex Fenley  |  Software Engineer - Mobile and Backend


    Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



    --

    Rex Fenley  |  Software Engineer - Mobile and Backend


    Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



    --

    Rex Fenley  |  Software Engineer - Mobile and Backend


    Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



    --

    Rex Fenley  |  Software Engineer - Mobile and Backend


    Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



    --

    Rex Fenley  |  Software Engineer - Mobile and Backend


    Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



    --

    Rex Fenley  |  Software Engineer - Mobile and Backend


    Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



    --

    Rex Fenley  |  Software Engineer - Mobile and Backend


    Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



    --

    Rex Fenley  |  Software Engineer - Mobile and Backend


    Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


    Re: Deduplicating record amplification

    Rex Fenley
    It looks like it wants me to call assignTimestampsAndWatermarks, but I already have a timer on my window, so I'd expect everything entering this stream to simply be aggregated during that window:
    .window(TumblingEventTimeWindows.of(Time.seconds(1)))

    On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley <[hidden email]> wrote:
    I think I may have been affected by some late night programming.

    Slightly revised how I'm using my aggregate:
    val userDocsStream =
      this.tableEnv
        .toRetractStream(userDocsTable, classOf[Row])
        .keyBy(_.f1.getField(0))
    val compactedUserDocsStream = userDocsStream
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .aggregate(new CompactionAggregate())
    but this now gives me the following exception:
    java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
        at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.base/java.lang.Thread.run(Thread.java:829)

    Which I'm not at all sure how to interpret

    On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <[hidden email]> wrote:
    Ok, that sounds like it confirms my expectations.

    So I tried running my above code and had to edit it slightly to use Java's Tuple2, because our execution environment stuff is all in Java.

    class CompactionAggregate
        extends AggregateFunction[
          Tuple2[java.lang.Boolean, Row],
          Tuple2[java.lang.Boolean, Row],
          Tuple2[java.lang.Boolean, Row]
        ] {

      override def createAccumulator() = new Tuple2(false, null)

      // Just take the latest value to compact.
      override def add(
          value: Tuple2[java.lang.Boolean, Row],
          accumulator: Tuple2[java.lang.Boolean, Row]
      ) =
        value

      override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
        accumulator

      // This is a required function that we don't use.
      override def merge(
          a: Tuple2[java.lang.Boolean, Row],
          b: Tuple2[java.lang.Boolean, Row]
      ) =
        throw new NotImplementedException()
    }

    But when running I get the following error:
    >Caused by: java.lang.RuntimeException: Could not extract key from [redacted row]
    >...
    > Caused by: org.apache.flink.table.api.ValidationException: Unsupported kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are supported when converting to an expression.

    I'm googling around and haven't found anything informative about what might be causing this issue. Any ideas?

    I'll also take a look at the SQL functions you suggested and see if I can use those.

    Thanks!



    On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <[hidden email]> wrote:
    Hi Rex,

    Whether your keyBy (and likewise a join/grouping/windowing) is random or not depends on the relationship of the join/grouping key to your Kafka partitioning key.

    Say your partitioning key is document_id. Then any join/grouping key that is composed of (or is exactly) document_id will retain the order. You should always ask yourself the question: can two records coming from the ordered Kafka partition X be processed by two different operator instances? For a join/grouping operator, the only strict guarantee is that all records with the same key will be shuffled into the same operator instance.

    Your compaction in general looks good, but I'm not deep into the Table API. I'm quite sure that the FIRST_VALUE and LAST_VALUE functions in the Table API should already do what you want. [1]


    On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <[hidden email]> wrote:
    In addition to those questions, assuming that keyed streams are in order, I've come up with the following solution to compact our records and only pick the most recent one per id before sending to the ES sink.

    The first item in the Row is the document ID / primary key which we want to compact records on.
    val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
    userDocsStream
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .aggregate(new CompactionAggregate())

    class CompactionAggregate
        extends AggregateFunction[
          (Boolean, Row),
          (Boolean, Row),
          (Boolean, Row)
        ] {
      override def createAccumulator() = (false, null)

      // Just take the latest value to compact.
      override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
        value

      override def getResult(accumulator: (Boolean, Row)) = accumulator

      // This is a required function that we don't use.
      override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
        throw new NotImplementedException()
    }
    I'm hoping that whatever the last record in the window is, whether an insert or a retract, is the one that gets picked. Then, when we send this to the ES sink, we will simply check true or false in the first element of the tuple to issue an insert or a delete request to ES. Does this seem like it will work?
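To make the intended behaviour concrete, here is a minimal sketch in plain Scala (no Flink involved) of "last change per document id wins, then map the flag to an ES request". The names `Change`, `EsAction`, `Index`, `Delete`, `compact` and `toAction` are made up for illustration, and the (Boolean, Row) tuple is simplified to (isInsert, docId, payload); this is a toy model of the idea, not the actual job code.

```scala
object CompactionSketch {
  // (isInsert, docId, payload): simplified stand-in for the (Boolean, Row) retract tuples.
  type Change = (Boolean, String, String)

  // Hypothetical sink-side requests, for illustration only.
  sealed trait EsAction
  final case class Index(docId: String, payload: String) extends EsAction
  final case class Delete(docId: String) extends EsAction

  // Keep only the last change seen per docId, walking the window in arrival order.
  def compact(window: Seq[Change]): Map[String, Change] =
    window.foldLeft(Map.empty[String, Change]) { (latest, change) =>
      latest.updated(change._2, change)
    }

  // true in the first element means insert/upsert, false means delete.
  def toAction(change: Change): EsAction = change match {
    case (true, id, payload) => Index(id, payload)
    case (false, id, _)      => Delete(id)
  }
}
```

With a window containing an update of document "1" (retract of the old value followed by an insert of the new one) and an insert-then-delete of document "2", this yields one index request for "1" and one delete request for "2". It only behaves that way if the per-key arrival order is the order you want.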

    Thanks!


    On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <[hidden email]> wrote:
    This is great info, thanks!

    My question then becomes: what constitutes a random shuffle? Currently we're using the Table API with minibatch on Flink v1.11.3. Do our joins output a keyed stream of records by join key, or is this random? I imagine that they'd have to have a key for retracts and accumulates to arrive in order on the next downstream operator. Same with aggs, but on the groupBy key.

    Does this sound correct to you?

    Thanks!

    On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <[hidden email]> wrote:
    Hi Rex,

    Indeed, these two statements look like they contradict each other, but they are two sides of the same coin.
    Flink simply puts records into windows in FIFO order. That is, there is no ordering on event time if there are late events. So if your elements arrive ordered, the ordering is retained. If your elements arrive unordered, the same unordered order is retained.

    However, note that Flink can only guarantee FIFO according to your topology. Consider a source with parallelism 2, each subtask reading data from one ordered Kafka partition (k1 and k2, respectively). Each partition has records with keys, such that no key appears in both partitions (the default behavior if you set keys but no partition while writing to Kafka).
    1) Let's assume you do a simple transformation and write the records back into Kafka with the same key. Then you can be sure that the order of the records is retained.

    2) Now you add a random shuffle and then the transformation. Two successive records may now be processed in parallel, and there is a race condition over which is written first into Kafka. So order is not retained.

    3) You shuffle both partitions by the Kafka key (keyBy) and do some aggregation. Two successive records with the same key will always be processed by the same aggregation operator. So the order is retained for each key (note that this is what you usually want, and what Kafka gives you if you don't set the partition explicitly and just provide a key).

    4) You shuffle both partitions by a different key. Then two successive Kafka records could again be processed in parallel, such that there is a race condition.

    Note that windows are a kind of aggregation.

    So Flink is never going to restore an ordering that is not there (because it's too costly and there are too many unknowns). But you can infer the guarantees by analyzing your topology.
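Cases 3 and 4 above can be illustrated with a small plain-Scala toy model (not Flink code). It asks the question "can records of the same document end up on different subtasks?" for a given key selector. `Record`, `subtaskFor` and `subtasksPerDoc` are hypothetical names, and the modulo-of-hashCode routing is only a simplified stand-in for how a keyBy assigns keys to subtasks.

```scala
object KeyedOrderSketch {
  // docId plays the role of the Kafka key; joinKey is some other field.
  final case class Record(docId: String, joinKey: String, seq: Int)

  // Simplified stand-in for keyBy routing: same key always maps to the same subtask.
  def subtaskFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // For each docId, the set of subtasks its records land on under the given key selector.
  def subtasksPerDoc(records: Seq[Record], parallelism: Int)(
      keyOf: Record => String
  ): Map[String, Set[Int]] =
    records.groupBy(_.docId).map { case (docId, recs) =>
      docId -> recs.map(r => subtaskFor(keyOf(r), parallelism)).toSet
    }
}
```

If every document maps to exactly one subtask (keying by `docId`), its records stay in FIFO order on that subtask. If a document's records spread over several subtasks (keying by `joinKey`), successive records can be processed in parallel and the per-document order is no longer guaranteed.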

    ---

    Please note that there is a common pitfall when you work with Kafka:
    - Ordering of records in Kafka is only guaranteed if you set max.in.flight.requests.per.connection to 1. [1]
    - Often you also want to set enable.idempotence and acks=all.

    That is true for the upstream application, and if you plan to write back to Kafka you also need to set that in Flink.
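As a rough sketch, the three producer settings named above could be collected like this in Scala. Only the three property keys come from the post; the `OrderedProducerConfig` object, the `build` helper and the bootstrap address passed to it are assumptions for illustration.

```scala
import java.util.Properties

object OrderedProducerConfig {
  // Builds producer properties with the ordering-related settings from the post.
  // bootstrapServers is a placeholder supplied by the caller.
  def build(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // At most one unacknowledged request per connection, so retries cannot reorder records.
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props.setProperty("enable.idempotence", "true")
    props.setProperty("acks", "all")
    props
  }
}
```

The same properties would be handed to whatever Kafka producer the upstream application (or a Flink Kafka sink) is configured with.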


    On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <[hidden email]> wrote:
    Hello,

    > Redistributing streams (between map() and keyBy/window, as well as between keyBy/window and sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation. Examples are keyBy() (re-partitions by hash code), broadcast(), or rebalance() (random redistribution). In a redistributing exchange, order among elements is only preserved for each pair of sending- and receiving task (for example subtask[1] of map() and subtask[2] of keyBy/window).

    This makes it sound like ordering on the same partition/key is always maintained, which is exactly the ordering guarantee that I need. This seems to slightly contradict the statement "Flink provides no guarantees about the order of the elements within a window" for keyed state. So is it true that ordering _is_ guaranteed for identical keys?

    If I'm not mistaken, the state in the Table API is always considered keyed state for a join or aggregate. Or am I missing something?

    Thanks!

    On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <[hidden email]> wrote:
    Our data arrives in order from Kafka, so we are hoping to use that same order for our processing.



    Re: Deduplicating record amplification

    Rex Fenley
    Switching to TumblingProcessingTimeWindows seems to have solved that problem.

    For my own understanding, this won't have any "late" and therefore dropped records, right? We cannot blindly drop a record from the aggregate evaluation; it just needs to take all the records it gets in a window and process them, and then the aggregate will take whatever is last in order.

    Thanks!


    Re: Deduplicating record amplification

    Arvid Heise-4
    Hi Rex,

    There cannot be any late events in processing time by definition (maybe on a quantum computer?), so you should be fine. The timestamps of records in processing time are monotonically increasing.

    Best,

    Arvid
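A small plain-Scala toy model (not Flink's implementation) of why monotonically increasing timestamps cannot produce late records in tumbling windows. `windowStart` and `lateRecords` are hypothetical helpers, and "late" is simplified here to "belongs to a window that starts before the window of some earlier-seen record"; real Flink lateness is defined against watermarks.

```scala
object ProcessingTimeWindowSketch {
  // Start of the tumbling window containing timestampMs (no offset, non-negative timestamps).
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  // Records whose window starts before the latest window start seen so far.
  def lateRecords(timestampsMs: Seq[Long], sizeMs: Long): Seq[Long] = {
    val starts = timestampsMs.map(windowStart(_, sizeMs))
    // Running maximum of window starts, including the current record.
    val runningMax = starts.scanLeft(Long.MinValue)((a, b) => math.max(a, b)).tail
    timestampsMs.zip(starts.zip(runningMax)).collect {
      case (ts, (start, maxSoFar)) if start < maxSoFar => ts
    }
  }
}
```

With non-decreasing timestamps, as in processing time, the window start never moves backwards, so the result is always empty. With out-of-order event timestamps such as 1000, 2100, 1500 and a 1-second window, the record at 1500 would fall into a window that has already been passed.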

    On Fri, Jan 29, 2021 at 1:14 AM Rex Fenley <[hidden email]> wrote:
    Switching to TumblingProcessingTimeWindows seems to have solved that problem.

    For my own understanding, this won't have any "late" and therefore dropped records right? We cannot blindly drop a record from the aggregate evaluation, it just needs to take all the records it gets in a window and process them and then the aggregate will take whatever is last in-order.

    Thanks!

    On Thu, Jan 28, 2021 at 1:01 PM Rex Fenley <[hidden email]> wrote:
    It looks like it wants me to call assignTimestampsAndWatermarks but I already have a timer on my window which I'd expect everything entering this stream would simply be aggregated during that window
    .window(TumblingEventTimeWindows.of(Time.seconds(1)))

    On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley <[hidden email]> wrote:
    I think I may have been affected by some late night programming.

    Slightly revised how I'm using my aggregate
    val userDocsStream =
    this.tableEnv
    .toRetractStream(userDocsTable, classOf[Row])
    .keyBy(_.f1.getField(0))
    val compactedUserDocsStream = userDocsStream
    .window(TumblingEventTimeWindows.of(Time.seconds(1)))
    .aggregate(new CompactionAggregate())
    but this now gives me the following exception:
    java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
        at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.base/java.lang.Thread.run(Thread.java:829)

    Which I'm not at all sure how to interpret.

    On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <[hidden email]> wrote:
    Ok, that sounds like it confirms my expectations.

    So I tried running my code above and had to edit it slightly to use Java's Tuple2, because our execution environment code is all in Java.

    class CompactionAggregate
        extends AggregateFunction[
          Tuple2[java.lang.Boolean, Row],
          Tuple2[java.lang.Boolean, Row],
          Tuple2[java.lang.Boolean, Row]
        ] {

      override def createAccumulator() = new Tuple2(false, null)

      // Just take the latest value to compact.
      override def add(
          value: Tuple2[java.lang.Boolean, Row],
          accumulator: Tuple2[java.lang.Boolean, Row]
      ) =
        value

      override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
        accumulator

      // This is a required function that we don't use.
      override def merge(
          a: Tuple2[java.lang.Boolean, Row],
          b: Tuple2[java.lang.Boolean, Row]
      ) =
        throw new NotImplementedException()
    }

    But when running I get the following error:
    >Caused by: java.lang.RuntimeException: Could not extract key from [redacted row]
    >...
    > Caused by: org.apache.flink.table.api.ValidationException: Unsupported kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are supported when converting to an expression.

    I'm googling around and haven't found anything informative about what might be causing this issue. Any ideas?

    I'll also take a look at the SQL functions you suggested and see if I can use those.

    Thanks!



    On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <[hidden email]> wrote:
    Hi Rex,

    Whether your keyBy (and likewise a join/grouping/windowing) behaves like a random shuffle depends on the relationship between the join/grouping key and your Kafka partitioning key.

    Say your partitioning key is document_id. Then any join/grouping key that is composed of (or is exactly) document_id will retain the order. You should always ask yourself the question: can two records coming from the ordered Kafka partition X be processed by two different operator instances? For a join/grouping operator, the only strict guarantee is that all records with the same key will be shuffled into the same operator instance.

    Your compaction in general looks good but I'm not deep into Table API. I'm quite sure that FIRST_VALUE and LAST_VALUE functions in Table API should already do what you want. [1]


    On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <[hidden email]> wrote:
    In addition to those questions, assuming that keyed streams are in order, I've come up with the following solution to compact our records and only pick the most recent one per id before sending to the ES sink.

    The first item in the Row is the document ID / primary key which we want to compact records on.
    val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
    userDocsStream
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .aggregate(new CompactionAggregate())

    class CompactionAggregate
        extends AggregateFunction[
          (Boolean, Row),
          (Boolean, Row),
          (Boolean, Row)
        ] {
      override def createAccumulator() = (false, null)

      // Just take the latest value to compact.
      override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
        value

      override def getResult(accumulator: (Boolean, Row)) = accumulator

      // This is a required function that we don't use.
      override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
        throw new NotImplementedException()
    }
    I'm hoping that whatever the last record in the window is, insert or retract, the aggregate picks that one. Then, when we send it to the ES sink, we simply check true or false in the first element of the tuple to decide between an insert and a delete request to ES. Does this seem like it will work?
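For illustration, here is a minimal plain-Scala model of the compaction described above, with no Flink involved. The `Change` case class and the `WindowCompaction` object are hypothetical names introduced only for this sketch; it assumes the records of one window are already available in arrival order, and it is not the Flink operator itself.

```scala
// Hypothetical stand-in for a retract-stream element: (isInsert, row),
// where docId plays the role of the first field of the Row.
final case class Change(isInsert: Boolean, docId: Long, payload: String)

object WindowCompaction {
  // Walk the window in arrival order and keep only the last change per docId.
  def compact(window: Seq[Change]): Map[Long, Change] =
    window.foldLeft(Map.empty[Long, Change]) { (latest, change) =>
      latest.updated(change.docId, change)
    }

  def main(args: Array[String]): Unit = {
    val window = Seq(
      Change(isInsert = true, docId = 1L, payload = "v1"),
      Change(isInsert = false, docId = 1L, payload = "v1"),
      Change(isInsert = true, docId = 1L, payload = "v2"),
      Change(isInsert = true, docId = 2L, payload = "a"),
      Change(isInsert = false, docId = 2L, payload = "a")
    )
    // One surviving change per document id; isInsert tells the sink
    // whether to index or delete.
    compact(window).toSeq.sortBy(_._1).foreach(println)
  }
}
```

In this model document 1 ends the window as an insert and document 2 as a delete, so the sink would issue one request per document instead of five.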

    Thanks!


    On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <[hidden email]> wrote:
    This is great info, thanks!

    My question then becomes, what constitutes a random shuffle? Currently we're using the Table API with minibatch on flink v1.11.3. Do our joins output a keyed stream of records by join key or is this random? I imagine that they'd have to have a key for retracts and accumulates to arrive in order on the next downstream operator. Same with aggs but on the groupBy key.

    Does this sound correct to you?

    Thanks!

    On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <[hidden email]> wrote:
    Hi Rex,

    indeed these two statements look like they contradict each other, but they are two sides of the same coin.
    Flink simply puts records into windows in FIFO order. That is, there is no ordering on event time if there are late events. So if your elements arrive ordered, the ordering is retained. If your elements arrive unordered, the same unordered order is retained.

    However, note that Flink can only guarantee FIFO according to your topology. Consider a source with parallelism 2, each instance reading data from an ordered Kafka partition (k1, k2) respectively. Each partition has records with keys, such that no key appears in both partitions (the default behavior if you set keys but no partition while writing to Kafka).
    1) Let's assume you do a simple transformation and write the records back into Kafka with the same key. Then you can be sure that the order of the records is retained.

    2) Now you add a random shuffle before the transformation. Now two successive records may be processed in parallel, and there is a race condition as to which is written into Kafka first. So order is not retained.

    3) You shuffle both partitions by the Kafka key (keyBy) and do some aggregation. Two successive records with the same key will always be processed by the same aggregation operator. So the order is retained for each key (note that this is what you usually want and what Kafka gives you if you don't set the partition explicitly and just provide a key).

    4) You shuffle both partitions by a different key. Then two successive Kafka records could again be processed in parallel, such that there is a race condition.

    Note that windows are a kind of aggregation.

    So Flink is never going to restore an ordering that is not there (because it's too costly and there are too many unknowns). But you can infer the guarantees by analyzing your topology.
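The difference between cases 2 and 3 can be modelled in plain Scala. This is only a sketch under simplifying assumptions: `Record`, `KeyedOrdering` and its methods are hypothetical names, routing uses `hashCode` modulo the parallelism rather than Flink's actual key-group assignment, and each "subtask" is just a sequence.

```scala
object KeyedOrdering {
  final case class Record(key: String, seq: Int)

  // Merge two ordered partitions by alternating between them, which keeps
  // the relative order inside each partition.
  def interleave(a: Seq[Record], b: Seq[Record]): Seq[Record] =
    a.map(Option(_)).zipAll(b.map(Option(_)), None, None).flatMap {
      case (x, y) => x.toList ++ y.toList
    }

  // Case 3: key-based exchange. All records with one key go to one subtask.
  def routeByKey(records: Seq[Record], parallelism: Int): Map[Int, Seq[Record]] =
    records.groupBy(r => Math.floorMod(r.key.hashCode, parallelism))

  // Case 2: round-robin exchange. The target ignores the key entirely.
  def routeRoundRobin(records: Seq[Record], parallelism: Int): Map[Int, Seq[Record]] =
    records.zipWithIndex
      .groupBy { case (_, index) => index % parallelism }
      .map { case (subtask, rs) => subtask -> rs.map(_._1) }

  // True if, on this subtask, every key's records appear in ascending seq order.
  def perKeyOrderRetained(subtaskInput: Seq[Record]): Boolean =
    subtaskInput.groupBy(_.key).values.forall { rs =>
      val seqs = rs.map(_.seq)
      seqs == seqs.sorted
    }

  // Number of subtasks that receive at least one record with the given key.
  def subtasksSeeingKey(routed: Map[Int, Seq[Record]], key: String): Int =
    routed.values.count(_.exists(_.key == key))
}
```

With a key-based exchange a key is seen by exactly one subtask, so its records cannot race each other. With the round-robin exchange the same key can land on several subtasks, and nothing constrains which of them emits first.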

    ---

    Please note that there is a common pitfall when you work with Kafka:
    - Ordering of records in Kafka is only guaranteed if you set max.in.flight.requests.per.connection to 1. [1]
    - Often you also want to set enable.idempotence and acks=all

    That is true for the upstream application, and if you plan to write back to Kafka you also need to set that in Flink.
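As a sketch, the three producer settings named above could be collected like this. The `OrderedProducerConfig` object and the `bootstrapServers` parameter are hypothetical; only the configuration keys come from the Kafka producer configuration. How the properties are handed to the producer (or to a Flink Kafka sink) depends on the client and connector version and is not shown.

```scala
import java.util.Properties

object OrderedProducerConfig {
  // Builds producer properties with the ordering-related settings from the
  // message above. bootstrapServers is a placeholder supplied by the caller.
  def properties(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // At most one unacknowledged request per connection, so a retried batch
    // cannot overtake a later one.
    props.setProperty("max.in.flight.requests.per.connection", "1")
    // Avoid duplicates on retry and wait for all in-sync replicas.
    props.setProperty("enable.idempotence", "true")
    props.setProperty("acks", "all")
    props
  }
}
```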


    On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <[hidden email]> wrote:
    Hello,

    > Redistributing streams (between map() and keyBy/window, as well as between keyBy/window and sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation. Examples are keyBy() (re-partitions by hash code), broadcast(), or rebalance() (random redistribution). In a redistributing exchange, order among elements is only preserved for each pair of sending- and receiving task (for example subtask[1] of map() and subtask[2] of keyBy/window).

    This makes it sound like ordering on the same partition/key is always maintained, which is exactly the ordering guarantee that I need. This seems to slightly contradict the statement "Flink provides no guarantees about the order of the elements within a window" for keyed state. So is it true that ordering _is_ guaranteed for identical keys?

    If I'm not mistaken, the state in the TableAPI is always considered keyed state for a join or aggregate. Or am I missing something?

    Thanks!

    On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <[hidden email]> wrote:
    Our data arrives in order from Kafka, so we are hoping to use that same order for our processing.

    On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <[hidden email]> wrote:
    Going further, if "Flink provides no guarantees about the order of the elements within a window" then with minibatch, which I assume uses a window under the hood, any aggregates that expect rows to arrive in order will fail to keep their consistency. Is this correct?

    On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <[hidden email]> wrote:
    Hello,

    We have a job from CDC to a large unbounded Flink plan to Elasticsearch.

    Currently, we have been relentlessly trying to reduce our record amplification which, when our Elasticsearch index is near fully populated, completely bottlenecks our write performance. We decided recently to try a new job using mini-batch. At first this seemed promising but at some point we began getting huge record amplification in a join operator. It appears that minibatch may only batch on aggregate operators?

    So we're now thinking that we should have a window before our ES sink which only takes the last record for any unique document id in the window, since that's all we really want to send anyway. However, when investigating turning a table into a keyed window stream for deduping and then back into a table, I read the following:

    >Attention Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last. [1]

    which has put a damper on our investigation.

    I then found the deduplication SQL doc [2], but I have a hard time parsing what the SQL does and we've never used TemporaryViews or proctime before.
    Is this essentially what we want?
    Will just using this SQL be safe for a job that is unbounded and just wants to deduplicate a document write to whatever the most current one is (i.e. will restoring from a checkpoint maintain our unbounded consistency and will deletes work)?


    Thanks!


    --

    Rex Fenley  |  Software Engineer - Mobile and Backend


    Remind.com |  BLOG  |  FOLLOW US  |  LIKE US





    Re: Deduplicating record amplification

    Rex Fenley
    Great, thank you for the confirmation!

    On Thu, Jan 28, 2021 at 11:25 PM Arvid Heise <[hidden email]> wrote:
    Hi Rex,

    there cannot be any late event in processing time by definition (maybe on a quantum computer?), so you should be fine. The timestamp of records in processing time is monotonically increasing.

    Best,

    Arvid

    On Fri, Jan 29, 2021 at 1:14 AM Rex Fenley <[hidden email]> wrote:
    Switching to TumblingProcessingTimeWindows seems to have solved that problem.

    For my own understanding, this won't have any "late" and therefore dropped records, right? We cannot blindly drop a record from the aggregate evaluation; it just needs to take all the records it gets in a window and process them, and then the aggregate will take whatever is last in order.
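To illustrate why this holds, here is a plain-Scala model of tumbling windows over processing time. It is a sketch, not Flink's implementation: `ProcessingTimeWindows` and its methods are hypothetical names, timestamps are assumed to be non-negative clock readings taken at arrival (and therefore non-decreasing), and the window offset is assumed to be zero.

```scala
object ProcessingTimeWindows {
  // Start of the tumbling window that contains the given timestamp.
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  // Keeps the last value per window. Because arrival timestamps never go
  // backwards, a record always belongs to the newest window or opens a new
  // one; it can never fall into a window that has already closed.
  def lastPerWindow[A](arrivals: Seq[(Long, A)], sizeMs: Long): Seq[(Long, A)] =
    arrivals.foldLeft(Vector.empty[(Long, A)]) { case (acc, (ts, value)) =>
      val start = windowStart(ts, sizeMs)
      acc.lastOption match {
        case Some((prevStart, _)) if prevStart == start =>
          acc.init :+ (start -> value) // overwrite: later arrival wins
        case _ =>
          acc :+ (start -> value) // first record of a new window
      }
    }
}
```

With event time the same fold would need a rule for records whose timestamp maps to an earlier window, which is exactly where lateness and dropping come in.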

    Thanks!

    --

    Rex Fenley  |  Software Engineer - Mobile and Backend


    Remind.com |  BLOG  |  FOLLOW US  |  LIKE US