How to emit after a merge?

Posted by Yik San Chan
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-emit-after-a-merge-tp41784.html

I define a `Transaction` class:

```scala
case class Transaction(accountId: Long, amount: Long, timestamp: Long)
```

The `TransactionSource` simply emits a `Transaction` at a fixed time interval (a rough stand-in is sketched after the program below). Now I want to compute the last 2 transaction timestamps of each account id; see the code below:

```scala
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

  // Top-N per account (the 2 most recent timestamps), then aggregate the
  // surviving timestamps into a single comma-separated value per account.
  final val QUERY =
    """
      |WITH last_n AS (
      |    SELECT accountId, `timestamp`
      |    FROM (
      |        SELECT *,
      |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
      |        FROM transactions
      |    )
      |    WHERE row_num <= 2
      |)
      |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
      |FROM last_n
      |GROUP BY accountId
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)

    // Ingest transactions from the walkthrough source.
    val txnStream: DataStream[Transaction] = streamEnv
      .addSource(new TransactionSource)
      .name("transactions")

    // Register the stream as a table so the SQL query can read it.
    tableEnv.createTemporaryView("transactions", txnStream)

    // Run the continuous query and print its changelog to stdout.
    tableEnv.executeSql(QUERY).print()
  }
}
```
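(For reference, the walkthrough's `TransactionSource` behaves roughly like this hypothetical stand-in, written against the case class above; the class name, interval, and amounts are illustrative, not the actual implementation.)

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Illustrative stand-in only: emits transactions for five accounts at a fixed pace.
class FakeTransactionSource extends SourceFunction[Transaction] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Transaction]): Unit = {
    var ts = 1546272000000L // first timestamp seen in the output below
    while (running) {
      for (accountId <- 1L to 5L) {
        ctx.collect(Transaction(accountId, amount = 100L, timestamp = ts))
        ts += 360000L // 6 minutes, matching the spacing in the output
      }
      Thread.sleep(1000) // pacing only; the query orders by the timestamp field
    }
  }

  override def cancel(): Unit = running = false
}
```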

When I run the program, I get:

```
+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| +I |                    1 |                  1546272000000 |
| +I |                    2 |                  1546272360000 |
| +I |                    3 |                  1546272720000 |
| +I |                    4 |                  1546273080000 |
| +I |                    5 |                  1546273440000 |
| -U |                    1 |                  1546272000000 |
| +U |                    1 |    1546272000000,1546273800000 |
| -U |                    2 |                  1546272360000 |
| +U |                    2 |    1546272360000,1546274160000 |
| -U |                    3 |                  1546272720000 |
| +U |                    3 |    1546272720000,1546274520000 |
| -U |                    4 |                  1546273080000 |
| +U |                    4 |    1546273080000,1546274880000 |
| -U |                    5 |                  1546273440000 |
| +U |                    5 |    1546273440000,1546275240000 |
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |
(output continues)
```

Let's focus on the last transaction of accountId=1 in the output above. When a new transaction for account 1 arrives at timestamp=1546275600000, four operations are emitted in total:

```
+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |
```
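As far as I can tell, this is because the Top-N view changes in two steps (the row at timestamp=1546272000000 drops out of the top 2, then the new row is added), and the downstream `GROUP BY` retracts and re-emits its aggregate once for each step, hence two -U/+U pairs. The same changelog is visible from the DataStream side; here is a minimal sketch, replacing the `executeSql(QUERY).print()` line above:

```scala
import org.apache.flink.types.Row

val result = tableEnv.sqlQuery(QUERY)

// Each element is (isAdd, row): isAdd = true for +I/+U, false for -U/-D.
tableEnv.toRetractStream[Row](result).print()

streamEnv.execute("LastNJob")
```

Filtering on isAdd alone would still leak the intermediate +U row (1546273800000 by itself), so that doesn't achieve the merging I want.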

However, I only want to emit the final "new status" below to my downstream (say, another Kafka topic), via some sort of merging:

```
+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |    1546273800000,1546275600000 |
```

That way, my downstream would consume literally "the last 2 transaction timestamps of each account":
```
+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |                  1546272000000 |
|                    1 |    1546272000000,1546273800000 |
|                    1 |    1546273800000,1546275600000 |
(and so on)
```
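For the Kafka sink specifically, I also looked at the `upsert-kafka` connector. As I understand it, it writes an upsert changelog keyed by the primary key, so the -U rows would be dropped, but the intermediate +U row would still be written. A sketch, with hypothetical topic and broker names:

```scala
// Hypothetical sink; topic and bootstrap servers are placeholders.
tableEnv.executeSql(
  """
    |CREATE TABLE last2_sink (
    |    accountId BIGINT,
    |    last2_timestamp STRING,
    |    PRIMARY KEY (accountId) NOT ENFORCED
    |) WITH (
    |    'connector' = 'upsert-kafka',
    |    'topic' = 'last2-transactions',
    |    'properties.bootstrap.servers' = 'localhost:9092',
    |    'key.format' = 'json',
    |    'value.format' = 'json'
    |)
    |""".stripMargin)

// Continuously write the query result into the sink.
tableEnv.sqlQuery(QUERY).executeInsert("last2_sink")
```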

What is the right way to do this?
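
One thing I'm wondering about is whether MiniBatch aggregation would help reduce the intermediate updates; a sketch of the configuration (I haven't verified that it collapses the Top-N retractions here):

```scala
// Hypothetical tuning, not a confirmed fix: buffer input changes briefly so
// the aggregation emits fewer intermediate updates per key.
val conf = tableEnv.getConfig.getConfiguration
conf.setString("table.exec.mini-batch.enabled", "true")
conf.setString("table.exec.mini-batch.allow-latency", "1 s")
conf.setString("table.exec.mini-batch.size", "1000")
```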