I am testing a streaming job that reads data through JDBCInputFormat, using the code snippet below (Scala 2.11, Flink 1.0):

val input = JDBCInputFormat.buildJDBCInputFormat.
  setDrivername(driver).
  setDBUrl(url).
  setQuery(sql).
  setUsername(user).
  setPassword(password).
  finish.
  asInstanceOf[JDBCInputFormat[Tuple1[String]]]

val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataset = env.createInput(input)
dataset.writeAsText("/tmp/jdbc.txt")
env.execute("jdbc-test")

However, submitting the job throws:

Type of TypeVariable 'OT' in 'class org.apache.flink.api.common.io.RichInputFormat' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
  org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:670)
  org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:564)
  org.apache.flink.api.java.typeutils.TypeExtractor.getInputFormatTypes(TypeExtractor.java:272)
  org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createInput(StreamExecutionEnvironment.java:1011)
  org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.createInput(StreamExecutionEnvironment.scala:484)

Where should I add that OT stuff to avoid the type erasure problem?

Thanks
Hi Robertson,
You need to supply a TypeInformation for the data read from the InputFormat.

val dataset = env.createInput(input, new TupleTypeInfo(classOf[Tuple1[String]], BasicTypeInfo.STRING_TYPE_INFO))

should do the trick. (In Scala, the class literal is written classOf[...] rather than Java's .class.)

Cheers,
Max

On Tue, May 31, 2016 at 1:13 PM, Robertson Williams <[hidden email]> wrote:
> Where should I add that OT stuff so to avoid the type erasure problem?
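To illustrate why the cast in the original snippet does not help, here is a minimal sketch of the underlying JVM behaviour. It does not use Flink at all: Format, GenericFormat, StringFormat and ErasureDemo are hypothetical names made up for this example, and the reflection shown is only an approximation of the kind of lookup a type extractor can perform. The point is that a type argument supplied at the call site (or through asInstanceOf) is erased at runtime, whereas a type argument fixed in a class declaration stays in the class metadata.

```scala
import java.lang.reflect.ParameterizedType

// Hypothetical stand-in for a generic input format; not a Flink class.
abstract class Format[OT] {
  def next(): OT
}

// Generic implementation: OT is chosen by the caller and erased at runtime.
class GenericFormat[OT](value: OT) extends Format[OT] {
  def next(): OT = value
}

// Concrete subclass: OT is fixed to String in the class declaration itself.
class StringFormat extends Format[String] {
  def next(): String = "row"
}

object ErasureDemo {
  // Returns the type argument bound to Format's OT, as recorded in class metadata.
  def outputTypeOf(format: Format[_]): String =
    format.getClass.getGenericSuperclass match {
      case p: ParameterizedType => p.getActualTypeArguments()(0).getTypeName
      case other                => other.getTypeName
    }

  def main(args: Array[String]): Unit = {
    // The cast compiles, but adds no type information at runtime.
    val erased = new GenericFormat[String]("row").asInstanceOf[Format[String]]
    println(outputTypeOf(erased))           // still the unresolved variable OT
    println(outputTypeOf(new StringFormat)) // java.lang.String
  }
}
```

Reflection cannot recover String in the first case, which is why the element type has to be handed over explicitly as a TypeInformation, as suggested above.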