Writing to SQL server


Martin Frank Hansen
Hi, 

I am trying to write input from Kafka to a SQL Server database on AWS, but I am having difficulties.

I get the following error:

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
[error]   val dsRow = tableEnv.toAppendStream[Row](result)
[error]                                           ^

Any help is appreciated.

I am not sure whether my approach is correct, but my code is as follows:
import java.util.Properties

import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
import org.apache.flink.types.Row

val properties = new Properties()
properties.setProperty("bootstrap.servers", b_servers)
properties.setProperty("zookeeper.connect", zk)
properties.setProperty("group.id", "very_small_test")
properties.setProperty("ssl.endpoint.identification.algorithm ", "")
properties.setProperty("security.protocol", "SSL")

val kafkaSource: FlinkKafkaConsumerBase[String] =
  new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties)
    .setStartFromTimestamp(0)

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, settings)

val schema = new Schema()
  .field("fullVisitorId", Types.STRING)
  .field("eventTime", Types.STRING)
  .field("eventID", Types.STRING)
  .field("eventType", Types.STRING)
  .field("page", Types.MAP(Types.STRING, Types.STRING))
  .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))

tableEnv.connect(new Kafka()
    .version("universal")
    .topic("very_small_test")
    .properties(properties)
    .startFromEarliest()
  )
  .withFormat(new Json()
    .failOnMissingField(false)
    .deriveSchema()
  )
  .withSchema(schema)
  .inAppendMode()
  .registerTableSource("sql_source")

val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"

val result = tableEnv.sqlQuery(sqlStatement)

val dsRow = tableEnv.toAppendStream[Row](result)

val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
  .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .setDBUrl("AWS url")
  .setUsername(username)
  .setPassword(password)
  .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
  .setBatchInterval(100)
  .finish()

dsRow.writeUsingOutputFormat(jdbcOutput)

tableEnv.execute("SQL test")

--

Best regards

Martin Hansen


Re: Writing to SQL server

Timo Walther
Hi Martin,

usually, this error occurs when people forget to add
`org.apache.flink.api.scala._` to their imports. It is triggered by the
Scala macro that the DataStream API uses for extracting types.

Can you try to call `result.toAppendStream[Row]` directly? This should
work if you import `org.apache.flink.table.api.scala._`.
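
A minimal sketch of that change, assuming Flink 1.10 and the Scala Table API used in the original snippet:

import org.apache.flink.table.api.scala._  // brings the implicit Table -> DataStream conversions
import org.apache.flink.types.Row

// 'result' is the Table returned by tableEnv.sqlQuery(...) in the code above
val dsRow = result.toAppendStream[Row]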

Maybe this example helps:

https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala

Regards,
Timo



Re: Writing to SQL server

Flavio Pompermaier
As far as I know, SQL Server is not supported; for this I opened a PR [1] that I should rebase.
If someone is interested, I could do it.



Re: Writing to SQL server

Martin Frank Hansen
Hi Flavio, 

Thank you so much! I thought I had that import but misread it.

The code does not give any errors, but no data is written to the SQL server. Can you see why that is?



--

Best regards

Martin Hansen



Re: Writing to SQL server

Flavio Pompermaier
I would expect you to see an exception somewhere, since the SQL Server dialect is not supported yet.




Re: Writing to SQL server

Martin Frank Hansen
Hi Flavio, 

Thanks for your reply. I will try another way then.


--
Best regards

Martin Hansen



Re: Writing to SQL server

Martin Frank Hansen
Hi again, 

I am a bit confused as to why the generic JDBC connector would not work with SQL Server.

Can you explain a bit more?


--
Best regards  

Martin Hansen



Re: Writing to SQL server

Flavio Pompermaier
No sorry, you're right; the JDBCOutputFormat should work. I got confused with the Table API.

--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809

Re: Writing to SQL server

Martin Frank Hansen
Ah ok, thanks, no problem.

My problem now is that nothing is sent; do I need to format it in another way, or did I miss something else?

I tried to include Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver") but that didn't work.
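
As a side note, one quick way to check whether any rows reach the sink stage at all is to also print the converted stream. This is only a debugging sketch on top of the snippet above, reusing its dsRow and jdbcOutput values:

// Debugging sketch: print every row next to the JDBC sink, so an empty
// output immediately shows the query/filter produced no data.
dsRow.print()  // rows should show up in the TaskManager stdout
dsRow.writeUsingOutputFormat(jdbcOutput)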

-- 
Best Regards

Martin Hansen



Re: Writing to SQL server

Flavio Pompermaier
Is the SQL Server JDBC jar in the Flink dist lib folder?
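
For reference, a sketch of one way to make the driver available, assuming sbt and the Microsoft mssql-jdbc artifact (the exact version here is an assumption); alternatively the same jar can be dropped into Flink's lib/ directory:

// build.sbt (sketch): bundle the SQL Server JDBC driver with the job jar
libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "7.4.1.jre8"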

--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809

Re: Writing to SQL server

Martin Frank Hansen
Well, I got it working.

The varchars in the database were set too small.

Thanks for your help!

--

Martin Frank Hansen

Data Engineer
Digital Service
M: +45 25 57 14 18
E: [hidden email]

Pilestræde 34 | DK-1147 København K | T: +45 33 75 75 75 | berlingskemedia.dk