Hi,
I am trying to write input from Kafka to a SQL server on AWS, but I have difficulties.

I get the following error:

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
[error] val dsRow = tableEnv.toAppendStream[Row](result)
[error] ^

Any help is appreciated.

I am not sure whether my approach is correct or not, but my code is as follows:

import java.util.Properties
val properties = new Properties()

Best regards
Hi Martin,
usually, this error occurs when people forget to add `org.apache.flink.api.scala._` to their imports. It is triggered by the Scala macro that the DataStream API uses for extracting types.

Can you try to call `result.toAppendStream[Row]` directly? This should work if you import `org.apache.flink.table.api.scala._`.

Maybe this example helps:

https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala

Regards,
Timo

On 22.05.20 08:02, Martin Frank Hansen wrote:
> Hi,
>
> I am trying to write input from Kafka to a SQL server on AWS, but I have
> difficulties.
>
> I get the following error:
>
> could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
> [error] val dsRow = tableEnv.toAppendStream[Row](result)
> [error] ^
>
> Any help is appreciated.
>
> I am not sure whether my approach is correct or not, but my code is
> as follows:
>
> import java.util.Properties
>
> import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
> import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.api.scala._
> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
> import org.apache.flink.types.Row
>
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", b_servers)
> properties.setProperty("zookeeper.connect", zk)
> properties.setProperty("group.id", "very_small_test")
> properties.setProperty("ssl.endpoint.identification.algorithm", "")
> properties.setProperty("security.protocol", "SSL")
>
> val kafkaSource: FlinkKafkaConsumerBase[String] = new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties).setStartFromTimestamp(0)
>
> val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env, settings)
>
> val schema = new Schema()
>   .field("fullVisitorId", Types.STRING)
>   .field("eventTime", Types.STRING)
>   .field("eventID", Types.STRING)
>   .field("eventType", Types.STRING)
>   .field("page", Types.MAP(Types.STRING, Types.STRING))
>   .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))
>
> tableEnv.connect(new Kafka()
>     .version("universal")
>     .topic("very_small_test")
>     .properties(properties)
>     .startFromEarliest()
>   )
>   .withFormat(
>     new Json()
>       .failOnMissingField(false)
>       .deriveSchema()
>   )
>   .withSchema(schema)
>   .inAppendMode()
>   .registerTableSource("sql_source")
>
> val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"
>
> val result = tableEnv.sqlQuery(sqlStatement)
>
> val dsRow = tableEnv.toAppendStream[Row](result)
>
> val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
>   .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
>   .setDBUrl("AWS url")
>   .setUsername(username)
>   .setPassword(password)
>   .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
>   .setBatchInterval(100)
>   .finish()
>
> dsRow.writeUsingOutputFormat(jdbcOutput)
>
> tableEnv.execute("SQL test")
>
> --
> Best regards
> Martin Hansen
As far as I know, SQL Server is not supported. I opened a PR [1] for this, which still needs a rebase. If anyone is interested, I could do that.

[1] https://github.com/apache/flink/pull/12038 (FLINK-14101)

On Fri, May 22, 2020 at 8:35 AM Timo Walther <[hidden email]> wrote:
> Hi Martin,
Hi Flavio,

Thank you so much! I thought I had that import but misread it.

The code does not give any errors, but no data is written to the SQL Server. Can you see why that is?

On Fri, May 22, 2020 at 09:02, Flavio Pompermaier <[hidden email]> wrote:

Best regards
Martin Hansen
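The compile error earlier in this thread comes from Scala's implicit resolution: a context bound such as `[T: TypeInformation]` adds a hidden evidence parameter that must be in implicit scope, which in Flink is normally arranged by the wildcard imports Timo mentions. The following is a minimal plain-Scala sketch of that mechanism only. `TypeInfo`, `Row`, `ScalaApi`, `Conversions` and `Demo` are hypothetical stand-ins for illustration, not Flink classes.

```scala
// Simplified stand-in for a type-information type class (NOT Flink's TypeInformation).
trait TypeInfo[T] { def describe: String }

// Hypothetical record type standing in for a generic row.
final case class Row(fields: Seq[Any])

// Implicit instances live in an object that has to be imported,
// similar in spirit to a wildcard import of a Scala API package.
object ScalaApi {
  implicit val rowInfo: TypeInfo[Row] = new TypeInfo[Row] {
    def describe: String = "TypeInfo[Row]"
  }
}

object Conversions {
  // `T: TypeInfo` is a context bound: the compiler adds a hidden "evidence"
  // parameter of type TypeInfo[T] and looks it up in the implicit scope.
  def toAppendStream[T: TypeInfo](rows: Seq[T]): String =
    s"stream of ${rows.size} element(s) typed by ${implicitly[TypeInfo[T]].describe}"
}

object Demo {
  // In Scala 2, removing this import makes the call below fail to compile with
  // "could not find implicit value for evidence parameter of type TypeInfo[Row]".
  import ScalaApi._

  def run(): String = Conversions.toAppendStream(Seq(Row(Seq("a", "b"))))
}
```

Calling `Demo.run()` compiles only because the import brings the implicit instance into scope, which mirrors why adding the missing import resolved the error here.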
I would expect you to see an exception somewhere saying that the SQL Server dialect is not supported yet.

On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen <[hidden email]> wrote:
Hi Flavio,

Thanks for your reply. I will try another way then.

On Fri, May 22, 2020 at 11:31, Flavio Pompermaier <[hidden email]> wrote:

Best regards
Martin Hansen
Hi again,

I am a bit confused as to why the generic JDBC connector would not work with SQL Server. Can you explain a bit more?

On Fri, May 22, 2020 at 11:33, Martin Frank Hansen <[hidden email]> wrote:

Best regards
Martin Hansen
No, sorry, you're right. The JDBCOutputFormat should work. I confused it with the Table API.

On Fri, May 22, 2020 at 11:51 AM Martin Frank Hansen <[hidden email]> wrote:

Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809
Ah, OK, thanks, no problem.

My problem now is that nothing is sent. Do I need to format it in another way, or did I miss something else?

I tried to include Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"), but that didn't work.

On Fri, May 22, 2020 at 11:57, Flavio Pompermaier <[hidden email]> wrote:

Best regards
Martin Hansen
Is the SQL Server JDBC jar in the Flink dist lib folder?

On Fri, May 22, 2020 at 1:30 PM Martin Frank Hansen <[hidden email]> wrote:

Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809
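One way to approach the question about the driver jar is to test whether the driver class can be loaded at all. The sketch below is a small plain-Scala helper for that; `DriverCheck` and `isOnClasspath` are hypothetical names introduced here. It only inspects the classpath of the JVM it runs in, so a result from a local run says nothing about the classpath on the cluster.

```scala
import scala.util.Try

object DriverCheck {
  // Returns true if the named class can be loaded from the current classpath.
  // Class.forName throws ClassNotFoundException when the class is missing,
  // which Try turns into a Failure.
  def isOnClasspath(className: String): Boolean =
    Try(Class.forName(className)).isSuccess
}
```

For the driver used in this thread, the call would be `DriverCheck.isOnClasspath("com.microsoft.sqlserver.jdbc.SQLServerDriver")`.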
Well, I got it working. The varchar columns in the database were set too small.

Thanks for your help!

On Fri, May 22, 2020 at 13:30, Martin Frank Hansen <[hidden email]> wrote:

Martin Frank Hansen
Data Engineer
Pilestræde 34 | DK-1147 København K | T: +45 33 75 75 75 | berlingskemedia.dk
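Since the root cause turned out to be column sizes, a pre-insert length check can make this kind of problem visible earlier. The following is a minimal sketch, assuming string fields and hand-maintained limits. `VarcharCheck`, `oversized` and the limits in the usage note are hypothetical, and the real limits would come from the table definition. It compares `String.length`, which is only an approximation because SQL Server sizes `varchar(n)` in bytes.

```scala
object VarcharCheck {
  // Returns the names of columns whose values are longer than the configured
  // limit, sorted alphabetically. Columns without a configured limit are ignored.
  def oversized(record: Map[String, String], limits: Map[String, Int]): Seq[String] =
    record.collect {
      case (column, value) if limits.get(column).exists(value.length > _) => column
    }.toSeq.sorted
}
```

For example, with assumed limits `Map("fullVisitorId" -> 5, "eventID" -> 10)`, a record whose `fullVisitorId` is `"1234567"` would be reported as oversized, and such records could be logged or truncated before they reach the JDBC output.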