How to debug why Flink makes and executes only partial plan

How to debug why Flink makes and executes only partial plan

scgupta
Hi,

In my Flink program, after a couple of map, union, and connect operations, I have a final filter and a sink. Something like this (after abstracting out the details):

val filteredEvents: DataStream[NotificationEvent]
  = allThisStuffWorking
      .name("filtered_users")

filteredEvents
  .filter(x => check(x.f1, x.f2, someStuff)) //BUG
  .addSink(new NotificationSinkFunction(notifier))
  .name("send_notification")

The check function returns a Boolean and does not access anything other than the parameters passed to it. Here is the relevant part of NotificationSinkFunction:

class NotificationSinkFunction(notifier: Notifier)
      extends SinkFunction[NotificationEvent] {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  def invoke(event: NotificationEvent): Unit = {
    LOG.info("Log this notification detail")
    
    notifier.send(event.f1, event.f2) //BUG
  }
}

If I comment out the two lines marked with //BUG, the Flink pipeline works and prints the log messages, and Flink shows this execution plan at the end:

filtered_users -> Sink: send_notification

Inline image 1


But with either of the two lines marked //BUG left in, Flink builds and executes the plan only up to filtered_users and does not print the log message.

Inline image 2

How can I figure out what is wrong with the check function or the notifier send function that prevents Flink from building the full plan? What are the typical mistakes that lead to this?

Thanks,
+satish

Re: How to debug why Flink makes and executes only partial plan

Aljoscha Krettek
Hi,
Are we talking about the Plan View in the JobManager dashboard? If so, I would expect to see only one "box" for the combination of filter and sink, because the two are chained together to avoid sending data between tasks.
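To check whether chaining is what hides the filter and sink, one option is to turn chaining off temporarily, so that each operator should get its own box, and to print the plan before executing. This is only a minimal sketch, assuming the Flink Scala streaming API is on the classpath; the sample input, the operator names and the job name are made up for illustration and are not taken from the original program:

```scala
import org.apache.flink.streaming.api.scala._

object ChainingDebugSketch {
  // Builds a tiny placeholder pipeline with chaining switched off.
  def buildEnv(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Debugging only: without chaining, filter and sink are separate tasks.
    env.disableOperatorChaining()

    env.fromElements(1, 2, 3, 4)            // placeholder input
      .filter(_ % 2 == 0).name("keep_even") // placeholder for the real filter
      .print().name("print_sink")           // placeholder for the real sink
    env
  }

  def main(args: Array[String]): Unit = {
    val env = buildEnv()
    // JSON description of the operators Flink has registered for this job.
    println(env.getExecutionPlan)
    env.execute("chaining-debug-sketch")
  }
}
```

If the filter and sink appear in the printed plan but not as separate boxes in the dashboard with chaining enabled, the plan itself is complete and only the display is merged.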

For debugging, could you change check() to always return true and see whether you then get your messages from the sink?
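As a rough illustration of that experiment, the predicate can be wrapped so that it logs its verdict and can be bypassed with a flag. The sketch below uses plain Scala collections in place of a DataStream; Event, check and debugFilter are hypothetical stand-ins, not the real NotificationEvent or check function:

```scala
object FilterDebugSketch {
  // Hypothetical stand-in for the real NotificationEvent.
  final case class Event(f1: String, f2: Int)

  // Hypothetical stand-in for the real check function.
  def check(f1: String, f2: Int): Boolean = f1.nonEmpty && f2 > 0

  // Logs what check() decides for each event. With bypass = true every
  // event passes, so anything downstream must receive data unless the
  // problem lies somewhere other than the predicate.
  def debugFilter(bypass: Boolean)(e: Event): Boolean = {
    val verdict = check(e.f1, e.f2)
    println(s"check(${e.f1}, ${e.f2}) = $verdict, bypass = $bypass")
    bypass || verdict
  }

  val sampleEvents: List[Event] =
    List(Event("alice", 1), Event("", 5), Event("bob", 0))

  def main(args: Array[String]): Unit = {
    println(sampleEvents.filter(debugFilter(bypass = true)).size)
    println(sampleEvents.filter(debugFilter(bypass = false)).size)
  }
}
```

In the actual job the same kind of wrapper would sit inside .filter(...). If messages reach the sink with the bypass on but not with it off, check() is rejecting every event rather than Flink dropping part of the plan.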

Cheers,
Aljoscha

(In reply to the message from Satish Chandra Gupta <[hidden email]>, Sat, 15 Oct 2016 at 05:26, shown above.)