How to emit after a merge?
Posted by Yik San Chan
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-emit-after-a-merge-tp41784.html
I define a `Transaction` class:
```scala
case class Transaction(accountId: Long, amount: Long, timestamp: Long)
```
The `TransactionSource` simply emits a `Transaction` at a fixed time interval. Now I want to compute the last 2 transaction timestamps of each account id; see the code below:
```scala
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

  // Top-N per account (the 2 most recent timestamps), then LISTAGG them
  // into one comma-separated string per account.
  final val QUERY =
    """
      |WITH last_n AS (
      |  SELECT accountId, `timestamp`
      |  FROM (
      |    SELECT *,
      |      ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
      |    FROM transactions
      |  )
      |  WHERE row_num <= 2
      |)
      |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
      |FROM last_n
      |GROUP BY accountId
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)

    // Register the transaction source as a table named "transactions".
    val txnStream: DataStream[Transaction] = streamEnv
      .addSource(new TransactionSource)
      .name("transactions")
    tableEnv.createTemporaryView("transactions", txnStream)

    tableEnv.executeSql(QUERY).print()
  }
}
```
When I run the program, I get:
```
+----+----------------------+--------------------------------+
| op | accountId | last2_timestamp |
+----+----------------------+--------------------------------+
| +I | 1 | 1546272000000 |
| +I | 2 | 1546272360000 |
| +I | 3 | 1546272720000 |
| +I | 4 | 1546273080000 |
| +I | 5 | 1546273440000 |
| -U | 1 | 1546272000000 |
| +U | 1 | 1546272000000,1546273800000 |
| -U | 2 | 1546272360000 |
| +U | 2 | 1546272360000,1546274160000 |
| -U | 3 | 1546272720000 |
| +U | 3 | 1546272720000,1546274520000 |
| -U | 4 | 1546273080000 |
| +U | 4 | 1546273080000,1546274880000 |
| -U | 5 | 1546273440000 |
| +U | 5 | 1546273440000,1546275240000 |
| -U | 1 | 1546272000000,1546273800000 |
| +U | 1 | 1546273800000 |
| -U | 1 | 1546273800000 |
| +U | 1 | 1546273800000,1546275600000 |
(output continues)
```
Let's focus on the most recent transactions of accountId=1 shown above. When a new transaction for account 1 arrives at timestamp=1546275600000, four operations are emitted in total:
```
+----+----------------------+--------------------------------+
| op | accountId | last2_timestamp |
+----+----------------------+--------------------------------+
| -U | 1 | 1546272000000,1546273800000 |
| +U | 1 | 1546273800000 |
| -U | 1 | 1546273800000 |
| +U | 1 | 1546273800000,1546275600000 |
```
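For reference, the `op` flags follow Flink's changelog semantics: `-U` retracts a previously emitted row and `+U` emits its replacement. The same changelog can be observed programmatically as a retract stream; here is a minimal sketch, assuming the same `streamEnv`, `tableEnv`, and `QUERY` as in my program above:

```scala
import org.apache.flink.types.Row

// Each element is a (flag, row) pair: flag == false corresponds to
// -U/-D messages, flag == true to +I/+U messages.
val resultTable = tableEnv.sqlQuery(QUERY)
val retractStream: DataStream[(Boolean, Row)] =
  tableEnv.toRetractStream[Row](resultTable)
retractStream.print()
streamEnv.execute("last-n-changelog")
```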
However, I only want to emit the "new state" below to my downstream (say, another Kafka topic; see the sink sketch at the end of this post) via some sort of merging:
```
+----------------------+--------------------------------+
| accountId | last2_timestamp |
+----------------------+--------------------------------+
| 1 | 1546273800000,1546275600000 |
```
This way, my downstream is able to consume literally "the last 2 transaction timestamps of each account":
```
+----------------------+--------------------------------+
| accountId | last2_timestamp |
+----------------------+--------------------------------+
| 1 | 1546272000000 |
| 1 | 1546272000000,1546273800000 |
| 1 | 1546273800000,1546275600000 |
(output continues)
```
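To make the problem concrete, here is a minimal sketch (continuing the retract-stream sketch above) of the naive approach: keeping only the add side of the changelog. It drops the `-U` retractions, but the intermediate `+U` row (`1546273800000` alone) still gets through, so it is not the merge I am after:

```scala
// Minimal sketch: forward only +I/+U messages and drop retractions.
// The intermediate +U row still slips through, so this alone does
// not produce the merged output shown above.
val addsOnly: DataStream[Row] = retractStream
  .filter(_._1) // flag == true means +I/+U
  .map(_._2)
addsOnly.print()
```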
What is the right way to do this?
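For context, the downstream sink I have in mind would look roughly like the following. This is only a sketch of the sink's shape, with placeholder topic and broker names, not a solution to the merging itself; an `upsert-kafka` sink keys each message by the declared primary key:

```scala
// Hypothetical sink (topic and broker names are placeholders).
// upsert-kafka keys each message by accountId, so a log-compacted
// topic keeps the latest state per key.
tableEnv.executeSql(
  """
    |CREATE TABLE last2_sink (
    |  accountId BIGINT,
    |  last2_timestamp STRING,
    |  PRIMARY KEY (accountId) NOT ENFORCED
    |) WITH (
    |  'connector' = 'upsert-kafka',
    |  'topic' = 'last2-timestamps',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'key.format' = 'json',
    |  'value.format' = 'json'
    |)
    |""".stripMargin)
tableEnv.sqlQuery(QUERY).executeInsert("last2_sink")
```

With log compaction, consumers would eventually see only the latest state per accountId, but the intermediate update would still be written to the topic, which is why I am asking about merging before the sink.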