Hello all,

This is my code, just trying to make the code example in https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html work.

    object ProcessFunctionTest {

      def main(args: Array[String]) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
        val text = env.socketTextStream("localhost", 9999)
        val text1 = text.map(s => (s,s)).keyBy(0).process(new CountWithTimeoutFunction())
        text1.print()
        env.execute("CountWithTimeoutFunction")
      }

      case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

      class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

        lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
          .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

        override def processElement(value: (String, String), ctx: ProcessFunction[(String, String), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = {
          ......
        }

        override def onTimer(timestamp: Long, ctx: ProcessFunction[(String, String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): Unit = {
          .......
        }
      }
    }

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.net.ConnectException: Operation timed out (Connection timed out)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at com.whil.flink.streaming.CreateUserNotificationRequests$.main(CreateUserNotificationRequests.scala:42)
    at com.whil.flink.streaming.CreateUserNotificationRequests.main(CreateUserNotificationRequests.scala)
Caused by: java.net.ConnectException: Operation timed out (Connection timed out)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:745)

On Thu, Jul 19, 2018 at 11:22 PM, vino yang <[hidden email]> wrote:

Hi anna,

Can you share your program, the exception stack trace, and more details about your source and state backend?

From the information you provided, it seems Flink started a network connection but it timed out.

Thanks, vino.

2018-07-20 14:14 GMT+08:00 anna stax <[hidden email]>:

Hi all,

I am new to Flink. I am using the classes CountWithTimestamp and CountWithTimeoutFunction from the examples found in

I am getting the error

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.net.ConnectException: Operation timed out (Connection timed out)

It looks like I get this error when the timer's time is reached. Any idea why? Please help.

Thanks