Never gets into ProcessWindowFunction.process()

Vijay Balakrishnan
Hi,
I am running in the IntelliJ IDE on a Mac with 4 vProcessors.
The code compiles fine, but it never gets into Window5SecProcessing's process(). I am able to get data from the Kinesis consumer, and it is deserialized properly when I debug the code. It does get into the Window5SecProcessing.open() method for initialization.

Not sure if I am failing because no slots are available?
In main():
........ //trimmed a lot of code
FlinkKinesisConsumer<Monitoring> kinesisConsumer = getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...);

DataStream<Monitoring> kinesisStream = env
        .addSource(kinesisConsumer)
        .uid(jobName + "KinesisSource");
KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
        .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
            public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
                return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
            }
        });

WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key
        .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
        .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
        .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS);
enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() {
    @Override
    public void invoke(MonitoringGrouping mg, Context context) throws Exception {
        //TODO call ES
        logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg);
    }
});
Window processing class:
private static class Window5SecProcessing extends ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
        private transient Histogram fiveSecHist;
        private transient Histogram fiveMinHist;
        private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
        private transient org.apache.flink.metrics.Histogram fiveMinHistogram;
        private transient ValueState<Long> total5SecCountState;
        private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;

        public Window5SecProcessing(String gameId, String interval, String keyType) {
            ...
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logger.debug("Window5SecProcessing -Entered open - parameters:{}", parameters);//gets here
            com.codahale.metrics.Histogram fiveSecHist =
                    new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
            this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
            total5SecCountValueStateDescriptor =
                    new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
            total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
        }
......
       
        public void process(Tuple3<String, String, String> currentKey1, Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
            logger.debug("Window5SecProcessing - Entered process ");//never gets here
            Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1;
            ....
        }

    }
At one point in the logs, I seem to see that there are no slots available. Is that the problem? If so, how can I fix it so that I can test locally on my Mac?
Log:
flink-akka.actor.default-dispatcher-71 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Slot Pool Status:
status: connected to akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
available slots: []
allocated slots: [[AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
pending requests: []
sharing groups: {
-------- 5a0ae59368145d715b3cc0d39ba6c05a --------
{
groupId=5a0ae59368145d715b3cc0d39ba6c05a
unresolved={}
resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}, allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0, children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}]}]}
all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}, allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0, children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}]}, SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}}
} }

TIA,

Re: Never gets into ProcessWindowFunction.process()

Vijay Balakrishnan
Hi,
Any help is appreciated. I dug into this further. The FlinkKinesisConsumer seems to be stuck in a loop polling the Kinesis stream, and the deserialized output never reaches the process() or apply() method of the windowing operation. I can see the logs of MonitoringMapKinesisSchema successfully deserializing the data from Kinesis and converting it into a POJO.

Code:

//Create environment:
StreamExecutionEnvironment env;
if (local) {
    Configuration configuration = new Configuration();
    configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
    env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
} else {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//create FlinkKinesisConsumer
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
        kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig);//deserialization works fine
DataStream<Monitoring> kinesisStream = env
                .addSource(kinesisConsumer);
KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
                .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
                    public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
                        return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
                    }
                });

WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key
        .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
                //.process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
                .process(new Window5SecProcessing());//never gets in here
//Gets into the Window5SecProcessing.open() method during initialization but never into the process() method
private static class Window5SecProcessing extends ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {

        // Not transient: these are set in the constructor and must survive serialization of the function
        private String interval;
        private String gameId;
        private String keyType;
        private transient org.apache.flink.metrics.Histogram fiveSecHistogram;

        private transient ValueState<Long> total5SecCountState;
        private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;
        public Window5SecProcessing() {

        }

        public Window5SecProcessing(String gameId, String interval, String keyType) {
            this.gameId = gameId;
            this.interval = interval;
            this.keyType = keyType;
        }

        @Override
        public void clear(Context context) throws Exception {
            super.clear(context);
            KeyedStateStore keyedStateStore = context.windowState();
            keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logger.debug("Gets in here fine -Window5SecProcessing -Entered open - parameters:{}", parameters);
            com.codahale.metrics.Histogram fiveSecHist =
                    new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
            this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
            total5SecCountValueStateDescriptor =
                    new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
            total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
        }


        public void process(Tuple3<String, String, String> currentKey1, Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
            logger.debug("@@never gets here@@Window5SecProcessing - Entered process ");//
...
}
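
A possible explanation, offered as an assumption and not confirmed anywhere in this thread: the environment above is set to TimeCharacteristic.EventTime, and the posted code shows no call to assignTimestampsAndWatermarks. In that setup timeWindow() produces event-time windows, which fire only once a watermark passes the end of the window. The sketch below is plain Java, not Flink code. The class and method names are made up for illustration, and it only models the firing rule (watermark >= window end - 1) to show that buffered elements alone never trigger process().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java model of a tumbling event-time window; hypothetical names, no Flink dependency. */
class EventTimeWindowSketch {

    /** Start of the tumbling window (zero offset) that contains the timestamp. */
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** An event-time window fires once the watermark reaches its last timestamp (end - 1). */
    public static boolean shouldFire(long windowStartMs, long sizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + sizeMs - 1;
    }

    /** Buffers elements per window and returns the starts of the windows that fire for this watermark. */
    public static List<Long> firedWindows(long[] timestampsMs, long sizeMs, long watermarkMs) {
        TreeMap<Long, Integer> buffered = new TreeMap<>();
        for (long ts : timestampsMs) {
            buffered.merge(windowStart(ts, sizeMs), 1, Integer::sum);
        }
        List<Long> fired = new ArrayList<>();
        for (long start : buffered.keySet()) {
            if (shouldFire(start, sizeMs, watermarkMs)) {
                fired.add(start);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_999L, 5_000L, 12_000L};
        long size = 5_000L;
        // No watermark ever arrives: the event-time clock stays at Long.MIN_VALUE, nothing fires.
        System.out.println(firedWindows(timestamps, size, Long.MIN_VALUE)); // prints []
        // A watermark of 9_999 closes the windows starting at 0 and 5000, but not the one at 10000.
        System.out.println(firedWindows(timestamps, size, 9_999L)); // prints [0, 5000]
    }
}
```

If this is what is happening, two things to try would be assigning timestamps and watermarks on kinesisStream before the keyBy, or switching to TimeCharacteristic.ProcessingTime for a quick local test.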




On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan <[hidden email]> wrote:

Re: Never gets into ProcessWindowFunction.process()

Gary Yao-2
In reply to this post by Vijay Balakrishnan
Hi,

If the job is actually running and consuming from Kinesis, the log you posted
is unrelated to your problem. To understand why the process function is not
invoked, we would need to see more of your code, or you would need to provide
an executable example. The log only shows that all offered slots are occupied
by tasks of your job.

Best,
Gary
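
To illustrate the point about the slot log: with Flink's default slot sharing, one slot can host a subtask of every operator, so a job needs only as many slots as its highest operator parallelism. The sketch below is plain Java arithmetic, not Flink code. The operator names and the parallelism of 1 are assumptions, chosen to match the single allocated slot with two child task slots in the posted log.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java arithmetic for slot demand under default slot sharing; hypothetical names. */
class SlotDemandSketch {

    /** With all operators in one slot sharing group, the job needs max(parallelism) slots. */
    public static int requiredSlots(Map<String, Integer> operatorParallelism) {
        int max = 0;
        for (int parallelism : operatorParallelism.values()) {
            max = Math.max(max, parallelism);
        }
        return max;
    }

    public static void main(String[] args) {
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("kinesis-source", 1);   // assumed operator name and parallelism
        job.put("window-and-sink", 1);  // assumed operator name and parallelism
        int offeredSlots = 1;           // assumption: one slot offered, as the log suggests
        int required = requiredSlots(job);
        // Both tasks share the single slot, so "available slots: []" is the expected state.
        System.out.println("required=" + required + ", free=" + (offeredSlots - required));
        // prints required=1, free=0
    }
}
```

An empty list of available slots together with an empty list of pending requests therefore reads as "everything is scheduled", not as a shortage.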

On Tue, Nov 6, 2018 at 1:10 AM Vijay Balakrishnan <[hidden email]> wrote:

Re: Never gets into ProcessWindowFunction.process()

Vijay Balakrishnan
Hi Gary,
I just posted the code. Please let me know if that clarifies the problem. I have been digging into how the FlinkKinesisConsumer's deserialized output gets passed into the process() or apply() method, to no avail. The coding pattern I used matches all the flink-examples I have seen for Flink 1.6.1.
TIA,
Vijay

On Fri, Nov 9, 2018 at 9:53 AM Gary Yao <[hidden email]> wrote:
} }

TIA,

Re: Never gets into ProcessWindowFunction.process()

Gary Yao-2
In reply to this post by Vijay Balakrishnan
Hi,

You are using event time but are you assigning watermarks [1]? I do not see it
in the code.

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
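
For readers following along: with TimeCharacteristic.EventTime, a 5-second window is only evaluated once the watermark reaches the last timestamp of that window. If no watermarks are assigned, the watermark never advances, so records are buffered and process() is never called even though the source keeps delivering data. The sketch below is a plain-Java model of that rule, not Flink's implementation; the class and method names (EventTimeWindowModel, onElement, onWatermark) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of event-time tumbling windows. All names are hypothetical, not Flink API. */
final class EventTimeWindowModel {
    private final long windowSizeMs;
    // Buffered elements per window start, ordered by window start.
    private final TreeMap<Long, List<String>> buffers = new TreeMap<>();
    // Stays at Long.MIN_VALUE until a watermark arrives.
    private long currentWatermark = Long.MIN_VALUE;
    private final List<String> fired = new ArrayList<>();

    EventTimeWindowModel(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Buffers an element in the window its timestamp falls into. Never fires a window. */
    void onElement(String value, long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        buffers.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    /** Advances the watermark and fires every window whose last timestamp it covers. */
    void onWatermark(long watermarkMs) {
        currentWatermark = Math.max(currentWatermark, watermarkMs);
        while (!buffers.isEmpty()) {
            Map.Entry<Long, List<String>> first = buffers.firstEntry();
            long maxTimestamp = first.getKey() + windowSizeMs - 1;
            if (maxTimestamp > currentWatermark) {
                break; // this window and all later ones are still open
            }
            fired.add("window@" + first.getKey() + "=" + first.getValue());
            buffers.pollFirstEntry();
        }
    }

    List<String> firedWindows() {
        return fired;
    }

    int pendingWindows() {
        return buffers.size();
    }

    public static void main(String[] args) {
        EventTimeWindowModel model = new EventTimeWindowModel(5000L);
        model.onElement("a", 1000L);
        model.onElement("b", 4999L);
        // Without a watermark nothing fires, no matter how many elements arrive.
        System.out.println(model.firedWindows());
        model.onWatermark(4999L);
        System.out.println(model.firedWindows());
    }
}
```

In this model, as in the job above, feeding elements alone never produces output; only a watermark at or beyond the window's last timestamp does.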

On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan <[hidden email]> wrote:
Hi,
Any help is appreciated. I dug into this further: I can see the deserialized output logged by the FlinkKinesisConsumer, and it keeps looping to pull from the Kinesis stream, but the records never reach process() or apply() in the windowing operation.

The logs show MonitoringMapKinesisSchema successfully deserializing the data from Kinesis and converting it into a POJO.

Code:

//Create environment:
StreamExecutionEnvironment env;
if (local) {
    Configuration configuration = new Configuration();
    configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
    env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
} else {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//create FlinkKinesisConsumer
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
        kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig);//deserialization works fine
DataStream<Monitoring> kinesisStream = env
                .addSource(kinesisConsumer);
KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
                .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
                    public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
                        return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
                    }
                });

WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key
        .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
                //.process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
                .process(new Window5SecProcessing());//never gets in here
// Gets into Window5SecProcessing.open() during initialization, but never into process()
private static class Window5SecProcessing extends ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {

        // Not transient: these are set in the constructor and must survive serialization of the function
        private String interval;
        private String gameId;
        private String keyType;
        private transient org.apache.flink.metrics.Histogram fiveSecHistogram;

        private transient ValueState<Long> total5SecCountState;
        private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;
        public Window5SecProcessing() {

        }

        public Window5SecProcessing(String gameId, String interval, String keyType) {
            this.gameId = gameId;
            this.interval = interval;
            this.keyType = keyType;
        }

        @Override
        public void clear(Context context) throws Exception {
            super.clear(context);
            KeyedStateStore keyedStateStore = context.windowState();
            keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logger.debug("Gets in here fine -Window5SecProcessing -Entered open - parameters:{}", parameters);
            com.codahale.metrics.Histogram fiveSecHist =
                    new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
            this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
            total5SecCountValueStateDescriptor =
                    new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
            total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
        }


        @Override
        public void process(Tuple3<String, String, String> currentKey1, Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
            logger.debug("@@never gets here@@Window5SecProcessing - Entered process ");//
...
}





Re: Never gets into ProcessWindowFunction.process()

Vijay Balakrishnan
Hi Gary,
Bang on the money. I had not assigned watermarks; once I added that, the code entered the process() method.
Thanks a ton for your help. Life-saver!
DataStream<Monitoring> kinesisStream = env
        .addSource(kinesisConsumer)
        .assignTimestampsAndWatermarks(new MonitoringAssigner()); // <== the missing call
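
MonitoringAssigner itself is not shown in the thread. A common strategy for such an assigner is bounded out-of-orderness: the watermark trails the highest event timestamp seen so far by a fixed bound (Flink 1.x ships BoundedOutOfOrdernessTimestampExtractor for this purpose). The plain-Java sketch below models only that computation and does not implement any Flink interface; the class name, the 2-second bound used in main, and the assumption that each Monitoring record carries an epoch-millisecond timestamp are all illustrative guesses.

```java
/** Plain-Java model of a bounded-out-of-orderness watermark. Names are hypothetical. */
final class BoundedLatenessWatermark {
    private final long maxOutOfOrdernessMs;
    // Highest event timestamp observed so far; MIN_VALUE means "no event yet".
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedLatenessWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records the event's timestamp and returns it unchanged (the "extract timestamp" step). */
    long onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        return eventTimestampMs;
    }

    /** Highest timestamp seen minus the allowed lateness; MIN_VALUE before any event. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // guard against underflow before the first event
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedLatenessWatermark wm = new BoundedLatenessWatermark(2000L);
        wm.onEvent(10_000L);
        wm.onEvent(9_000L); // late event does not move the watermark backwards
        System.out.println(wm.currentWatermark());
    }
}
```

With a 5-second window, a bound like this means a window is evaluated roughly the bound's length after its last event time has been observed, which trades a little latency for tolerance of out-of-order records.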


On Fri, Nov 9, 2018 at 10:02 AM Gary Yao <[hidden email]> wrote:
Hi,

You are using event time but are you assigning watermarks [1]? I do not see it
in the code.

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
