It is definitely a solution ;)
You should be aware of the downsides though:
Best,
Dawid
Yes, it was the "watermarks for event time when no events for that shard" problem.
I am now investigating whether we can use a blended watermark of max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure idle shards do not cause excessive data retention.
Is that the best solution?
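Here is a minimal sketch of the blended watermark proposed above, in plain Java so that it runs on its own. The class name `BlendedWatermark`, the injectable clock, and tracking the *maximum* event timestamp seen (rather than strictly the last one) are assumptions made for illustration. In a Flink job, the `currentWatermark()` logic would presumably live inside the `getCurrentWatermark()` method of an `AssignerWithPeriodicWatermarks`.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not a Flink class:
// watermark = max(maxEventTimestamp - eventLag, now - idleLag)
public class BlendedWatermark {
    private final long eventLagMillis;   // e.g. 1 minute behind the newest event
    private final long idleLagMillis;    // e.g. 5 minutes behind the wall clock
    private final LongSupplier clock;    // injectable so the sketch can be tested
    private long maxEventTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    public BlendedWatermark(long eventLagMillis, long idleLagMillis, LongSupplier clock) {
        this.eventLagMillis = eventLagMillis;
        this.idleLagMillis = idleLagMillis;
        this.clock = clock;
    }

    // Call for every event read from this shard.
    public void onEvent(long eventTimestamp) {
        maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
    }

    // Call periodically (in Flink, presumably from getCurrentWatermark()).
    public long currentWatermark() {
        long fromEvents = maxEventTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE                       // no events yet; avoid underflow
                : maxEventTimestamp - eventLagMillis;
        long fromClock = clock.getAsLong() - idleLagMillis;
        // Watermarks must never move backwards.
        lastEmitted = Math.max(lastEmitted, Math.max(fromEvents, fromClock));
        return lastEmitted;
    }

    public static void main(String[] args) {
        long[] now = {1_000_000L};
        BlendedWatermark wm = new BlendedWatermark(60_000L, 300_000L, () -> now[0]);
        wm.onEvent(990_000L);
        System.out.println(wm.currentWatermark()); // 930000 (event-driven term wins)
        now[0] += 600_000L;                          // shard idle for 10 minutes
        System.out.println(wm.currentWatermark()); // 1300000 (wall-clock term wins)
    }
}
```

The wall-clock term has a consequence worth keeping in mind. Once a shard has been idle long enough for `now - 5min` to dominate, any event that later arrives with a timestamp older than that watermark is treated as late.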
On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz <[hidden email]> wrote:
Hi Stephen,
An operator's watermark is the minimum of the watermarks it receives from all of its inputs. Therefore, if one of your shards/operators has no incoming data, it will not produce watermarks, and so the watermark of the WindowOperator will not progress. So this is, in a sense, expected behavior.
I recommend reading the docs linked by Congxian, especially this section[1].
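To make the min-of-inputs rule concrete, here is a simplified, self-contained model of an operator with two input channels, one per shard. It is plain Java, not Flink's actual implementation, and `MinWatermark` is a hypothetical name. As long as one input has never delivered a watermark, the combined watermark stays at `Long.MIN_VALUE`, so no event-time window can fire.

```java
import java.util.Arrays;

// Simplified model of how an operator combines watermarks from its inputs.
public class MinWatermark {
    private final long[] inputWatermarks;

    public MinWatermark(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE); // no watermark seen yet
    }

    // Record a watermark from one input; per-input watermarks only move forward.
    public void onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
    }

    // The operator's own watermark is the minimum across all inputs.
    public long current() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermark windowOperator = new MinWatermark(2);
        windowOperator.onWatermark(1, 120_000L);   // shard-1 receives data
        System.out.println(windowOperator.current() == Long.MIN_VALUE); // true: shard-0 idle
        windowOperator.onWatermark(0, 90_000L);    // shard-0 finally sees data
        System.out.println(windowOperator.current()); // 90000
    }
}
```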
Best,
Dawid
On 19/02/2019 14:31, Stephen Connolly wrote:
Hmmm, my suspicions are now quite high. I created a file source that just replays the events directly, and with it I get more results...
On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <[hidden email]> wrote:
Hmmm, after expanding the dataset so that some additional data ended up on shard-0 (everything in my original dataset was coincidentally landing on shard-1), I am now getting output... Should I expect this kind of behaviour if no data ever arrives at shard-0?
On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <[hidden email]> wrote:
Hi, I’m in a strange situation and would like to know where to start debugging.
I have set up a configurable, swappable source with three implementations:
1. A mock implementation
2. A Kafka consumer implementation
3. A Kinesis consumer implementation
By injecting a logging, no-op map function, I can see that all three sources pass the events through correctly.
I then have a window based on event timestamps, and from inspecting the aggregation function I can see that the data is being aggregated. I’m using the `.aggregate(AggregateFunction, WindowFunction)` variant so that I can retrieve the key.
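To illustrate how the aggregator can see data while the window function is never invoked, here is a plain-Java model of the `.aggregate(AggregateFunction, WindowFunction)` split. The `Aggregator` interface only mirrors the method shape of Flink's `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`), and `CountAggregator` and `tagWithKey` are hypothetical stand-ins for the `CountsAggregator` and `KeyTagger` used in the job.

```java
import java.util.Arrays;
import java.util.Collections;

// Plain-Java model of .aggregate(AggregateFunction, WindowFunction); not Flink code.
public class AggregateThenTag {
    // Stand-in with the same method shape as Flink's AggregateFunction<IN, ACC, OUT>.
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical counter in the spirit of CountsAggregator.
    static class CountAggregator<T> implements Aggregator<T, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(T value, Long acc) { return acc + 1; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    // The window function receives the key plus exactly one pre-aggregated value.
    static String tagWithKey(String key, Iterable<Long> preAggregated) {
        Long count = preAggregated.iterator().next();
        return key + "=" + count;
    }

    public static void main(String[] args) {
        CountAggregator<String> agg = new CountAggregator<>();
        Long acc = agg.createAccumulator();
        for (String event : Arrays.asList("a", "b", "c")) {
            acc = agg.add(event, acc); // runs eagerly, once per element
        }
        // Only when the window fires is the result handed over together with its key.
        System.out.println(tagWithKey("master-1", Collections.singletonList(agg.getResult(acc)))); // master-1=3
    }
}
```

In this model, `add` runs for every element as it arrives, whereas the key-tagging step only runs when the trigger fires. For `EventTimeTrigger`, that means when the watermark passes the end of the window.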
Here’s the strange thing: I only change the source (and each source uses the same deserialization schema), but:
- When I use either Kafka or my mock source, the WindowFunction gets called as events pass the end of the window.
- When I use the Kinesis source, however, the window function never gets called. I have even tried injecting events into Kinesis with really high timestamps to flush the watermarks in my BoundedOutOfOrdernessTimestampExtractor... but nothing.
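Here is a rough sketch of the arithmetic behind `BoundedOutOfOrdernessTimestampExtractor` and `EventTimeTrigger`. The extractor's watermark trails the highest timestamp seen by the configured bound (5 seconds in this job), and a window `[start, end)` fires once the watermark reaches `end - 1`. The window `[0, 60s)` is a hypothetical example that ignores the per-key offsets of the custom window assigner.

```java
// Simplified model of bounded-out-of-orderness watermarking for ONE source subtask.
public class OutOfOrdernessWindowFiring {
    // The watermark trails the highest timestamp seen by the allowed out-of-orderness.
    static long watermark(long maxTimestampSeen, long maxOutOfOrdernessMillis) {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    // A window [start, end) fires once the watermark reaches its last timestamp, end - 1.
    static boolean windowFires(long windowEnd, long currentWatermark) {
        return currentWatermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long bound = 5_000L;      // Time.seconds(5), as in the job
        long windowEnd = 60_000L; // hypothetical tumbling window [0, 60s)
        System.out.println(windowFires(windowEnd, watermark(60_000L, bound))); // false
        System.out.println(windowFires(windowEnd, watermark(64_999L, bound))); // true
    }
}
```

This holds per source subtask only. Injecting a very high timestamp advances the watermark of the subtask that reads it, but the window operator still takes the minimum over all of its inputs.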
I cannot see how switching the source could result in such different behaviour:
```java
Properties sourceProperties = new Properties();
ConsumerFactory sourceFactory;
String sourceName = configParams.getRequired("source");
switch (sourceName.toLowerCase(Locale.ENGLISH)) {
    case "kinesis":
        sourceFactory = FlinkKinesisConsumer::new;
        copyOptionalArg(configParams, "aws-region", sourceProperties, AWSConfigConstants.AWS_REGION);
        copyOptionalArg(configParams, "aws-endpoint", sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
        copyOptionalArg(configParams, "aws-access-key", sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
        copyOptionalArg(configParams, "aws-secret-key", sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
        copyOptionalArg(configParams, "aws-profile", sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
        break;
    case "kafka":
        sourceFactory = FlinkKafkaConsumer010::new;
        copyRequiredArg(configParams, "bootstrap-server", sourceProperties, "bootstrap.servers");
        copyOptionalArg(configParams, "group-id", sourceProperties, "group.id");
        break;
    case "mock":
        sourceFactory = MockSourceFunction::new;
        break;
    default:
        throw new RuntimeException("Unknown source '" + sourceName + '\'');
}

// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// poll watermark every second because using BoundedOutOfOrdernessTimestampExtractor
env.getConfig().setAutoWatermarkInterval(1000L);
env.enableCheckpointing(5000);

SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
                configParams.getRequired("topic"),
                new ObjectNodeDeserializationSchema(),
                sourceProperties))
        .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
        .name("raw-events")
        .assignTimestampsAndWatermarks(
                new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5)))
        .split(new JsonNodeOutputSelector("eventType"));
...
eventsByType.select(...)
        .keyBy(new JsonNodeStringKeySelector("_key"))
        .window(TumblingEventOffsetPerKeyEventTimeWindows.of(
                Time.seconds(windowDuration),
                (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
        .trigger(EventTimeTrigger.create())
        .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
        .print(); // <==== HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!
```