GroupBy with count on a joined table only lets me write using toRetractStream

Posted by Faye Pressly on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/GroupBy-with-count-on-a-joint-table-only-let-met-write-using-toRetractStream-tp37269.html

Hello,

I have a stream of events (coming from a Kinesis Stream) of this form:

impressionId | advertId | variationName | eventType | eventTime

The end goal is to write back to a Kinesis Stream the count of events of type 'impression' and the count of events of type 'click'.

However, I need to drop (or ignore) events of type 'click' whose impressionId has no matching event of type 'impression'. In other words, I need to discard click events that don't have an impression.
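
To make the requirement concrete, here is a minimal sketch in plain Java over an in-memory list. It is not Flink code, it ignores the time windows and the grouping by advertId and variationName, and the names `MatchedCounts`, `Event` and `count` are made up for this illustration only.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: hypothetical names, no Flink dependency, no windowing.
final class MatchedCounts {

    /** Reduced event: only the two fields needed to show the matching rule. */
    static final class Event {
        final String impressionId;
        final String eventType;

        Event(String impressionId, String eventType) {
            this.impressionId = impressionId;
            this.eventType = eventType;
        }
    }

    /** Returns {impressionCount, clickCount}; clicks without an impression are dropped. */
    static long[] count(List<Event> events) {
        // First pass: count impressions and remember their ids.
        Set<String> impressionIds = new HashSet<>();
        long impressions = 0;
        for (Event e : events) {
            if ("impression".equals(e.eventType)) {
                impressionIds.add(e.impressionId);
                impressions++;
            }
        }
        // Second pass: count only the clicks whose impressionId was seen above.
        long clicks = 0;
        for (Event e : events) {
            if ("click".equals(e.eventType) && impressionIds.contains(e.impressionId)) {
                clicks++;
            }
        }
        return new long[] {impressions, clicks};
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("i1", "impression"),
                new Event("i1", "click"),
                new Event("i2", "click"),       // no impression with id i2: must be ignored
                new Event("i3", "impression"));
        long[] counts = count(events);
        System.out.println("impressions=" + counts[0] + ", clicks=" + counts[1]);
    }
}
```

For the four sample events, the click on `i2` is discarded because no impression with that id exists.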

This is how I tackled the problem:

// Convert the stream into a table
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table eventsTable = tEnv.fromDataStream(eventsStream, "impressionId, advertId, variationName, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);

// Create a table with only event of type clicks
Table clicksTable = eventsTable
      .where("eventType = 'click'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, advertId, variationName, minuteWindow")
      .select("impressionId as clickImpId, advertId as clickAdvertId, variationName as clickVariationName, minuteWindow.rowTime as clickMinute");

// Create a table with only event of type impression
Table impressionsTable = eventsTable
      .where("eventType = 'impression'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, advertId, variationName, minuteWindow")
      .select("impressionId as impImpressionId, advertId as impAdvertId, variationName as impVariationName, eventTime, minuteWindow.rowTime as impMinute");

// Left join the impressions with the clicks using the impressionId as well as the temporal field,
// and then group by to count all the clicks that have a matching impression (i.e. rows where clickAdvertId is not null)
Table allImpressionTable = impressionsTable
      .leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
      .groupBy("impAdvertId, impVariationName, impMinute")
      .select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
      .where("clickCount != null");
[.... same logic to count impressions]

To debug and check whether the counts are correct, I usually use "tEnv.toAppendStream(allImpressionTable, Result.class).print()", and I can then use the newly created stream to send the results back to a Kinesis Stream.

However, I get an error saying that I cannot use toAppendStream and must use toRetractStream instead. That does work, and I can see that the counts in the output are correct. What I don't understand is how to use the result contained in this new stream: it has multiple rows flagged "true" or "false", and the correct count is usually the last entry with the "true" flag.
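
To show what I am seeing, here is a minimal sketch in plain Java (no Flink dependency) of how I currently read the flags: "true" adds a row, and "false" retracts a row that was emitted earlier. The names `RetractFold`, `Change` and `materialize` are hypothetical and exist only for this illustration; the sample values are made up, not real output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: hypothetical names, no Flink dependency.
final class RetractFold {

    /** One message of a retract-style stream: add == true inserts, add == false retracts. */
    static final class Change {
        final boolean add;
        final String key;
        final long count;

        Change(boolean add, String key, long count) {
            this.add = add;
            this.key = key;
            this.count = count;
        }
    }

    /** Applies the changes in arrival order and returns the current count per key. */
    static Map<String, Long> materialize(List<Change> changes) {
        Map<String, Long> current = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.add) {
                // An add message carries the new value for this key.
                current.put(c.key, c.count);
            } else {
                // A retract message withdraws the previously emitted value;
                // remove only if the stored value is the one being retracted.
                current.remove(c.key, c.count);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Change> changes = new ArrayList<>();
        changes.add(new Change(true, "ad1", 1));
        changes.add(new Change(false, "ad1", 1));
        changes.add(new Change(true, "ad1", 2));
        changes.add(new Change(false, "ad1", 2));
        changes.add(new Change(true, "ad1", 3));
        System.out.println(materialize(changes)); // prints {ad1=3}
    }
}
```

Folding the messages this way leaves only the last "true" value per key, which matches the count I expect.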

I have two questions:

1) I'm very new to Flink and I would like to know whether my approach to filtering out unmatched events is the correct one (stream -> table and joins -> stream).
Is there a much easier way of doing this? Is it perhaps possible to filter all these events directly in the DataStream?


2) How do I use the retract stream? How do I use it to send only the final counts to a sink, rather than all of the "true"/"false" insert/delete rows?


Thank you!