Re: EXT :Re: How to debug difference between Kinesis and Kafka

Posted by Dian Fu on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-debug-difference-between-Kinesis-and-Kafka-tp26225p26245.html

DataStream.assignTimestampsAndWatermarks adds a watermark generator operator after each source operator instance (provided their parallelism is the same, which is true for the code you showed). So if one instance of the source operator receives no data, the corresponding watermark generator instance cannot generate a watermark, and the downstream window operator, which takes the minimum watermark across its inputs, never advances.
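
To illustrate the point above: a downstream operator's event-time clock is the minimum of the latest watermarks it has received from all of its inputs. The following is a plain-Java toy model, not Flink code; the class and method names are made up for illustration. It models two parallel watermark generators, one of which never sees data.

```java
import java.util.Arrays;

/** Toy model of how a downstream operator combines watermarks; not Flink code. */
final class MinWatermarkDemo {

    /** Latest watermark seen per upstream subtask; Long.MIN_VALUE means "none yet". */
    private final long[] latest;

    MinWatermarkDemo(int upstreamParallelism) {
        latest = new long[upstreamParallelism];
        Arrays.fill(latest, Long.MIN_VALUE);
    }

    /** Records a watermark from one upstream subtask (watermarks never go backwards). */
    void onWatermark(int subtask, long watermark) {
        latest[subtask] = Math.max(latest[subtask], watermark);
    }

    /** The operator's event-time clock: the minimum over all inputs. */
    long currentWatermark() {
        return Arrays.stream(latest).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkDemo op = new MinWatermarkDemo(2);
        op.onWatermark(1, 1_550_000_000_000L); // the generator behind shard-1 advances
        // The generator behind shard-0 has seen no data, so it has emitted nothing.
        System.out.println(op.currentWatermark() == Long.MIN_VALUE); // true
        op.onWatermark(0, 1_549_999_990_000L); // first data arrives on shard-0
        System.out.println(op.currentWatermark()); // 1549999990000
    }
}
```

Until the second subtask reports something, the combined watermark stays at its initial value, so no event-time window can close no matter how far the other subtask advances.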

Regards,
Dian


On Feb 20, 2019, at 12:56 AM, Stephen Connolly <[hidden email]> wrote:

Though I am explicitly assigning watermarks with DataStream.assignTimestampsAndWatermarks and I see all the data flowing through that... so shouldn't that override the watermarks from the original source?

On Tue, 19 Feb 2019 at 15:59, Martin, Nick <[hidden email]> wrote:

Yeah, that’s expected/known. Watermarks for the empty partition don’t advance, so the window in your window function never closes.

 

There’s a ticket open to fix it for the Kafka connector (https://issues.apache.org/jira/browse/FLINK-5479), but in general, any time one parallel instance of a source function isn’t getting data, you have to watch out for this.
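
One workaround sometimes used for this (an assumption here, not something stated in this thread or prescribed by the ticket) is a periodic watermark assigner that falls back to wall-clock time once its subtask has been idle for a while. Below is a plain-Java sketch of just that logic. `IdleAwareWatermarkClock`, `idleTimeoutMs` and the injected clock are hypothetical names; in a Flink job the two methods would be called from `extractTimestamp` and `getCurrentWatermark` of an `AssignerWithPeriodicWatermarks`. It also assumes event timestamps are epoch milliseconds that roughly track wall-clock time.

```java
import java.util.function.LongSupplier;

/** Sketch of idle-aware watermark logic; hypothetical helper, not part of Flink. */
final class IdleAwareWatermarkClock {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier wallClock; // injected so the logic can be tested
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastElementWallTime;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermarkClock(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier wallClock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.wallClock = wallClock;
        this.lastElementWallTime = wallClock.getAsLong();
    }

    /** Call for every element, e.g. from extractTimestamp. */
    void onElement(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastElementWallTime = wallClock.getAsLong();
    }

    /** Call periodically, e.g. from getCurrentWatermark. */
    long currentWatermark() {
        long now = wallClock.getAsLong();
        long candidate;
        if (now - lastElementWallTime >= idleTimeoutMs) {
            // Idle: assume event time tracks wall-clock time (a strong assumption).
            candidate = now - maxOutOfOrdernessMs;
        } else if (maxTimestamp == Long.MIN_VALUE) {
            candidate = Long.MIN_VALUE; // nothing seen yet and not idle long enough
        } else {
            candidate = maxTimestamp - maxOutOfOrdernessMs;
        }
        lastEmitted = Math.max(lastEmitted, candidate); // never move backwards
        return lastEmitted;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake wall clock in ms
        IdleAwareWatermarkClock clock =
                new IdleAwareWatermarkClock(5_000L, 60_000L, () -> now[0]);
        now[0] = 60_000L; // a minute passes with no elements
        System.out.println(clock.currentWatermark()); // 55000
    }
}
```

The trade-off is that elements arriving on a previously idle partition with older timestamps will be treated as late, so the idle timeout needs to be chosen with that in mind.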

 

From: Stephen Connolly [mailto:[hidden email]]
Sent: Tuesday, February 19, 2019 6:32 AM
To: user <[hidden email]>
Subject: EXT :Re: How to debug difference between Kinesis and Kafka

 

Hmmm, my suspicions are now quite high. I created a file source that just replays the events straight through, and with it I get more results...

 

On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <[hidden email]> wrote:

Hmmm after expanding the dataset such that there was additional data that ended up on shard-0 (everything in my original dataset was coincidentally landing on shard-1) I am now getting output... should I expect this kind of behaviour if no data arrives at shard-0 ever?

 

On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <[hidden email]> wrote:

Hi, I’m having a strange situation and I would like to know where I should start trying to debug.

 

I have set up a configurable swap in source, with three implementations:

 

1. A mock implementation

2. A Kafka consumer implementation

3. A Kinesis consumer implementation

 

From injecting a log and no-op map function I can see that all three sources pass through the events correctly.

 

I then have a window based on event timestamps, and from inspecting the aggregation function I can see that the data is getting aggregated. I’m using the `.aggregate(AggregateFunction, WindowFunction)` variant so that I can retrieve the key.

 

Here’s the strange thing, I only change the source (and each source uses the same deserialization function) but:

 

  • When I use either Kafka or my Mock source, the WindowFunction gets called as events pass the end of the window
  • When I use the Kinesis source, however, the window function never gets called. I have even tried injecting events into Kinesis with really high timestamps to flush the watermarks in my BoundedOutOfOrdernessTimestampExtractor... but nothing.
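
For reference, the rule being exercised in the second bullet can be written down in a few lines. This is a simplified model rather than Flink's source: a bounded-out-of-orderness extractor reports the highest timestamp it has seen minus the allowed out-of-orderness, and an event-time window fires once the operator's watermark reaches the window's last millisecond (end minus 1). The class and method names are illustrative only, and the window start assumes a plain tumbling window with zero offset rather than the custom assigner used in the job.

```java
/** Simplified model of watermark-driven window firing; names are illustrative. */
final class WindowFiringModel {

    /** Watermark a bounded-out-of-orderness generator would report. */
    static long watermark(long maxSeenTimestamp, long maxOutOfOrdernessMs) {
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    /** Start of the tumbling window (size in ms, zero offset) containing the timestamp. */
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - Math.floorMod(timestamp, sizeMs);
    }

    /** An event-time window fires once the watermark reaches its last millisecond. */
    static boolean fires(long windowStart, long sizeMs, long operatorWatermark) {
        return operatorWatermark >= windowStart + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;
        long start = windowStart(125_000L, size);  // 120000
        long wm = watermark(190_000L, 5_000L);     // 185000
        System.out.println(fires(start, size, wm));             // true
        System.out.println(fires(start, size, Long.MIN_VALUE)); // false
    }
}
```

The second call corresponds to an operator whose combined watermark is still at its initial value: a very high timestamp only moves the watermark of the generator instance that saw it.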

I cannot see how this source switching could result in such a different behaviour:

 

        Properties sourceProperties = new Properties();
        ConsumerFactory sourceFactory;
        String sourceName = configParams.getRequired("source");
        switch (sourceName.toLowerCase(Locale.ENGLISH)) {
            case "kinesis":
                sourceFactory = FlinkKinesisConsumer::new;
                copyOptionalArg(configParams, "aws-region", sourceProperties, AWSConfigConstants.AWS_REGION);
                copyOptionalArg(configParams, "aws-endpoint", sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
                copyOptionalArg(configParams, "aws-access-key", sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
                copyOptionalArg(configParams, "aws-secret-key", sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
                copyOptionalArg(configParams, "aws-profile", sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
                break;
            case "kafka":
                sourceFactory = FlinkKafkaConsumer010::new;
                copyRequiredArg(configParams, "bootstrap-server", sourceProperties, "bootstrap.servers");
                copyOptionalArg(configParams, "group-id", sourceProperties, "group.id");
                break;
            case "mock":
                sourceFactory = MockSourceFunction::new;
                break;
            default:
                throw new RuntimeException("Unknown source '" + sourceName + '\'');
        }

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // poll watermark every second because using BoundedOutOfOrdernessTimestampExtractor
        env.getConfig().setAutoWatermarkInterval(1000L);
        env.enableCheckpointing(5000);

        SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
                configParams.getRequired("topic"),
                new ObjectNodeDeserializationSchema(),
                sourceProperties
        ))
                .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
                .name("raw-events")
                .assignTimestampsAndWatermarks(
                        new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
                )
                .split(new JsonNodeOutputSelector("eventType"));
...
        eventsByType.select(...)
                .keyBy(new JsonNodeStringKeySelector("_key"))
                .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
                        (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
                .trigger(EventTimeTrigger.create())
                .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
                .print() // <==== HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!

 

 

 


Notice: This e-mail is intended solely for use of the individual or entity to which it is addressed and may contain information that is proprietary, privileged and/or exempt from disclosure under applicable law. If the reader is not the intended recipient or agent responsible for delivering the message to the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. This communication may also contain data subject to U.S. export laws. If so, data subject to the International Traffic in Arms Regulation cannot be disseminated, distributed, transferred, or copied, whether incorporated or in its original form, to foreign nationals residing in the U.S. or abroad, absent the express prior approval of the U.S. Department of State. Data subject to the Export Administration Act may not be disseminated, distributed, transferred or copied contrary to U. S. Department of Commerce regulations. If you have received this communication in error, please notify the sender by reply e-mail and destroy the e-mail message and any physical copies made of the communication.
 Thank you. 
*********************


