http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-emit-after-a-merge-tp41784p41983.html
I don't know how the resulting plan for you query looks like. You can
print it via `env.sqlQuery().explain()`. But I could imagine that by
> Hi Timo,
>
> If I understand correctly, the UDF only simplifies the query, but not
> doing anything functionally different. Please correct me if I am wrong,
> thank you!
>
> Best,
> Yik San
>
> On Thu, Mar 4, 2021 at 8:34 PM Timo Walther <
[hidden email]
> <mailto:
[hidden email]>> wrote:
>
> Yes, implementing a UDF might be the most convenient option for some
> use
> cases. The accumulator of such a UDF could take the two timestamps and
> perform the two aggregations at once.
>
> The upsert-kafka connector can apply the updates to the Kafka log. If
> you enable log compaction in Kafka, Kafka will clean up the log and
> make
> sure to only keep the most recent one.
>
> Regards,
> Timo
>
> On 04.03.21 11:59, Yik San Chan wrote:
> > Hi Timo,
> >
> > Thanks for the reply!
> >
> > > You could filter the deletions manually in DataStream API
> before writing
> > them to Kafka.
> >
> > Yah I agree this helps the issue, though I will need to mix up
> SQL and
> > DataStream API.
> >
> > > To simplify the query you could also investigate to implement
> your own
> > aggregate function and combine the Top 2 and ListAgg into one
> operation.
> >
> > Do you mean implement an UDF to do so?
> >
> > Besides, is 'upsert-kafka' connector designed for this use case?
> >
> > Thank you.
> >
> > On Thu, Mar 4, 2021 at 4:41 PM Timo Walther <
[hidden email]
> <mailto:
[hidden email]>
> > <mailto:
[hidden email] <mailto:
[hidden email]>>> wrote:
> >
> > Hi Yik,
> >
> > if I understand you correctly you would like to avoid the
> deletions in
> > your stream?
> >
> > You could filter the deletions manually in DataStream API before
> > writing
> > them to Kafka. Semantically the deletions are required to
> produce a
> > correct result because the runtime is not aware of a key for
> idempotent
> > updates.
> >
> > To simplify the query you could also investigate to implement
> your own
> > aggregate function and combine the Top 2 and ListAgg into one
> operation.
> >
> > Regards,
> > Timo
> >
> > On 28.02.21 09:55, Yik San Chan wrote:
> > > I define a `Transaction` class:
> > >
> > > ```scala
> > > case class Transaction(accountId: Long, amount: Long,
> timestamp:
> > Long)
> > > ```
> > >
> > > The `TransactionSource` simply emits `Transaction` with
> some time
> > > interval. Now I want to compute the last 2 transaction
> timestamp
> > of each
> > > account id, see code below:
> > >
> > > ```scala
> > > import org.apache.flink.streaming.api.scala.{DataStream,
> > > StreamExecutionEnvironment, _}
> > > import org.apache.flink.table.api.EnvironmentSettings
> > > import
> org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> > > import org.apache.flink.walkthrough.common.entity.Transaction
> > > import
> org.apache.flink.walkthrough.common.source.TransactionSource
> > >
> > > object LastNJob {
> > >
> > > final val QUERY =
> > > """
> > > |WITH last_n AS (
> > > | SELECT accountId, `timestamp`
> > > | FROM (
> > > | SELECT *,
> > > | ROW_NUMBER() OVER (PARTITION BY accountId
> > ORDER BY
> > > `timestamp` DESC) AS row_num
> > > | FROM transactions
> > > | )
> > > | WHERE row_num <= 2
> > > |)
> > > |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING))
> > > last2_timestamp
> > > |FROM last_n
> > > |GROUP BY accountId
> > > |""".stripMargin
> > >
> > > def main(args: Array[String]): Unit = {
> > > val settings: EnvironmentSettings =
> > >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> > > val streamEnv: StreamExecutionEnvironment =
> > > StreamExecutionEnvironment.getExecutionEnvironment
> > > val tableEnv: StreamTableEnvironment =
> > > StreamTableEnvironment.create(streamEnv, settings)
> > >
> > > val txnStream: DataStream[Transaction] = streamEnv
> > > .addSource(new TransactionSource)
> > > .name("transactions")
> > >
> > > tableEnv.createTemporaryView("transactions", txnStream)
> > >
> > > tableEnv.executeSql(QUERY).print()
> > > }
> > > }
> > > ```
> > >
> > > When I run the program, I get:
> > >
> > > ```
> > > +----+----------------------+--------------------------------+
> > > | op | accountId | last2_timestamp |
> > > +----+----------------------+--------------------------------+
> > > | +I | 1 | 1546272000000 |
> > > | +I | 2 | 1546272360000 |
> > > | +I | 3 | 1546272720000 |
> > > | +I | 4 | 1546273080000 |
> > > | +I | 5 | 1546273440000 |
> > > | -U | 1 | 1546272000000 |
> > > | +U | 1 | 1546272000000,1546273800000 |
> > > | -U | 2 | 1546272360000 |
> > > | +U | 2 | 1546272360000,1546274160000 |
> > > | -U | 3 | 1546272720000 |
> > > | +U | 3 | 1546272720000,1546274520000 |
> > > | -U | 4 | 1546273080000 |
> > > | +U | 4 | 1546273080000,1546274880000 |
> > > | -U | 5 | 1546273440000 |
> > > | +U | 5 | 1546273440000,1546275240000 |
> > > | -U | 1 | 1546272000000,1546273800000 |
> > > | +U | 1 | 1546273800000 |
> > > | -U | 1 | 1546273800000 |
> > > | +U | 1 | 1546273800000,1546275600000 |
> > > (to continue)
> > > ```
> > >
> > > Let's focus on the last transaction (from above) of
> accountId=1.
> > When
> > > there is a new transaction from account 1 that happens at
> > > timestamp=1546275600000, there are 4 operations in total.
> > >
> > > ```
> > > +----+----------------------+--------------------------------+
> > > | op | accountId | last2_timestamp |
> > > +----+----------------------+--------------------------------+
> > > | -U | 1 | 1546272000000,1546273800000 |
> > > | +U | 1 | 1546273800000 |
> > > | -U | 1 | 1546273800000 |
> > > | +U | 1 | 1546273800000,1546275600000 |
> > > ```
> > >
> > > While I only want to emit the below "new status" to my
> downstream
> > (let's
> > > say another Kafka topic) via some sort of merging:
> > >
> > > ```
> > > +----------------------+--------------------------------+
> > > | accountId | last2_timestamp |
> > > +----------------------+--------------------------------+
> > > | 1 | 1546273800000,1546275600000 |
> > > ```
> > >
> > > So that my downstream is able to consume literally "the last 2
> > > transaction timestamps of each account":
> > > ```
> > > +----------------------+--------------------------------+
> > > | accountId | last2_timestamp |
> > > +----------------------+--------------------------------+
> > > | 1 | 1546272000000 |
> > > | 1 | 1546272000000,1546273800000 |
> > > | 1 | 1546273800000,1546275600000 |
> > > (to continue)
> > > ```
> > >
> > > What is the right way to do this?
> >
>