Re: How to debug difference between Kinesis and Kafka

Posted by Congxian Qiu
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-debug-difference-between-Kinesis-and-Kafka-tp26225p26252.html

Hi Stephen

If the window has never been triggered, you could investigate the watermark; the docs [1][2] may be helpful.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#interaction-of-watermarks-and-windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
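A minimal sketch of the rule described in those docs, written as plain Java rather than against Flink's API (the class and method names are hypothetical). A tumbling event-time window [start, start + size) fires once the operator's current watermark reaches the window's last timestamp, start + size - 1. If the watermark never advances, the window function is never called. The sketch assumes a zero window offset and non-negative timestamps.

```java
import java.util.List;

public class WindowFiringSketch {

    // Start of the tumbling window containing the timestamp (zero offset, non-negative timestamps assumed).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // A window [start, start + size) fires once the watermark reaches its last timestamp, start + size - 1.
    static boolean shouldFire(long windowStart, long size, long watermark) {
        long maxTimestamp = windowStart + size - 1;
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 10_000L;                      // 10-second windows
        long start = windowStart(12_345L, size);  // the window [10000, 20000)
        // Long.MIN_VALUE is the watermark of an operator that has never received a real one.
        List<Long> watermarks = List.of(15_000L, 19_998L, 19_999L, Long.MIN_VALUE);
        for (long wm : watermarks) {
            System.out.println("watermark=" + wm + " fires=" + shouldFire(start, size, wm));
        }
    }
}
```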

Best, Congxian
On Feb 19, 2019, 21:31 +0800, Stephen Connolly wrote:
Hmm, my suspicions are now quite high. When I created a file source that just replays the events directly, I got more results...

On Tue, 19 Feb 2019 at 11:50, Stephen Connolly wrote:
Hmm, after expanding the dataset so that some of the data ended up on shard-0 (everything in my original dataset was coincidentally landing on shard-1), I am now getting output. Should I expect this kind of behaviour if no data ever arrives at shard-0?
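A rough model of how an empty shard could stall the window, assuming the Kinesis source runs with parallelism greater than one and the subtask reading shard-0 never receives a record. A downstream operator's event-time clock is the minimum of the latest watermarks from all of its input channels, so a single channel stuck at Long.MIN_VALUE holds the window operator back however far the others advance. This is plain Java with hypothetical names, and it ignores any idleness signalling a real source might emit.

```java
import java.util.Arrays;

public class MinWatermarkSketch {

    // Last watermark seen on each input channel of a downstream (e.g. window) operator.
    private final long[] channelWatermarks;
    private long combinedWatermark = Long.MIN_VALUE;

    MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // no watermark received yet
    }

    // Records a watermark from one channel and returns the operator's combined watermark:
    // the minimum over all channels, never moving backwards.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        combinedWatermark = Math.max(combinedWatermark, min);
        return combinedWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkSketch windowOperator = new MinWatermarkSketch(2);
        // Channel 1 (fed by shard-1) keeps advancing; channel 0 (shard-0) never sees data.
        System.out.println(windowOperator.onWatermark(1, 60_000L));    // still Long.MIN_VALUE
        System.out.println(windowOperator.onWatermark(1, 1_000_000L)); // still Long.MIN_VALUE
        // Once shard-0 receives data, its watermark moves and the minimum advances.
        System.out.println(windowOperator.onWatermark(0, 500_000L));   // 500000
    }
}
```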

On Tue, 19 Feb 2019 at 11:14, Stephen Connolly wrote:
Hi, I'm seeing some strange behaviour and would like to know where I should start debugging.

I have set up a configurable, swappable source with three implementations:

1. A mock implementation
2. A Kafka consumer implementation
3. A Kinesis consumer implementation

By injecting a logging no-op map function, I can see that all three sources pass the events through correctly.

I then apply a window based on event timestamps, and by inspecting the aggregation function I can see that the data is being aggregated. I'm using the `.aggregate(AggregateFunction, WindowFunction)` variant so that I can retrieve the key.
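To illustrate how the aggregator can see data while the window function never runs, here is a plain-Java model of the two-argument `aggregate`. The incremental part mirrors the `createAccumulator`/`add`/`getResult` shape of Flink's `AggregateFunction` (with `merge` omitted) and updates one accumulator per key as elements arrive. The window function only runs when the window fires, once per key, with the single pre-aggregated result. All names are hypothetical and only one window is modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class AggregateThenWindowSketch {

    // Same shape as the incremental part of Flink's AggregateFunction (merge omitted).
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    // Counts elements; a hypothetical stand-in for a counting aggregator.
    static class CountAggregator implements Aggregator<String, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(String value, Long acc) { return acc + 1; }
        public Long getResult(Long acc) { return acc; }
    }

    private final Map<String, Long> accumulators = new HashMap<>(); // one accumulator per key
    private final CountAggregator aggregator = new CountAggregator();

    // Incremental aggregation: runs for every element, so the aggregator "sees the data".
    void onElement(String key, String value) {
        Long acc = accumulators.computeIfAbsent(key, k -> aggregator.createAccumulator());
        accumulators.put(key, aggregator.add(value, acc));
    }

    // Only when the window fires does the window function see the key plus one aggregated value.
    <R> Map<String, R> fire(BiFunction<String, Long, R> windowFunction) {
        Map<String, R> out = new HashMap<>();
        accumulators.forEach((key, acc) -> out.put(key, windowFunction.apply(key, aggregator.getResult(acc))));
        accumulators.clear();
        return out;
    }

    public static void main(String[] args) {
        AggregateThenWindowSketch window = new AggregateThenWindowSketch();
        window.onElement("a", "e1");
        window.onElement("a", "e2");
        window.onElement("b", "e3");
        // Nothing is emitted until fire() is called, i.e. until the watermark passes the window end.
        System.out.println(window.fire((key, count) -> key + "=" + count));
    }
}
```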

Here's the strange thing: I only change the source (and each source uses the same deserialization schema), but:

  • When I use either Kafka or my mock source, the WindowFunction gets called as events pass the end of the window.
  • When I use the Kinesis source, however, the window function never gets called. I have even tried injecting events into Kinesis with really high timestamps to flush the watermarks in my BoundedOutOfOrdernessTimestampExtractor, but nothing happens.

I cannot see how switching the source could result in such different behaviour:

        Properties sourceProperties = new Properties();
        ConsumerFactory sourceFactory;
        String sourceName = configParams.getRequired("source");
        switch (sourceName.toLowerCase(Locale.ENGLISH)) {
            case "kinesis":
                sourceFactory = FlinkKinesisConsumer::new;
                copyOptionalArg(configParams, "aws-region", sourceProperties, AWSConfigConstants.AWS_REGION);
                copyOptionalArg(configParams, "aws-endpoint", sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
                copyOptionalArg(configParams, "aws-access-key", sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
                copyOptionalArg(configParams, "aws-secret-key", sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
                copyOptionalArg(configParams, "aws-profile", sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
                break;
            case "kafka":
                sourceFactory = FlinkKafkaConsumer010::new;
                copyRequiredArg(configParams, "bootstrap-server", sourceProperties, "bootstrap.servers");
                copyOptionalArg(configParams, "group-id", sourceProperties, "group.id");
                break;
            case "mock":
                sourceFactory = MockSourceFunction::new;
                break;
            default:
                throw new RuntimeException("Unknown source '" + sourceName + '\'');
        }

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // emit watermarks every second, since BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner
        env.getConfig().setAutoWatermarkInterval(1000L);
        env.enableCheckpointing(5000);

        SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
                configParams.getRequired("topic"),
                new ObjectNodeDeserializationSchema(),
                sourceProperties
        ))
                .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
                .name("raw-events")
                .assignTimestampsAndWatermarks(
                        new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
                )
                .split(new JsonNodeOutputSelector("eventType"));
...
        eventsByType.select(...)
                .keyBy(new JsonNodeStringKeySelector("_key"))
                .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
                        (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
                .trigger(EventTimeTrigger.create())
                .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
                .print() // <==== HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!
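The `BoundedOutOfOrdernessTimestampExtractor` in the snippet above runs as one instance per parallel subtask, which may explain why injecting events with very high timestamps did not help: a flush event only advances the watermark of the instance that reads it. The plain-Java sketch below (hypothetical class name) models that per-instance logic as I understand it. The watermark is the highest timestamp seen minus the allowed out-of-orderness, it never moves backwards, and it stays at Long.MIN_VALUE until the instance sees its first element.

```java
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrderness;
    // Initialised so that the first computed watermark is exactly Long.MIN_VALUE.
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Called for every element this parallel instance sees; tracks the highest timestamp.
    long onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }

    // Called periodically (every auto-watermark interval); never moves backwards.
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrderness;
        lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch shard0 = new BoundedOutOfOrdernessSketch(5_000L);
        BoundedOutOfOrdernessSketch shard1 = new BoundedOutOfOrdernessSketch(5_000L);

        // A "flush" event with a very high timestamp only reaches the instance reading shard-1.
        shard1.onElement(10_000_000L);

        System.out.println("shard-1 watermark: " + shard1.currentWatermark()); // 9995000
        System.out.println("shard-0 watermark: " + shard0.currentWatermark()); // Long.MIN_VALUE
    }
}
```

If this is what is happening, the downstream window operator would keep taking the minimum of these two values, which matches the output appearing only once data also landed on shard-0.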