Hello,
I'm fac

    val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(), properties))
    val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b => BidderRawLogs(b)).keyBy(b => b.strategyId)
    val metaStrategy: KeyedStream[(Int, String), Int] = env.readTextFile("path").name("Strategy")
      .map((1, _)).keyBy(_._1)

    val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
    val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
    val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String), (Int, BidderRawLogs, (Int, String))] =
      new JoinOperator[Int, BidderRawLogs, (Int, String)](dynamicTypeInfo, staticTypeInfo)
    val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]() {}.getTypeInfo()

    val funName = "test"
    val joinedStream = bidderStream.connect(metaStrategy)
      .transform(funName, joinOperator, outTypeInfo)
Sorry for the previous incomplete email. Didn't realize I hit send!
I was facing a weird compilation error in Scala when I did

    val joinedStream = stream1.connect(stream2)
      .transform("funName", outTypeInfo, joinOperator)

It turned out to be due to a difference in signature between the Scala and Java APIs. I was referring to the Javadoc. Is there a Scaladoc?

The Java API has

    public <R> SingleOutputStreamOperator<R> transform(
        String functionName,
        TypeInformation<R> outTypeInfo,
        TwoInputStreamOperator<IN1, IN2, R> operator)

The Scala API has

    def transform[R: TypeInformation](
        functionName: String,
        operator: TwoInputStreamOperator[IN1, IN2, R])

Srikanth

On Mon, May 2, 2016 at 7:18 PM, Srikanth <[hidden email]> wrote:
There is a Scaladoc, but unfortunately it does not cover all packages. In the Scala API you can call transform without specifying a TypeInformation; it is supplied through implicits/context bounds.

On Tue, 3 May 2016 at 01:48 Srikanth <[hidden email]> wrote:
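
For anyone reading along, here is a rough, self-contained sketch of how a context bound like `[R: TypeInformation]` behaves. It does not use Flink at all: `TypeInfo`, `ContextBoundSketch`, `transformDesugared` and the `String => R` "operator" are made-up stand-ins chosen only so the snippet compiles on its own. The only thing borrowed from the thread is the shape of the Scala signature quoted above.

```scala
// Hypothetical stand-in for Flink's TypeInformation. NOT the real class.
trait TypeInfo[T] { def describe: String }

object TypeInfo {
  // Instances in the companion object are found by implicit search automatically.
  implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] { def describe = "Int" }
  implicit val stringInfo: TypeInfo[String] = new TypeInfo[String] { def describe = "String" }
  // A tuple instance is derived from the instances of its two components.
  implicit def tuple2Info[A, B](implicit a: TypeInfo[A], b: TypeInfo[B]): TypeInfo[(A, B)] =
    new TypeInfo[(A, B)] { def describe = s"(${a.describe}, ${b.describe})" }
}

object ContextBoundSketch {
  // Context bound form, shaped like the Scala signature quoted in the thread.
  // The "operator" here is just a function, an assumption made for illustration.
  def transform[R: TypeInfo](functionName: String, operator: String => R): String =
    s"$functionName -> ${implicitly[TypeInfo[R]].describe}"

  // Roughly what the context bound expands to: a second, implicit parameter list.
  def transformDesugared[R](functionName: String, operator: String => R)(
      implicit info: TypeInfo[R]): String =
    s"$functionName -> ${info.describe}"

  def main(args: Array[String]): Unit = {
    val op: String => (Int, String) = s => (s.length, s)

    // The type information is resolved by the compiler; only two arguments are passed.
    println(transform("test", op))

    // It can still be handed over explicitly, but in its own parameter list,
    // not between the name and the operator as in the Java signature.
    println(transformDesugared("test", op)(TypeInfo.tuple2Info[Int, String]))
  }
}
```

If the real API follows the same pattern, passing the type information as an ordinary middle or trailing argument would not match the two-parameter list, which would be consistent with the compile error described above.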
Yes, I did notice the use of implicits in ConnectedStreams.scala.
Better Scaladoc would be helpful, especially when compiler errors are not clear.

Thanks

On Tue, May 3, 2016 at 5:02 AM, Aljoscha Krettek <[hidden email]> wrote: