Hi all, I am new to Flink. I am using the classes CountWithTimestamp and CountWithTimeoutFunction from the examples, and I am getting this error: Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.net.ConnectException: Operation timed out (Connection timed out). It looks like the error occurs when the timer’s time is reached. Any idea why? Please help. Thanks |
Hi anna, Can you share your program, the full exception stack trace, and more details about your source and state backend? From the information you provided, it seems Flink tried to open a network connection and the attempt timed out. Thanks, vino. 2018-07-20 14:14 GMT+08:00 anna stax <[hidden email]>:
|
Hello all,

This is my code. I am just trying to run the code example in https://ci.apache.org/

    object ProcessFunctionTest {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

        val text = env.socketTextStream("localhost", 9999)
        val text1 = text.map(s => (s, s)).keyBy(0).process(new CountWithTimeoutFunction())
        text1.print()

        env.execute("CountWithTimeoutFunction")
      }

      case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

      class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

        lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
          .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

        override def processElement(
            value: (String, String),
            ctx: ProcessFunction[(String, String), (String, Long)]#Context,
            out: Collector[(String, Long)]): Unit = {
          ......
        }

        override def onTimer(
            timestamp: Long,
            ctx: ProcessFunction[(String, String), (String, Long)]#OnTimerContext,
            out: Collector[(String, Long)]): Unit = {
          .......
        }
      }
    }

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.net.ConnectException: Operation timed out (Connection timed out)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at com.whil.flink.streaming.CreateUserNotificationRequests$.main(CreateUserNotificationRequests.scala:42)
    at com.whil.flink.streaming.CreateUserNotificationRequests.main(CreateUserNotificationRequests.scala)
Caused by: java.net.ConnectException: Operation timed out (Connection timed out)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:745)

On Thu, Jul 19, 2018 at 11:22 PM, vino yang <[hidden email]> wrote:
|
My object name is CreateUserNotificationRequests; that's why you see CreateUserNotificationRequests in the error message. I edited the object name after pasting the code. I hope there is no confusion and that I can get some help. Thanks On Fri, Jul 20, 2018 at 10:10 AM, anna stax <[hidden email]> wrote:
|
The code is not the problem, but I don't know what is. A simple word count with socketTextStream used to work but now gives the same error. Apps with a Kafka source that used to work now give the same error as well. When the source generator is inside the app itself, everything works fine. So both socketTextStream and the Kafka source give me the java.net.ConnectException: Operation timed out (Connection timed out) error. On Fri, Jul 20, 2018 at 10:29 AM, anna stax <[hidden email]> wrote:
|
Hi anna, From the stack trace you provided, this is a socket connection error, not a Flink problem. Have you started a socket server at "localhost:9999", using a program or a CLI tool such as "nc -l 9999"? There is an example you can have a look at [1]. Thanks, vino. 2018-07-21 4:30 GMT+08:00 anna stax <[hidden email]>:
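If no "nc" binary is at hand, a small stand-in server can play the same role for a local test. The following is only a minimal sketch and is not part of the Flink example: the object name LineServer, the helper serveOnce, and the sample lines are hypothetical, and port 9999 is taken from the code posted above. It accepts a single client and sends it newline-terminated lines, which is the input that socketTextStream reads.

```scala
import java.io.PrintWriter
import java.net.ServerSocket

// Hypothetical stand-in for "nc -l 9999": serves a fixed set of lines to one client.
object LineServer {

  /** Waits for one client on `server`, sends each line terminated by '\n', then disconnects. */
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val client = server.accept() // blocks until a client (e.g. the Flink job) connects
    try {
      val out = new PrintWriter(client.getOutputStream)
      lines.foreach { line =>
        out.write(line + "\n")
        out.flush()
      }
    } finally client.close()
  }

  def main(args: Array[String]): Unit = {
    // Port 9999 matches env.socketTextStream("localhost", 9999); the lines are sample data.
    val server = new ServerSocket(9999)
    try serveOnce(server, Seq("hello", "world", "hello"))
    finally server.close()
  }
}
```

Start the server first, then the Flink job. This sketch serves exactly one client and then exits, so only one job can read from it per run.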
|
It is working now. Sorry, my fault. I had multiple applications running, and both were using the socket stream. Thanks. On Sun, Jul 22, 2018 at 8:22 PM, vino yang <[hidden email]> wrote:
|