http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/ProcessFunction-example-from-the-documentation-giving-me-error-tp21651p21665.html
Hello all,
/**
 * Example streaming job: reads text lines from a socket, turns each line `s`
 * into the pair `(s, s)`, keys the stream by tuple field 0 and runs every
 * element through [[ProcessFunctionTest.CountWithTimeoutFunction]], printing
 * whatever that function emits.
 */
object ProcessFunctionTest {

  /**
   * Builds and executes the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    // The source opens a client connection to localhost:9999 once the job runs,
    // so something must already be listening on that port.
    val text = env.socketTextStream("localhost", 9999)
    val text1 = text.map(s => (s,s)).keyBy(0).process(new CountWithTimeoutFunction())
    text1.print()
    env.execute("CountWithTimeoutFunction")
  }

  /**
   * Value type held in the state of [[CountWithTimeoutFunction]].
   * How the fields are populated is not visible here (the function bodies are elided).
   */
  case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

  /**
   * Process function taking `(String, String)` elements and emitting `(String, Long)` results.
   */
  class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

    /** State handle registered as "myState"; looked up from the runtime context on first access. */
    lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
      .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

    /** Invoked for each incoming element. */
    override def processElement(value: (String, String), ctx: ProcessFunction[(String, String), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = {
      // (body elided in the original post)
      ......
    }

    /** Invoked when a timer fires. */
    override def onTimer(timestamp: Long, ctx: ProcessFunction[(String, String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): Unit = {
      // (body elided in the original post)
      .......
    }
  }
}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.net.ConnectException: Operation timed out (Connection timed out)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.whil.flink.streaming.CreateUserNotificationRequests$.main(CreateUserNotificationRequests.scala:42)
at com.whil.flink.streaming.CreateUserNotificationRequests.main(CreateUserNotificationRequests.scala)
Caused by: java.net.ConnectException: Operation timed out (Connection timed out)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)