SourceFunction Scala

Ankur Sharma
Hello,

I am trying to use a custom source function (declaration given below) with a DataStream.
If I add the source to the stream using addSource:

val stream = env.addSource(new QueryOneSource(args))
I get the following error. Any explanations or help?

Error:(14, 31) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple]
    val stream = env.addSource(new QueryOneSource(args))
                              ^
Error:(14, 31) not enough arguments for method addSource: (implicit evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple], implicit evidence$16: org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple])org.apache.flink.streaming.api.scala.DataStream[org.mpi.debs.Tuple].
Unspecified value parameter evidence$16.
    val stream = env.addSource(new QueryOneSource(args))
                              ^

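class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] {
  // case class Tuple(id: Long, value: Int)
  private var nextTuple: Tuple = _

  // Cleared by cancel() so that the loop in run() can exit.
  @volatile private var running = true

  override def run(ctx: SourceContext[Tuple]): Unit = {
    while (running) {
      nextRecord()
      ctx.collect(nextTuple)
    }
  }

  override def cancel(): Unit = { running = false }

  // Body omitted in the original post; presumably loads the next record into nextTuple.
  def nextRecord(): Unit = {
  }
}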

Best,
Ankur Sharma
Information Systems Group
3.15 E1.1 Universität des Saarlandes
66123, Saarbrücken Germany

Re: SourceFunction Scala

Márton Balassi
Hey Ankur,

Add the following line to your imports, and have a look at the referenced FAQ. [1]

import org.apache.flink.streaming.api.scala._

[1] https://flink.apache.org/faq.html#in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters
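
To illustrate the suggestion above, here is a minimal sketch, assuming a Flink 1.x Scala streaming API on the classpath. The names Record, CountingSource and Example are hypothetical and not from the original post. The point is only that the wildcard import brings the implicit TypeInformation for the element type into scope, which is the evidence parameter the compiler reported as missing.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
// The wildcard import supplies the implicit TypeInformation needed by addSource.
import org.apache.flink.streaming.api.scala._

// Hypothetical element type standing in for the poster's Tuple case class.
case class Record(id: Long, value: Int)

// Hypothetical source that emits `limit` records and then finishes.
class CountingSource(limit: Long) extends SourceFunction[Record] {
  @volatile private var running = true

  override def run(ctx: SourceContext[Record]): Unit = {
    var i = 0L
    while (running && i < limit) {
      ctx.collect(Record(i, i.toInt))
      i += 1
    }
  }

  override def cancel(): Unit = { running = false }
}

object Example {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Compiles only because the implicit evidence for Record is in scope.
    val stream: DataStream[Record] = env.addSource(new CountingSource(5))
    stream.print()
    env.execute("implicit-evidence-example")
  }
}
```

Removing the wildcard import (and importing only StreamExecutionEnvironment and DataStream) should reproduce the "could not find implicit value for evidence parameter" error from the question.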

Best,

Marton


Re: SourceFunction Scala

Ankur Sharma
Hi, 


I am getting the following error while executing the fat jar of the project. Any help?


Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema
        at org.mpi.debs.Main.main(Main.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.DeserializationSchema
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more


Main.scala: 

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema


object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    val stream = env.addSource(new RMQSource[String]("localhost", "query-one", new SimpleStringSchema))
    stream.addSink(new SinkFunction[String] {
      override def invoke(value: String): Unit = {
        println(value)
      }
    })
    env.execute("QueryOneExecutor")
  }
}
Best,
Ankur Sharma


Re: SourceFunction Scala

stefanobaghino
Hi Ankur,

I'm catching up with this week's mailing list right now. I hope you have already solved the issue, but if you haven't: this kind of problem happens when you use a version of Scala for which your Flink dependencies have not been compiled. Make sure you append the correct Scala version suffix to the dependencies you're using, matching the Scala version of your project.
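
As a rough sketch of what that can look like, assuming the project is built with sbt (the thread does not say which build tool is used) and assuming placeholder version numbers, the `%%` operator appends the project's Scala binary version to the artifact name, so the Flink artifacts are resolved for the same Scala version as the project:

```scala
// build.sbt (sketch; both version numbers below are assumptions, not from the thread)
scalaVersion := "2.11.7"

val flinkVersion = "1.0.0"

libraryDependencies ++= Seq(
  // %% resolves to flink-streaming-scala_2.11 for the scalaVersion above
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  // %% resolves to flink-connector-rabbitmq_2.11 for the scalaVersion above
  "org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion
)
```

With Maven the equivalent is to write the suffix by hand in the artifactId, for example flink-streaming-scala_2.11, and keep the same suffix on every Flink dependency.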


--
BR,
Stefano Baghino

Software Engineer @ Radicalbit