http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-emit-after-a-merge-tp41784p41918.html
Yes, implementing a UDF might be the most convenient option for some use
cases. The accumulator of such a UDF could take the two timestamps and
produce the concatenated string directly, instead of chaining Top-N and
LISTAGG.
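For illustration, a minimal sketch of such an aggregate function, assuming the
Flink 1.12 Scala Table API; the names Last2TimestampAgg, Last2Acc and Last2Ts
are made up for this sketch:

```scala
import org.apache.flink.table.functions.AggregateFunction

// Accumulator keeping the two most recent timestamps seen for an account.
case class Last2Acc(var latest: Long = Long.MinValue, var previous: Long = Long.MinValue)

// Combines the "Top 2" and LISTAGG steps into one aggregation: it tracks the
// two newest timestamps and emits them as a comma-separated string.
class Last2TimestampAgg extends AggregateFunction[String, Last2Acc] {

  override def createAccumulator(): Last2Acc = Last2Acc()

  def accumulate(acc: Last2Acc, ts: Long): Unit = {
    if (ts >= acc.latest) {
      acc.previous = acc.latest
      acc.latest = ts
    } else if (ts > acc.previous) {
      acc.previous = ts
    }
  }

  override def getValue(acc: Last2Acc): String =
    if (acc.previous == Long.MinValue) acc.latest.toString
    else s"${acc.previous},${acc.latest}"
}
```

Registered via tableEnv.createTemporarySystemFunction("Last2Ts",
classOf[Last2TimestampAgg]), the job becomes a single GROUP BY accountId
aggregation, so the Top-N operator and its extra retractions go away.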
The upsert-kafka connector can apply the updates to the Kafka log. If the
sink table declares accountId as its primary key, the updates are written as
idempotent upserts per key, so the downstream topic only needs to carry the
latest state.
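As a sketch of that wiring (topic name, server address and formats below are
placeholders, not taken from the thread):

```scala
// Upsert sink keyed by accountId: the -U/+U pairs from the aggregation are
// folded into idempotent upserts, so the topic only carries the latest value
// per account (a deletion would become a tombstone record).
tableEnv.executeSql(
  """
    |CREATE TABLE last2_sink (
    |  accountId BIGINT,
    |  last2_timestamp STRING,
    |  PRIMARY KEY (accountId) NOT ENFORCED
    |) WITH (
    |  'connector' = 'upsert-kafka',
    |  'topic' = 'last2-transactions',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'key.format' = 'json',
    |  'value.format' = 'json'
    |)
    |""".stripMargin)

// Feed it either from the original query or from the aggregate function above.
tableEnv.executeSql(
  "INSERT INTO last2_sink " +
    "SELECT accountId, Last2Ts(`timestamp`) AS last2_timestamp " +
    "FROM transactions GROUP BY accountId")
```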
> Hi Timo,
>
> Thanks for the reply!
>
> > You could filter the deletions manually in DataStream API before writing
> > them to Kafka.
>
> Yeah, I agree this would help with the issue, though I will need to mix SQL
> and the DataStream API.
>
> > To simplify the query you could also investigate implementing your own
> > aggregate function and combine the Top 2 and ListAgg into one operation.
>
> Do you mean implementing a UDF to do so?
>
> Besides, is the 'upsert-kafka' connector designed for this use case?
>
> Thank you.
>
> On Thu, Mar 4, 2021 at 4:41 PM Timo Walther <[hidden email]> wrote:
>
> Hi Yik,
>
> if I understand you correctly, you would like to avoid the deletions in
> your stream?
>
> You could filter the deletions manually in DataStream API before writing
> them to Kafka. Semantically the deletions are required to produce a
> correct result because the runtime is not aware of a key for idempotent
> updates.
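For illustration, a minimal sketch of that filtering, assuming the Flink 1.12
Scala bridge API and the QUERY defined in the job quoted below; the concrete
Kafka sink is left out:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

// toRetractStream emits (true, row) for additions/updates and (false, row)
// for retractions; keeping only the true messages drops the -U/-D records.
val resultTable = tableEnv.sqlQuery(QUERY)
val upsertsOnly: DataStream[Row] = tableEnv
  .toRetractStream[Row](resultTable)
  .filter(_._1)
  .map(_._2)

// upsertsOnly.addSink(...)  // e.g. serialize the rows and hand them to a Kafka producer
```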
>
> To simplify the query you could also investigate implementing your own
> aggregate function and combine the Top 2 and ListAgg into one operation.
>
> Regards,
> Timo
>
> On 28.02.21 09:55, Yik San Chan wrote:
> > I define a `Transaction` class:
> >
> > ```scala
> > case class Transaction(accountId: Long, amount: Long, timestamp: Long)
> > ```
> >
> > The `TransactionSource` simply emits `Transaction` with some time
> > interval. Now I want to compute the last 2 transaction timestamps of
> > each account id, see the code below:
> >
> > ```scala
> > import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
> > import org.apache.flink.table.api.EnvironmentSettings
> > import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> > import org.apache.flink.walkthrough.common.entity.Transaction
> > import org.apache.flink.walkthrough.common.source.TransactionSource
> >
> > object LastNJob {
> >
> >   final val QUERY =
> >     """
> >       |WITH last_n AS (
> >       |    SELECT accountId, `timestamp`
> >       |    FROM (
> >       |        SELECT *,
> >       |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
> >       |        FROM transactions
> >       |    )
> >       |    WHERE row_num <= 2
> >       |)
> >       |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
> >       |FROM last_n
> >       |GROUP BY accountId
> >       |""".stripMargin
> >
> >   def main(args: Array[String]): Unit = {
> >     val settings: EnvironmentSettings =
> >       EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >     val streamEnv: StreamExecutionEnvironment =
> >       StreamExecutionEnvironment.getExecutionEnvironment
> >     val tableEnv: StreamTableEnvironment =
> >       StreamTableEnvironment.create(streamEnv, settings)
> >
> >     val txnStream: DataStream[Transaction] = streamEnv
> >       .addSource(new TransactionSource)
> >       .name("transactions")
> >
> >     tableEnv.createTemporaryView("transactions", txnStream)
> >
> >     tableEnv.executeSql(QUERY).print()
> >   }
> > }
> > ```
> >
> > When I run the program, I get:
> >
> > ```
> > +----+----------------------+--------------------------------+
> > | op |            accountId |                last2_timestamp |
> > +----+----------------------+--------------------------------+
> > | +I |                    1 |                  1546272000000 |
> > | +I |                    2 |                  1546272360000 |
> > | +I |                    3 |                  1546272720000 |
> > | +I |                    4 |                  1546273080000 |
> > | +I |                    5 |                  1546273440000 |
> > | -U |                    1 |                  1546272000000 |
> > | +U |                    1 |    1546272000000,1546273800000 |
> > | -U |                    2 |                  1546272360000 |
> > | +U |                    2 |    1546272360000,1546274160000 |
> > | -U |                    3 |                  1546272720000 |
> > | +U |                    3 |    1546272720000,1546274520000 |
> > | -U |                    4 |                  1546273080000 |
> > | +U |                    4 |    1546273080000,1546274880000 |
> > | -U |                    5 |                  1546273440000 |
> > | +U |                    5 |    1546273440000,1546275240000 |
> > | -U |                    1 |    1546272000000,1546273800000 |
> > | +U |                    1 |                  1546273800000 |
> > | -U |                    1 |                  1546273800000 |
> > | +U |                    1 |    1546273800000,1546275600000 |
> > (to continue)
> > ```
> >
> > Let's focus on the last transaction (from above) of accountId=1. When
> > there is a new transaction from account 1 that happens at
> > timestamp=1546275600000, there are 4 operations in total.
> >
> > ```
> > +----+----------------------+--------------------------------+
> > | op |            accountId |                last2_timestamp |
> > +----+----------------------+--------------------------------+
> > | -U |                    1 |    1546272000000,1546273800000 |
> > | +U |                    1 |                  1546273800000 |
> > | -U |                    1 |                  1546273800000 |
> > | +U |                    1 |    1546273800000,1546275600000 |
> > ```
> >
> > While I only want to emit the below "new status" to my downstream (let's
> > say another Kafka topic) via some sort of merging:
> >
> > ```
> > +----------------------+--------------------------------+
> > |            accountId |                last2_timestamp |
> > +----------------------+--------------------------------+
> > |                    1 |    1546273800000,1546275600000 |
> > ```
> >
> > So that my downstream is able to consume literally "the last 2
> > transaction timestamps of each account":
> > ```
> > +----------------------+--------------------------------+
> > |            accountId |                last2_timestamp |
> > +----------------------+--------------------------------+
> > |                    1 |                  1546272000000 |
> > |                    1 |    1546272000000,1546273800000 |
> > |                    1 |    1546273800000,1546275600000 |
> > (to continue)
> > ```
> >
> > What is the right way to do this?
>