Hi, I’m running into a strange situation and would like to know where I should start debugging.
I have set up a source that is swappable via configuration, with three implementations:
1. A mock implementation
2. A Kafka consumer implementation
3. A Kinesis consumer implementation
By injecting a logging no-op map function I can see that all three sources pass events through correctly.
I then have a window based on event timestamps, and from inspecting the aggregation function I can see that the data is getting aggregated. I’m using the `.aggregate(AggregateFunction, WindowFunction)` variant so that I can retrieve the key.
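For context on that variant: the first argument pre-aggregates incrementally, and the second is invoked once per fired window with the key and the pre-aggregated result. Here is a minimal pure-Java sketch of that contract (the method shapes mirror Flink's `AggregateFunction`/`WindowFunction`, but the class and names here are illustrative, not Flink code):

```java
import java.util.List;
import java.util.Map;

// Stand-in for the two-argument .aggregate(aggFn, windowFn) contract:
// an incremental count accumulator, then a window-function step that
// pairs the result with the window key. Illustrative only.
public class AggregateSketch {

    // Mirrors createAccumulator / add / getResult of an AggregateFunction.
    static long createAccumulator() { return 0L; }
    static long add(String element, long acc) { return acc + 1; }
    static long getResult(long acc) { return acc; }

    // Mirrors the WindowFunction step: it receives the pre-aggregated
    // result plus the key, which is why this variant lets you tag the
    // output with the key.
    static Map.Entry<String, Long> apply(String key, long aggregated) {
        return Map.entry(key, aggregated);
    }

    public static void main(String[] args) {
        long acc = createAccumulator();
        for (String e : List.of("a", "b", "c")) {
            acc = add(e, acc);
        }
        Map.Entry<String, Long> out = apply("someKey", getResult(acc));
        System.out.println(out.getKey() + "=" + out.getValue()); // someKey=3
    }
}
```

The important point for the question below: the window function only runs when the window actually fires, while the aggregate's `add` runs per element, so the two can be observed independently.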
Here’s the strange thing: I only change the source (and each source uses the same deserialization function), but:
- When I use either the Kafka or the mock source, the WindowFunction gets called as events pass the end of the window.
- When I use the Kinesis source, however, the WindowFunction never gets called. I have even tried injecting events with very high timestamps into Kinesis to flush the watermarks in my BoundedOutOfOrdernessTimestampExtractor... but nothing fires.
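For reference, the rule a bounded-out-of-orderness extractor applies is "highest timestamp seen so far, minus the bound", so a single high-timestamp event should indeed advance the watermark. A pure-Java sketch of that rule (class name and structure are illustrative, not Flink's source):

```java
// Sketch of the bounded-out-of-orderness watermark rule: the watermark
// trails the maximum event timestamp seen so far by a fixed bound.
// Illustrative only; this is not Flink's implementation.
public class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called per element; remembers the highest event timestamp.
    void onEvent(long eventTimestampMs) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMs);
    }

    // The periodically emitted watermark: max timestamp minus the bound.
    long currentWatermark() {
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(5_000); // 5s bound, as in the job
        wm.onEvent(10_000);
        System.out.println(wm.currentWatermark()); // 5000
        // A single far-future event should push the watermark past any
        // earlier window end and flush it:
        wm.onEvent(1_000_000);
        System.out.println(wm.currentWatermark()); // 995000
    }
}
```

So if that per-source logic is identical across the three sources, the divergence presumably happens somewhere between the extractor and the window operator.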
I cannot see how switching the source could result in such different behaviour:
```java
Properties sourceProperties = new Properties();
ConsumerFactory sourceFactory;
String sourceName = configParams.getRequired("source");
switch (sourceName.toLowerCase(Locale.ENGLISH)) {
    case "kinesis":
        sourceFactory = FlinkKinesisConsumer::new;
        copyOptionalArg(configParams, "aws-region", sourceProperties, AWSConfigConstants.AWS_REGION);
        copyOptionalArg(configParams, "aws-endpoint", sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
        copyOptionalArg(configParams, "aws-access-key", sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
        copyOptionalArg(configParams, "aws-secret-key", sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
        copyOptionalArg(configParams, "aws-profile", sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
        break;
    case "kafka":
        sourceFactory = FlinkKafkaConsumer010::new;
        copyRequiredArg(configParams, "bootstrap-server", sourceProperties, "bootstrap.servers");
        copyOptionalArg(configParams, "group-id", sourceProperties, "group.id");
        break;
    case "mock":
        sourceFactory = MockSourceFunction::new;
        break;
    default:
        throw new RuntimeException("Unknown source '" + sourceName + '\'');
}

// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// emit watermarks every second, since BoundedOutOfOrdernessTimestampExtractor is periodic
env.getConfig().setAutoWatermarkInterval(1000L);
env.enableCheckpointing(5000);

SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
        configParams.getRequired("topic"),
        new ObjectNodeDeserializationSchema(),
        sourceProperties
    ))
    .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info, so add it back
    .name("raw-events")
    .assignTimestampsAndWatermarks(
        new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
    )
    .split(new JsonNodeOutputSelector("eventType"));

...

eventsByType.select(...)
    .keyBy(new JsonNodeStringKeySelector("_key"))
    .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
            (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
    .trigger(EventTimeTrigger.create())
    .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
    .print(); // <==== HERE is where we get no output from Kinesis... but Kafka and the mock are just fine!
```
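One mechanic that seems relevant when only the source changes: a downstream operator advances its event-time clock to the *minimum* watermark across all of its parallel inputs, so a single input channel that never emits a watermark pins the operator watermark at `Long.MIN_VALUE` and no event-time window can fire. A pure-Java sketch of that min-combining rule (illustrative only; channel indices and names are made up, this is not Flink's implementation):

```java
// Sketch of how a downstream operator combines watermarks from its
// parallel inputs: it only advances to the MINIMUM across all channels.
// Illustrative stand-in, not Flink code.
public class MinWatermarkSketch {
    private final long[] channelWatermarks;

    MinWatermarkSketch(int parallelism) {
        channelWatermarks = new long[parallelism];
        java.util.Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // A watermark arriving on one input channel; watermarks never regress.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The operator-level watermark that event-time windows fire against.
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(2);
        op.onWatermark(0, 1_000_000); // one input advances...
        // ...but an input that stays silent keeps the combined watermark
        // pinned at Long.MIN_VALUE, so no window downstream fires:
        System.out.println(op.combinedWatermark() == Long.MIN_VALUE); // true
    }
}
```

If this rule is what bites here, the high-timestamp injections would only help if they reach every parallel source instance, which is a difference in data distribution between the sources rather than in the job graph.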