NoSuchMethodError flatMap


vishnuviswanath

Hi All,

After successfully writing the wordcount program, I tried to create a streaming application, but I get the error below when submitting the job in local mode.

    Vishnus-MacBook-Pro:flink vishnu$ flink run target/scala-2.11/flink-vishnu_2.11-1.0.jar

java.lang.NoSuchMethodError: org.apache.flink.streaming.api.scala.DataStream.flatMap(Lscala/Function1;Lorg/apache/flink/api/common/typeinfo/TypeInformation;)Lorg/apache/flink/streaming/api/scala/DataStream;
    at com.vishnu.flink.streaming.FlinkStreamingWordCount$.main(FlinkStreamingWordCount.scala:14)
    at com.vishnu.flink.streaming.FlinkStreamingWordCount.main(FlinkStreamingWordCount.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)

The exception above occurred while trying to run your command.

This is my FlinkStreamingWordCount.scala file

package com.vishnu.flink.streaming
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._


object FlinkStreamingWordCount {

  def main(args: Array[String]): Unit = {

    val sev = StreamExecutionEnvironment.getExecutionEnvironment

    val socTxtStream = sev.socketTextStream("localhost", 4444)

    val counts = socTxtStream.flatMap(line => line.split(" "))

      .map { (_, 1) }

      .keyBy(0)

      .sum(1)

    counts.print()

    sev.execute()

  }

}

This is what my sbt file looks like:

val flink = "org.apache.flink" % "flink-core" % "1.0.0"
val flinkclients = "org.apache.flink" % "flink-clients_2.11" % "1.0.0"
val flinkstreaming = "org.apache.flink" % "flink-streaming-scala_2.11" % "1.0.0"

val main = "com.vishnu.flink.streaming.FlinkStreamingWordCount"

name := "flink-vishnu"
mainClass in (Compile, run) := Some(main)
mainClass in (Compile, packageBin) := Some(main)

lazy val commonSettings = Seq(
  organization := "com.vishnu",
  version := "1.0",
  scalaVersion := "2.11.7"
)

lazy val root = (project in file(".")).
  settings(commonSettings:_*).
  settings(
    name := "flink-vishnu",
    libraryDependencies += flink,
    libraryDependencies += flinkclients,
    libraryDependencies += flinkstreaming,
    retrieveManaged := true
  )

I'm using Scala 2.11.7 and have downloaded Flink for Scala 2.11.

Any help is appreciated.

Thanks,
Vishnu


Re: NoSuchMethodError flatMap

vishnuviswanath
Fixed this.

The error was due to a mismatch between the Flink version I had installed and the one I specified in my sbt file.
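
For anyone hitting the same error, here is a minimal sketch of one way to keep the three Flink dependencies on a single version in the sbt file. The name flinkVersion is introduced here for illustration only, and "1.0.0" is simply the value from the sbt file quoted in this thread; the thread does not say which version was actually installed, so the value would need to be changed to whatever the local Flink distribution reports (the version in the name of the flink-dist jar under the distribution's lib directory is probably the easiest place to look, assuming the standard layout).

```scala
// build.sbt fragment (sketch, not the original poster's actual fix).
// flinkVersion is a hypothetical name; set it to the version of the Flink
// distribution that `flink run` uses, so the job is compiled against the
// same API it will run on.
val flinkVersion = "1.0.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  // flink-core has no Scala suffix, so a plain % is used.
  "org.apache.flink" %  "flink-core"            % flinkVersion,
  // %% appends the Scala binary version (_2.11 here) to the artifact name.
  "org.apache.flink" %% "flink-clients"         % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)
```

With the version in one place, a mismatch like the one above can only come from the installed distribution differing from flinkVersion, which is a single value to check.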

On Mon, Mar 7, 2016 at 2:33 PM, Vishnu Viswanath <[hidden email]> wrote:

[...]