Re: How to emit after a merge?

Posted by Timo Walther on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-emit-after-a-merge-tp41784p41983.html

I don't know what the resulting plan for your query looks like. You can
print it via `env.sqlQuery().explain()`. But I could imagine that
simplifying the query would also reduce the number of retraction
messages/operators in the pipeline.
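
For reference, a minimal sketch of printing that plan (assuming the
`tableEnv` and `QUERY` names from the program quoted further down):

```scala
// Prints the abstract syntax tree and the optimized plan of the SQL query.
println(tableEnv.sqlQuery(QUERY).explain())
```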

Regards,
Timo


On 05.03.21 13:28, Yik San Chan wrote:

> Hi Timo,
>
> If I understand correctly, the UDF only simplifies the query, but does
> not do anything functionally different. Please correct me if I am wrong,
> thank you!
>
> Best,
> Yik San
>
> On Thu, Mar 4, 2021 at 8:34 PM Timo Walther <[hidden email]> wrote:
>
>     Yes, implementing a UDF might be the most convenient option for some
>     use cases. The accumulator of such a UDF could take the two timestamps
>     and perform the two aggregations at once.
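
For illustration, a minimal sketch of such a function (the names `Last2Acc`
and `Last2TimestampAgg` are made up): the accumulator keeps the two most
recent timestamps per account and emits them as a comma-separated string,
roughly combining the Top 2 and LISTAGG steps into one operator.

```scala
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator: holds the two most recent timestamps of a group.
// Long.MinValue marks "not seen yet".
class Last2Acc {
  var latest: Long = Long.MinValue        // most recent timestamp
  var secondLatest: Long = Long.MinValue  // second most recent timestamp
}

// Hypothetical aggregate that merges the Top 2 and LISTAGG logic.
class Last2TimestampAgg extends AggregateFunction[String, Last2Acc] {

  override def createAccumulator(): Last2Acc = new Last2Acc

  // Invoked once per input row of the group.
  def accumulate(acc: Last2Acc, ts: Long): Unit = {
    if (ts >= acc.latest) {
      acc.secondLatest = acc.latest
      acc.latest = ts
    } else if (ts > acc.secondLatest) {
      acc.secondLatest = ts
    }
  }

  // Emits "older,newer", like the LISTAGG output of the query quoted below.
  override def getValue(acc: Last2Acc): String =
    if (acc.latest == Long.MinValue) null
    else if (acc.secondLatest == Long.MinValue) acc.latest.toString
    else s"${acc.secondLatest},${acc.latest}"
}
```

It could then be registered via
`tableEnv.createTemporarySystemFunction("LAST2_TS", classOf[Last2TimestampAgg])`
and used in a plain `GROUP BY accountId` query.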
>
>     The upsert-kafka connector can apply the updates to the Kafka log. If
>     you enable log compaction in Kafka, Kafka will clean up the log and
>     make sure to keep only the most recent record per key.
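
For illustration, a minimal sketch of such a sink (assuming Flink 1.12+,
where the upsert-kafka connector is available; the topic name and broker
address are placeholders):

```scala
// Hypothetical upsert-kafka sink: with log compaction enabled on the topic,
// Kafka eventually keeps only the latest value per accountId key.
tableEnv.executeSql(
  """
    |CREATE TABLE last2_sink (
    |  accountId BIGINT,
    |  last2_timestamp STRING,
    |  PRIMARY KEY (accountId) NOT ENFORCED
    |) WITH (
    |  'connector' = 'upsert-kafka',
    |  'topic' = 'last2-timestamps',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'key.format' = 'json',
    |  'value.format' = 'json'
    |)
    |""".stripMargin)

// Writes the aggregation result as upserts keyed by accountId;
// retraction messages are not written as separate records.
tableEnv.sqlQuery(QUERY).executeInsert("last2_sink")
```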
>
>     Regards,
>     Timo
>
>     On 04.03.21 11:59, Yik San Chan wrote:
>      > Hi Timo,
>      >
>      > Thanks for the reply!
>      >
>      >  > You could filter the deletions manually in DataStream API
>      >  > before writing them to Kafka.
>      >
>      > Yeah, I agree this helps with the issue, though I will need to mix
>      > SQL and the DataStream API.
>      >
>      >  > To simplify the query, you could also look into implementing
>      >  > your own aggregate function and combining the Top 2 and ListAgg
>      >  > into one operation.
>      >
>      > Do you mean implementing a UDF to do so?
>      >
>      > Besides, is the 'upsert-kafka' connector designed for this use case?
>      >
>      > Thank you.
>      >
>      > On Thu, Mar 4, 2021 at 4:41 PM Timo Walther <[hidden email]> wrote:
>      >
>      >     Hi Yik,
>      >
>      >     if I understand you correctly, you would like to avoid the
>      >     deletions in your stream?
>      >
>      >     You could filter the deletions manually in DataStream API
>      >     before writing them to Kafka. Semantically the deletions are
>      >     required to produce a correct result because the runtime is not
>      >     aware of a key for idempotent updates.
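
For illustration, a minimal sketch of that filtering (assuming the
`tableEnv` and `QUERY` names from the program quoted below):
`toRetractStream` emits `(isAccumulate, row)` pairs, so dropping the
`false` entries keeps only the insert/update-after messages.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

// Convert the SQL result into a retract stream and keep only the accumulate
// messages (+I / +U); retractions (-U / -D) are dropped before sinking.
val upsertsOnly: DataStream[Row] = tableEnv
  .toRetractStream[Row](tableEnv.sqlQuery(QUERY))
  .filter(_._1)
  .map(_._2)
```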
>      >
>      >     To simplify the query, you could also look into implementing
>      >     your own aggregate function and combining the Top 2 and ListAgg
>      >     into one operation.
>      >
>      >     Regards,
>      >     Timo
>      >
>      >     On 28.02.21 09:55, Yik San Chan wrote:
>      >      > I define a `Transaction` class:
>      >      >
>      >      > ```scala
>      >      > case class Transaction(accountId: Long, amount: Long, timestamp: Long)
>      >      > ```
>      >      >
>      >      > The `TransactionSource` simply emits `Transaction` records at
>      >      > some time interval. Now I want to compute the last 2
>      >      > transaction timestamps of each account id, see code below:
>      >      >
>      >      > ```scala
>      >      > import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
>      >      > import org.apache.flink.table.api.EnvironmentSettings
>      >      > import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>      >      > import org.apache.flink.walkthrough.common.entity.Transaction
>      >      > import org.apache.flink.walkthrough.common.source.TransactionSource
>      >      >
>      >      > object LastNJob {
>      >      >
>      >      >    final val QUERY =
>      >      >      """
>      >      >        |WITH last_n AS (
>      >      >        |    SELECT accountId, `timestamp`
>      >      >        |    FROM (
>      >      >        |        SELECT *,
>      >      >        |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
>      >      >        |        FROM transactions
>      >      >        |    )
>      >      >        |    WHERE row_num <= 2
>      >      >        |)
>      >      >        |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING))
>      >      > last2_timestamp
>      >      >        |FROM last_n
>      >      >        |GROUP BY accountId
>      >      >        |""".stripMargin
>      >      >
>      >      >    def main(args: Array[String]): Unit = {
>      >      >      val settings: EnvironmentSettings =
>      >      >        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>      >      >      val streamEnv: StreamExecutionEnvironment =
>      >      >        StreamExecutionEnvironment.getExecutionEnvironment
>      >      >      val tableEnv: StreamTableEnvironment =
>      >      >        StreamTableEnvironment.create(streamEnv, settings)
>      >      >
>      >      >      val txnStream: DataStream[Transaction] = streamEnv
>      >      >        .addSource(new TransactionSource)
>      >      >        .name("transactions")
>      >      >
>      >      >      tableEnv.createTemporaryView("transactions", txnStream)
>      >      >
>      >      >      tableEnv.executeSql(QUERY).print()
>      >      >    }
>      >      > }
>      >      > ```
>      >      >
>      >      > When I run the program, I get:
>      >      >
>      >      > ```
>      >      > +----+----------------------+--------------------------------+
>      >      > | op |            accountId |                last2_timestamp |
>      >      > +----+----------------------+--------------------------------+
>      >      > | +I |                    1 |                  1546272000000 |
>      >      > | +I |                    2 |                  1546272360000 |
>      >      > | +I |                    3 |                  1546272720000 |
>      >      > | +I |                    4 |                  1546273080000 |
>      >      > | +I |                    5 |                  1546273440000 |
>      >      > | -U |                    1 |                  1546272000000 |
>      >      > | +U |                    1 |    1546272000000,1546273800000 |
>      >      > | -U |                    2 |                  1546272360000 |
>      >      > | +U |                    2 |    1546272360000,1546274160000 |
>      >      > | -U |                    3 |                  1546272720000 |
>      >      > | +U |                    3 |    1546272720000,1546274520000 |
>      >      > | -U |                    4 |                  1546273080000 |
>      >      > | +U |                    4 |    1546273080000,1546274880000 |
>      >      > | -U |                    5 |                  1546273440000 |
>      >      > | +U |                    5 |    1546273440000,1546275240000 |
>      >      > | -U |                    1 |    1546272000000,1546273800000 |
>      >      > | +U |                    1 |                  1546273800000 |
>      >      > | -U |                    1 |                  1546273800000 |
>      >      > | +U |                    1 |    1546273800000,1546275600000 |
>      >      > (to continue)
>      >      > ```
>      >      >
>      >      > Let's focus on the last transaction (from above) of
>      >      > accountId=1. When there is a new transaction from account 1
>      >      > that happens at timestamp=1546275600000, there are 4
>      >      > operations in total.
>      >      >
>      >      > ```
>      >      > +----+----------------------+--------------------------------+
>      >      > | op |            accountId |                last2_timestamp |
>      >      > +----+----------------------+--------------------------------+
>      >      > | -U |                    1 |    1546272000000,1546273800000 |
>      >      > | +U |                    1 |                  1546273800000 |
>      >      > | -U |                    1 |                  1546273800000 |
>      >      > | +U |                    1 |    1546273800000,1546275600000 |
>      >      > ```
>      >      >
>      >      > However, I only want to emit the below "new status" to my
>      >      > downstream (let's say another Kafka topic) via some sort of
>      >      > merging:
>      >      >
>      >      > ```
>      >      > +----------------------+--------------------------------+
>      >      > |            accountId |                last2_timestamp |
>      >      > +----------------------+--------------------------------+
>      >      > |                    1 |    1546273800000,1546275600000 |
>      >      > ```
>      >      >
>      >      > So that my downstream is able to consume literally "the last 2
>      >      > transaction timestamps of each account":
>      >      > ```
>      >      > +----------------------+--------------------------------+
>      >      > |            accountId |                last2_timestamp |
>      >      > +----------------------+--------------------------------+
>      >      > |                    1 |                  1546272000000 |
>      >      > |                    1 |    1546272000000,1546273800000 |
>      >      > |                    1 |    1546273800000,1546275600000 |
>      >      > (to continue)
>      >      > ```
>      >      >
>      >      > What is the right way to do this?
>      >
>