Re: ProcessFunction example from the documentation giving me error

Posted by anna stax on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/ProcessFunction-example-from-the-documentation-giving-me-error-tp21651p21666.html

My object is actually named CreateUserNotificationRequests, which is why you see CreateUserNotificationRequests in the error message.
I renamed the object after pasting the code. I hope this does not cause confusion and that I can get some help.
Thanks



On Fri, Jul 20, 2018 at 10:10 AM, anna stax <[hidden email]> wrote:
Hello all,


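import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunctionTest {

  def main(args: Array[String]): Unit = {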

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    val text = env.socketTextStream("localhost", 9999)

    val text1 = text.map(s => (s,s)).keyBy(0).process(new CountWithTimeoutFunction())

    text1.print()

    env.execute("CountWithTimeoutFunction")
  }

  case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

  class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

    lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
      .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

    override def processElement(value: (String, String), ctx: ProcessFunction[(String, String), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = {
    ......
    }

    override def onTimer(timestamp: Long, ctx: ProcessFunction[(String, String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): Unit = {
    .......
    }
  }
}


Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.net.ConnectException: Operation timed out (Connection timed out)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.whil.flink.streaming.CreateUserNotificationRequests$.main(CreateUserNotificationRequests.scala:42)
at com.whil.flink.streaming.CreateUserNotificationRequests.main(CreateUserNotificationRequests.scala)
Caused by: java.net.ConnectException: Operation timed out (Connection timed out)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)
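
The trace above ends in SocketTextStreamFunction.run calling Socket.connect, which suggests the socket source could not reach localhost:9999, independently of any timer. A minimal sketch for checking this before starting the job follows. The object SocketSourceCheck and the helper canConnect are hypothetical names introduced here for illustration; they are not part of Flink, and the 2000 ms timeout is an arbitrary assumption.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object SocketSourceCheck {

  // Hypothetical helper (not a Flink API): returns true if a TCP connection
  // to host:port can be opened within timeoutMs milliseconds.
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers connection refused, connect timeout, and unresolved host.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Same host and port as the env.socketTextStream call in the job.
    val host = "localhost"
    val port = 9999
    if (canConnect(host, port)) {
      println(s"A listener is reachable on $host:$port")
    } else {
      println(s"Nothing is reachable on $host:$port; start a text server first")
    }
  }
}
```

If the check fails, one common way to provide input for a socket source is to start a listener such as `nc -lk 9999` in another terminal before running the job. Whether that resolves this particular timeout is an assumption, since the trace only shows that the connection attempt did not succeed.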

On Thu, Jul 19, 2018 at 11:22 PM, vino yang <[hidden email]> wrote:
Hi anna,

Can you share your program, the exception stack trace, and more details about your source and state backend?

From the information you provided, it seems Flink tried to open a network connection, but the attempt timed out.

Thanks, vino.

2018-07-20 14:14 GMT+08:00 anna stax <[hidden email]>:
Hi all,

I am new to Flink. I am using the classes CountWithTimestamp and CountWithTimeoutFunction from the examples found in 

I am getting this error: Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.net.ConnectException: Operation timed out (Connection timed out)

It looks like I get this error when the timer's time is reached. Any idea why? Please help.

Thanks