Sorry, I just noticed that I made a typo in the last table (it should be
clickAdvertId != null instead of clickCount != null):
Table allImpressionTable = impressionsTable
.leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
.groupBy("impAdvertId, impVariationName, impMinute")
.select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
.where("clickAdvertId != null");
Hello,
I have a stream of events (coming from a Kinesis Stream) of this form:
impressionId | advertid | variationName | eventType | eventTime
The end goal is to output back to a Kinesis Stream the count of events of type 'impression' and the count of events of type 'click'.
However, I need to drop (or ignore) events of type 'click' whose impressionId does not match any event of type 'impression'
(so basically I need to discard click events that don't have an impression).
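To make the intended result concrete, here is a minimal plain-Java sketch of the counting rule on a finite, in-memory list of events. It is not Flink code and it ignores variationName, eventTime, windowing and per-advert grouping. The Event and MatchedClickCounter classes are hypothetical names used only for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of one event; field names follow the schema above.
class Event {
    final String impressionId;
    final String advertId;
    final String eventType; // "impression" or "click"

    Event(String impressionId, String advertId, String eventType) {
        this.impressionId = impressionId;
        this.advertId = advertId;
        this.eventType = eventType;
    }
}

class MatchedClickCounter {
    // Returns {impressionCount, matchedClickCount} for a finite batch of events.
    // Clicks whose impressionId never appears on an impression are ignored.
    static long[] count(List<Event> events) {
        // First pass: count impressions and remember their ids.
        Set<String> impressionIds = new HashSet<>();
        long impressions = 0;
        for (Event e : events) {
            if ("impression".equals(e.eventType)) {
                impressionIds.add(e.impressionId);
                impressions++;
            }
        }
        // Second pass: count only the clicks that have a matching impression.
        long matchedClicks = 0;
        for (Event e : events) {
            if ("click".equals(e.eventType) && impressionIds.contains(e.impressionId)) {
                matchedClicks++;
            }
        }
        return new long[] {impressions, matchedClicks};
    }
}
```

For example, two impressions (imp-1, imp-3), one click on imp-1 and one click on an unknown imp-2 should give 2 impressions and 1 click.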
This is how I tackled it:
// Convert the stream into a table
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table eventsTable = tEnv.fromDataStream(eventsStream, "impressionId, advertId, variationName, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
// Create a table with only events of type 'click'
Table clicksTable =
eventsTable
.where("eventType = 'click'")
.window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("impressionId, advertId, variationName, minuteWindow")
.select("impressionId as clickImpId, advertId as clickAdvertId, variationName as clickVariationName, minuteWindow.rowTime as clickMinute");
// Create a table with only events of type 'impression'
Table impressionsTable = eventsTable
.where("eventType = 'impression'")
.window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("impressionId, advertId, variationName, minuteWindow")
.select("impressionId as impImpressionId, advertId as impAdvertId, variationName as impVariationName, eventTime, minuteWindow.rowTime as impMinute");
// Left join the impressions with the clicks using the impressionId as well as the temporal field,
// and then group by to generate a count of all the clicks that have a matching impression (i.e. rows where clickAdvertId is not null)
Table allImpressionTable = impressionsTable
.leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
.groupBy("impAdvertId, impVariationName, impMinute")
.select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
.where("clickCount != null");
[.... same logic to count impressions]
Now, to debug and check that the counts are correct, I usually use "tEnv.toAppendStream(allImpressionTable, Result.class).print()", and I can then send the newly created stream back to a Kinesis Stream.
However, I get an error saying that I cannot use toAppendStream and that I have to use toRetractStream instead. That does work, and I can see that the counts in the output are correct. However, I don't understand how to use the result contained in this new stream, because it has multiple rows flagged "true"/"false", and the correct count is usually the last entry with the "true" key.
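For reference, toRetractStream emits pairs of a Boolean flag and a row: true adds a row to the result, and false retracts a previously emitted one. A minimal plain-Java sketch of how such messages fold into the latest count per key is below. No Flink is involved, and RetractMessage, RetractFold and the key format are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one (flag, row) pair of a retract stream.
class RetractMessage {
    final boolean isAdd; // true = add this row, false = retract it
    final String key;    // e.g. advert id plus variation name
    final long count;

    RetractMessage(boolean isAdd, String key, long count) {
        this.isAdd = isAdd;
        this.key = key;
        this.count = count;
    }
}

class RetractFold {
    // Applies add/retract messages in order and returns the current count per key.
    static Map<String, Long> fold(List<RetractMessage> messages) {
        Map<String, Long> current = new LinkedHashMap<>();
        for (RetractMessage m : messages) {
            if (m.isAdd) {
                // An add message carries the new value for the key.
                current.put(m.key, m.count);
            } else {
                // A retract message withdraws the value only if it is still the current one.
                current.remove(m.key, m.count);
            }
        }
        return current;
    }
}
```

With the messages (true, ad-1, 1), (false, ad-1, 1), (true, ad-1, 2), the fold keeps only ad-1 -> 2, which matches the observation that the last "true" entry carries the correct count.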
I have multiple questions:
1) I'm very new to Flink and I would like to know whether my approach to filtering out unmatched events is the correct one (stream -> table and joins -> stream).
Is there a much easier way of doing this? Is it perhaps possible to filter all these events directly in the DataStream?
2) How do I use the retract stream? How do I use it to send only the final counts to a sink, and not the entire sequence of "true"/"false" insert/delete rows?
Thank you!