Scala compilation error

Scala compilation error

Srikanth
Hello,

I'm fac

val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(), properties))
val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b => BidderRawLogs(b)).keyBy(b => b.strategyId)

val metaStrategy: KeyedStream[(Int, String), Int] = env.readTextFile("path").name("Strategy")
 .map((1, _) ).keyBy(_._1)

val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String), (Int, BidderRawLogs, (Int, String))] =
 new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo, staticTypeInfo)
val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]() {}.getTypeInfo()

val funName = "test"
val joinedStream = bidderStream.connect(metaStrategy)
.transform(funName, joinOperator, outTypeInfo)


Re: Scala compilation error

Srikanth
Sorry for the previous incomplete email. Didn't realize I hit send!

I was facing a weird compilation error in Scala when I did
val joinedStream = stream1.connect(stream2)
.transform("funName", outTypeInfo, joinOperator)

It turned out to be due to a difference in the method signature between the Scala and Java APIs. I was referring to the Javadoc. Is there a Scaladoc?

Java API has
public <R> SingleOutputStreamOperator<R> transform(
      String functionName,
      TypeInformation<R> outTypeInfo,
      TwoInputStreamOperator<IN1, IN2, R> operator)

Scala API has
def transform[R: TypeInformation](
      functionName: String,
      operator: TwoInputStreamOperator[IN1, IN2, R])
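
For anyone comparing the two signatures: in Scala 2 a context bound such as [R: TypeInformation] is shorthand for an extra implicit parameter list, so the type information never sits between the name and the operator. Below is a minimal stand-alone sketch of that idea. TypeInfo, TwoInputOp and Connected are hypothetical stand-ins made up for illustration, not Flink classes, and the String return value is only there to make the result visible.

```scala
// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
final case class TypeInfo[T](description: String)
trait TwoInputOp[IN1, IN2, R]

class Connected[IN1, IN2] {
  // Context-bound form, shaped like the Scala signature quoted above.
  def transform[R: TypeInfo](functionName: String, operator: TwoInputOp[IN1, IN2, R]): String =
    s"$functionName -> ${implicitly[TypeInfo[R]].description}"

  // What the compiler makes of the context bound: a second, implicit parameter list.
  def transformDesugared[R](functionName: String, operator: TwoInputOp[IN1, IN2, R])(
      implicit info: TypeInfo[R]): String =
    s"$functionName -> ${info.description}"
}

object ContextBoundDemo {
  val op = new TwoInputOp[Int, String, (Int, String)] {}
  val outInfo = TypeInfo[(Int, String)]("explicit tuple info")
  val connected = new Connected[Int, String]

  // If passed by hand, the type information goes in its own argument list.
  val viaContextBound: String = connected.transform("test", op)(outInfo)
  val viaDesugared: String = connected.transformDesugared("test", op)(outInfo)

  // connected.transform("test", outInfo, op)  // does not compile: too many arguments
}
```

Both calls in the sketch yield the same string, which suggests that a Java-style three-argument call fails simply because the first parameter list only takes two arguments.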

Srikanth

On Mon, May 2, 2016 at 7:18 PM, Srikanth <[hidden email]> wrote:
Hello,

I'm fac

val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(), properties))
val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b => BidderRawLogs(b)).keyBy(b => b.strategyId)

val metaStrategy: KeyedStream[(Int, String), Int] = env.readTextFile("path").name("Strategy")
 .map((1, _) ).keyBy(_._1)

val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String), (Int, BidderRawLogs, (Int, String))] =
 new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo, staticTypeInfo)
val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]() {}.getTypeInfo()

val funName = "test"
val joinedStream = bidderStream.connect(metaStrategy)
.transform(funName, joinOperator, outTypeInfo)



Re: Scala compilation error

Aljoscha Krettek
There is a Scaladoc, but unfortunately it does not cover all packages. In the Scala API you can call transform without specifying a TypeInformation; it is supplied implicitly through the context bound on R.
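
To illustrate how a call can work without passing any type information, here is a minimal stand-alone sketch of implicit resolution through a context bound. Info, JoinOp and ConnectedSketch are hypothetical names invented for this example and are not part of Flink; the String result only makes the resolved instance visible.

```scala
// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
final case class Info[T](description: String)

object Info {
  // Instances in the companion object are found by implicit search automatically.
  implicit val intInfo: Info[Int] = Info("Int")
  implicit val stringInfo: Info[String] = Info("String")
  // Derives tuple information from the information for each element type.
  implicit def tuple2Info[A, B](implicit a: Info[A], b: Info[B]): Info[(A, B)] =
    Info(s"(${a.description}, ${b.description})")
}

trait JoinOp[IN1, IN2, R]

class ConnectedSketch[IN1, IN2] {
  // The context bound asks the compiler to supply an Info[R] at each call site.
  def transform[R: Info](functionName: String, operator: JoinOp[IN1, IN2, R]): String =
    s"$functionName produces ${implicitly[Info[R]].description}"
}

object ImplicitDemo {
  val op = new JoinOp[Int, String, (Int, String)] {}
  val connected = new ConnectedSketch[Int, String]
  // No type information is passed; the compiler resolves Info[(Int, String)] itself.
  val result: String = connected.transform("test", op)
}
```

In Flink's Scala API the implicit TypeInformation normally becomes available through import org.apache.flink.streaming.api.scala._, so the two-argument call is usually all that is needed.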

On Tue, 3 May 2016 at 01:48 Srikanth <[hidden email]> wrote:
Sorry for the previous incomplete email. Didn't realize I hit send!

I was facing a weird compilation error in Scala when I did
val joinedStream = stream1.connect(stream2)
.transform("funName", outTypeInfo, joinOperator)

It turned out to be due to a difference in the method signature between the Scala and Java APIs. I was referring to the Javadoc. Is there a Scaladoc?

Java API has
public <R> SingleOutputStreamOperator<R> transform(
      String functionName,
      TypeInformation<R> outTypeInfo,
      TwoInputStreamOperator<IN1, IN2, R> operator)

Scala API has
def transform[R: TypeInformation](
      functionName: String,
      operator: TwoInputStreamOperator[IN1, IN2, R])

Srikanth




Re: Scala compilation error

Srikanth
Yes, I did notice the use of implicits in ConnectedStreams.scala.
Better Scaladoc would be helpful, especially when the compiler errors are unclear.

Thanks

On Tue, May 3, 2016 at 5:02 AM, Aljoscha Krettek <[hidden email]> wrote:
There is a Scaladoc, but unfortunately it does not cover all packages. In the Scala API you can call transform without specifying a TypeInformation; it is supplied implicitly through the context bound on R.
