Hi, Running in IntelliJ IDE on a Mac with 4 vProcessors. Code compiles fine. It never gets into the Window5SecProcessing's process().I am able to get data from the Kinesis Consumer and it is deserialized properly when I debug the code. It gets into the Window5SecProcessing.open() method for initialization. Not sure if I am failing with no slots available ??? In main(): ........ //trimmed a lot of code FlinkKinesisConsumer<Monitoring> kinesisConsumer = getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...); DataStream<Monitoring> kinesisStream = env .addSource(kinesisConsumer) .uid(jobName + "KinesisSource"); KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() { public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception { return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation()); }}); WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5)); DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION)) .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS); enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() { @Override public void invoke(MonitoringGrouping mg, Context context) throws Exception { //TODO call ES logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg); } }); Window processing class: private static class Window5SecProcessing extends ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> { private transient Histogram fiveSecHist; private transient Histogram fiveMinHist; private transient org.apache.flink.metrics.Histogram fiveSecHistogram; private transient org.apache.flink.metrics.Histogram fiveMinHistogram; private transient ValueState<Long> total5SecCountState; private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor; public Window5SecProcessing(String gameId, String interval, String keyType) { ... } public void open(Configuration parameters) throws Exception { super.open(parameters); logger.debug("Window5SecProcessing -Entered open - parameters:{}", parameters);//gets here com.codahale.metrics.Histogram fiveSecHist = new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS)); this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist); total5SecCountValueStateDescriptor = new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L); total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor); } ...... public void process(Tuple3<String, String, String> currentKey1, Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception { logger.debug("Window5SecProcessing - Entered process ");//never gets here Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1; .... } } At 1 point in the logs, I seem to see that there are no slots available ????? Is that the problem- how can I fix that if that is the case to test locally on my Mac ?? Log: flink-akka.actor.default-dispatcher-71 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Slot Pool Status: status: connected to akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed] available slots: [] allocated slots: [[AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]] pending requests: [] sharing groups: { -------- 5a0ae59368145d715b3cc0d39ba6c05a -------- { groupId=5a0ae59368145d715b3cc0d39ba6c05a unresolved={} resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}, allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0, children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}]}]} all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}, allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0, children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}]}, SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}} } } TIA, |
Hi, Any help is appreciated.Dug into this. I can see the deserialized output log from FlinkKinesisConsumer deserialization but it keeps looping to pull from Kinesis Stream but never gets into the Windowing operation for process() or apply().FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream and the deserialized output never seems to get into the apply() or process() method of a Windowing operation. I can see the logs of MonitoringMapKinesisSchema deserializing data back successfully from Kinesis and converting into a POJO. Code: //Create environment: StreamExecutionEnvironment env; if (local) { Configuration configuration = new Configuration(); configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration); } else { env = StreamExecutionEnvironment.getExecutionEnvironment(); } env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //create FlinkKinesisConsumer Properties kinesisConsumerConfig = new Properties(); kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region); kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO"); kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000"); kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000"); kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumer<>( kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig);//deserialization works fine DataStream<Monitoring> kinesisStream = env .addSource(kinesisConsumer); KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() { public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception { return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation()); } }); WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5)); DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win //.process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION)) .process(new Window5SecProcessing());//never gets in here //Gets into Window5SecProcessing.open() method during initialization but never into the process method ???????? private static class Window5SecProcessing extends ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> { private transient String interval; private transient String gameId; private transient String keyType; private transient org.apache.flink.metrics.Histogram fiveSecHistogram; private transient ValueState<Long> total5SecCountState; private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor; public Window5SecProcessing() { } public Window5SecProcessing(String gameId, String interval, String keyType) { this.gameId = gameId; this.interval = interval; this.keyType = keyType; } @Override public void clear(Context context) throws Exception { super.clear(context); KeyedStateStore keyedStateStore = context.windowState(); keyedStateStore.getState(total5SecCountValueStateDescriptor).clear(); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); logger.debug("Gets in here fine -Window5SecProcessing -Entered open - parameters:{}", parameters); com.codahale.metrics.Histogram fiveSecHist = new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS)); this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist); total5SecCountValueStateDescriptor = new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L); total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor); } public void process(Tuple3<String, String, String> currentKey1, Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception { logger.debug("@@never gets here@@Window5SecProcessing - Entered process ");// ... } On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan <[hidden email]> wrote:
|
In reply to this post by Vijay Balakrishnan
Hi, If the job is actually running and consuming from Kinesis, the log you posted is unrelated to your problem. To understand why the process function is not invoked, we would need to see more of your code, or you would need to provide an executable example. The log only shows that all offered slots are occupied by tasks of your job. Best, Gary On Tue, Nov 6, 2018 at 1:10 AM Vijay Balakrishnan <[hidden email]> wrote:
|
Hi Gary, Just posted the code.Pls let me know if that clarifies the problem. Have been digging into how the FlinkKinesisConsumer deserialized output gets passed into the process() or apply() method to no avail. The coding pattern I used matches all the fink-examples I have seen for Flink 1.6.1 TIA, Vijay On Fri, Nov 9, 2018 at 9:53 AM Gary Yao <[hidden email]> wrote:
|
In reply to this post by Vijay Balakrishnan
Hi, You are using event time but are you assigning watermarks [1]? I do not see it in the code. Best, Gary [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan <[hidden email]> wrote:
|
Hi Gary, Bang on the money. I did not have an assigned Watermark and once I put that in, the code entered the process() method. Thx a ton for your help.Life-saver!!!! DataStream<Monitoring> kinesisStream = env On Fri, Nov 9, 2018 at 10:02 AM Gary Yao <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |