How to emit after a merge?


Yik San Chan
I define a `Transaction` class:

```scala
case class Transaction(accountId: Long, amount: Long, timestamp: Long)
```

The `TransactionSource` simply emits `Transaction` records at some time interval. Now I want to compute the last 2 transaction timestamps of each account ID; see the code below:

```scala
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

  final val QUERY =
    """
      |WITH last_n AS (
      |    SELECT accountId, `timestamp`
      |    FROM (
      |        SELECT *,
      |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
      |        FROM transactions
      |    )
      |    WHERE row_num <= 2
      |)
      |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
      |FROM last_n
      |GROUP BY accountId
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)

    val txnStream: DataStream[Transaction] = streamEnv
      .addSource(new TransactionSource)
      .name("transactions")

    tableEnv.createTemporaryView("transactions", txnStream)

    tableEnv.executeSql(QUERY).print()
  }
}
```

When I run the program, I get:

```
+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| +I |                    1 |                  1546272000000 |
| +I |                    2 |                  1546272360000 |
| +I |                    3 |                  1546272720000 |
| +I |                    4 |                  1546273080000 |
| +I |                    5 |                  1546273440000 |
| -U |                    1 |                  1546272000000 |
| +U |                    1 |    1546272000000,1546273800000 |
| -U |                    2 |                  1546272360000 |
| +U |                    2 |    1546272360000,1546274160000 |
| -U |                    3 |                  1546272720000 |
| +U |                    3 |    1546272720000,1546274520000 |
| -U |                    4 |                  1546273080000 |
| +U |                    4 |    1546273080000,1546274880000 |
| -U |                    5 |                  1546273440000 |
| +U |                    5 |    1546273440000,1546275240000 |
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |
(to continue)
```

Let's focus on the last transaction of accountId=1 in the output above. When a new transaction from account 1 arrives at timestamp=1546275600000, there are 4 operations in total:

```
+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |
```

However, I only want to emit the "new status" below to my downstream (say, another Kafka topic) via some sort of merging:

```
+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |    1546273800000,1546275600000 |
```

This way, my downstream can consume literally "the last 2 transaction timestamps of each account":
```
+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |                  1546272000000 |
|                    1 |    1546272000000,1546273800000 |
|                    1 |    1546273800000,1546275600000 |
(to continue)
```

What is the right way to do this?

Re: How to emit after a merge?

Timo Walther
Hi Yik,

If I understand you correctly, you would like to avoid the deletions in your stream?

You could filter the deletions manually in the DataStream API before writing them to Kafka. Semantically, the deletions are required to produce a correct result because the runtime is not aware of a key for idempotent updates.
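
For example, a minimal sketch (assuming Flink 1.12's Scala bridge API and the `tableEnv` and `QUERY` from your program; the Kafka sink itself is omitted here):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

// Convert the result table into a retract stream: the Boolean flag is
// true for add messages (+I/+U) and false for retractions (-U/-D).
val resultTable = tableEnv.sqlQuery(LastNJob.QUERY)

val addsOnly: DataStream[Row] = tableEnv
  .toRetractStream[Row](resultTable)
  .filter(_._1) // keep only the add messages, drop the deletions
  .map(_._2)

// addsOnly can then be written to Kafka with a regular Kafka sink.
```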

To simplify the query, you could also investigate implementing your own aggregate function that combines the Top 2 and LISTAGG into one operation.

Regards,
Timo



Re: How to emit after a merge?

Yik San Chan
Hi Timo,

Thanks for the reply!

> You could filter the deletions manually in the DataStream API before
> writing them to Kafka.

Yes, I agree this helps with the issue, though I will need to mix the SQL and DataStream APIs.

> To simplify the query, you could also investigate implementing your own
> aggregate function that combines the Top 2 and LISTAGG into one operation.

Do you mean implementing a UDF to do so?

Besides, is the 'upsert-kafka' connector designed for this use case?

Thank you.



Re: How to emit after a merge?

Timo Walther
Yes, implementing a UDF might be the most convenient option for some use
cases. The accumulator of such a UDF could take the two timestamps and
perform the two aggregations at once.
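
A rough sketch of what such a function could look like (the class, accumulator, and registration names below are illustrative, not an existing API):

```scala
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator holding the two most recent timestamps per account.
case class Last2Acc(var latest: Long = Long.MinValue,
                    var previous: Long = Long.MinValue)

// Combines the "Top 2 by timestamp" and LISTAGG steps into one aggregation.
class Last2TimestampAgg extends AggregateFunction[String, Last2Acc] {

  override def createAccumulator(): Last2Acc = Last2Acc()

  def accumulate(acc: Last2Acc, ts: Long): Unit = {
    if (ts >= acc.latest) {
      acc.previous = acc.latest
      acc.latest = ts
    } else if (ts > acc.previous) {
      acc.previous = ts
    }
  }

  override def getValue(acc: Last2Acc): String =
    if (acc.previous == Long.MinValue) acc.latest.toString
    else s"${acc.previous},${acc.latest}"
}
```

Registered via `tableEnv.createTemporarySystemFunction("LAST2_TS", new Last2TimestampAgg)`, the whole query would reduce to a single GROUP BY aggregation over `transactions`.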

The upsert-kafka connector can apply the updates to the Kafka log. If you enable log compaction in Kafka, Kafka will clean up the log and make sure to keep only the most recent record per key.
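
For instance, a minimal sketch of such an upsert sink (the topic name, broker address, and formats are placeholders):

```scala
// Upsert sink keyed by accountId: updates are written as plain records,
// deletions as tombstones for the key, which log compaction then cleans up.
tableEnv.executeSql(
  """
    |CREATE TABLE last2_sink (
    |  accountId       BIGINT,
    |  last2_timestamp STRING,
    |  PRIMARY KEY (accountId) NOT ENFORCED
    |) WITH (
    |  'connector' = 'upsert-kafka',
    |  'topic' = 'last2-timestamps',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'key.format' = 'json',
    |  'value.format' = 'json'
    |)
    |""".stripMargin)

// Write the result of the Top-2 query (QUERY from the original program)
// into the upsert sink.
tableEnv.sqlQuery(LastNJob.QUERY).executeInsert("last2_sink")
```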

Regards,
Timo



Re: How to emit after a merge?

Yik San Chan
Hi Timo,

If I understand correctly, the UDF only simplifies the query, but does not do anything functionally different. Please correct me if I am wrong, thank you!

Best,
Yik San



Re: How to emit after a merge?

Timo Walther
I don't know what the resulting plan for your query looks like. You can print it via `tableEnv.sqlQuery(QUERY).explain()`. But I could imagine that by simplifying the query you would also reduce the number of retraction messages/operators in the pipeline.
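
For example, something along these lines (reusing the `tableEnv` and `QUERY` from the program in the first message):

```scala
// Print the abstract syntax tree and the optimized execution plan
// for the Top-N + GROUP BY query.
println(tableEnv.sqlQuery(QUERY).explain())
```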

Regards,
Timo

