Using Flink 1.10 and coding in Java 11, is it possible use to write to Teradata in append mode? MySQL, PostgreSQL, and Derby are the only supported drivers listed. Thanks.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#connectors I created the ConnectorDescriptor below and am using it from tableEnvironment.connect() but get the exception shown below. public class Teradata extends ConnectorDescriptor { /** * Constructs a {@link ConnectorDescriptor}. */ public Teradata() { super("jdbc", 1, false); } @Override protected Map<String, String> toConnectorProperties() { Map<String, String> map = new HashMap<>(); map.put("Drivername", "com.teradata.jdbc.TeraDriver"); map.put("DBUrl", "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP"); map.put("Username", "..."); map.put("Password", "..."); return map; } } org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath. Reason: No factory supports all properties. The matching candidates: org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory Unsupported property keys: drivername update-mode password dburl username The following properties are requested: connector.property-version=1 connector.type=jdbc dburl=jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP drivername=com.teradata.jdbc.TeraDriver password=xxx schema.0.data-type=VARCHAR(1) schema.0.name=f0 schema.1.data-type=VARCHAR(1) schema.1.name=f1 schema.2.data-type=VARCHAR(1) schema.2.name=f2 update-mode=append username=xxx The following factories have been considered: org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory ******* CONFIDENTIALITY NOTICE ******* This e-mail message and all attachments transmitted with it may contain legally privileged and confidential information intended solely for the use of the addressee. If the reader of this message is not the intended recipient, you are hereby notified that any reading, dissemination, distribution, copying, or other use of this message or its attachments is strictly prohibited. If you have received this message in error, please notify the sender immediately and delete this message from your system. Thank you. |
Same error with this change:
public class Teradata extends ConnectorDescriptor { /** * Constructs a {@link ConnectorDescriptor}. */ public Teradata() { super("jdbc", 1, false); } @Override protected Map<String, String> toConnectorProperties() { Map<String, String> map = new HashMap<>(); map.put(JDBCValidator.CONNECTOR_DRIVER, "com.teradata.jdbc.TeraDriver"); map.put(JDBCValidator.CONNECTOR_URL, "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP"); map.put(JDBCValidator.CONNECTOR_USERNAME, "..."); map.put(JDBCValidator.CONNECTOR_PASSWORD, "...!"); return map; } } -----Original Message----- From: Norm Vilmer (Contractor) <[hidden email]> Sent: Wednesday, March 4, 2020 10:37 AM To: [hidden email] Subject: EXTERNAL - Teradata as JDBC Connection Caution: Sender is from outside SWA. Take caution before opening links/attachments or replying with sensitive data. If suspicious, forward to '[hidden email]'. Using Flink 1.10 and coding in Java 11, is it possible use to write to Teradata in append mode? MySQL, PostgreSQL, and Derby are the only supported drivers listed. Thanks. https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dstable_dev_table_connect.html-23connectors&d=DwIFAg&c=dyyteaO_66X5RejcGgaVFCWGX8V6S6CQobBcYjo__mc&r=a8BqCmWrJ1FuU14JVrlQLeWdeeSBWSiCJA9Y5xTWafg&m=kfV3arAbKYvpd5IvCtggkHsoDXKTgA1RrGMWrbcWZOo&s=n91D15kGNf9TDtKedGYD8EfDYxnvEzY8POgNtSE-icY&e= I created the ConnectorDescriptor below and am using it from tableEnvironment.connect() but get the exception shown below. public class Teradata extends ConnectorDescriptor { /** * Constructs a {@link ConnectorDescriptor}. */ public Teradata() { super("jdbc", 1, false); } @Override protected Map<String, String> toConnectorProperties() { Map<String, String> map = new HashMap<>(); map.put("Drivername", "com.teradata.jdbc.TeraDriver"); map.put("DBUrl", "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP"); map.put("Username", "..."); map.put("Password", "..."); return map; } } org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath. Reason: No factory supports all properties. The matching candidates: org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory Unsupported property keys: drivername update-mode password dburl username The following properties are requested: connector.property-version=1 connector.type=jdbc dburl=jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP drivername=com.teradata.jdbc.TeraDriver password=xxx schema.0.data-type=VARCHAR(1) schema.0.name=f0 schema.1.data-type=VARCHAR(1) schema.1.name=f1 schema.2.data-type=VARCHAR(1) schema.2.name=f2 update-mode=append username=xxx The following factories have been considered: org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory ******* CONFIDENTIALITY NOTICE ******* This e-mail message and all attachments transmitted with it may contain legally privileged and confidential information intended solely for the use of the addressee. If the reader of this message is not the intended recipient, you are hereby notified that any reading, dissemination, distribution, copying, or other use of this message or its attachments is strictly prohibited. If you have received this message in error, please notify the sender immediately and delete this message from your system. Thank you. ******* CONFIDENTIALITY NOTICE ******* This e-mail message and all attachments transmitted with it may contain legally privileged and confidential information intended solely for the use of the addressee. If the reader of this message is not the intended recipient, you are hereby notified that any reading, dissemination, distribution, copying, or other use of this message or its attachments is strictly prohibited. If you have received this message in error, please notify the sender immediately and delete this message from your system. Thank you. |
Hi Norm, the error message already points to the main issue: your property names are not correct. Unsupported property keys: drivername update-mode password dburl username You should use the builder to properly configure the sink [1]. On Thu, Mar 5, 2020 at 12:58 AM Norm Vilmer (Contractor) <[hidden email]> wrote: Same error with this change: |
Thanks for the reply, Arvid. I changed the property names in my ConnectorDescriptor subclass to match what the validator wanted and now get: “Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath. Reason: No factory supports all properties. The matching candidates: org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory Unsupported property keys: update-mode” The method suggested in the link you sent, registerTableSink, is deprecated in 1.10, so I was trying to use the following: final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); . . . tableEnv.connect(new Teradata()) .withSchema(new Schema() .field("f0", DataTypes.VARCHAR(25)) .field("f1", DataTypes.VARCHAR(10240)) .field("f2", DataTypes.VARCHAR(10240))) .inAppendMode() .createTemporaryTable("staging"); Table table = tableEnv.fromDataStream(reportData); table.insertInto("staging"); Using the connect() method, I can see that the code attempts to use the JDBCTableSourceSinkFactory, but does not like ‘update-mode’. Do you have an example using connect() method? Thanks. From: Arvid Heise <[hidden email]>
Hi Norm, the error message already points to the main issue: your property names are not correct. Unsupported property keys: You should use the builder to properly configure the sink [1]. On Thu, Mar 5, 2020 at 12:58 AM Norm Vilmer (Contractor) <[hidden email]> wrote:
******* CONFIDENTIALITY NOTICE *******
This e-mail message and all attachments transmitted with it may contain legally privileged and confidential information intended solely for the use of the addressee. If the reader of this message is not the intended recipient, you are hereby notified that any reading, dissemination, distribution, copying, or other use of this message or its attachments is strictly prohibited. If you have received this message in error, please notify the sender immediately and delete this message from your system. Thank you. |
Hi Norm, Here is a documentation for JDBC connector, you can find the supported properties there: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector Regarding to your exception, you don't need to call `inAppendMode`. JDBC sink support both append-mode and upsert-mode. Besides of that, flink-jdbc currenlty doesn't support teradata dialect and plugin dialect, I guess you have to adjust some code in `JDBCDialects`. Best, Jark On Thu, 5 Mar 2020 at 23:46, Norm Vilmer (Contractor) <[hidden email]> wrote:
|
Thanks Jack. I’ll try removing the inAppendMode()
J Regarding the Teradata dialect, crossing my fingers and hoping insert queries work.
From: Jark Wu <[hidden email]>
Hi Norm, Here is a documentation for JDBC connector, you can find the supported properties there: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector Regarding to your exception, you don't need to call `inAppendMode`. JDBC sink support both append-mode and upsert-mode. Besides of that, flink-jdbc currenlty doesn't support teradata dialect and plugin dialect, I guess you have to adjust some code in `JDBCDialects`. Best, Jark On Thu, 5 Mar 2020 at 23:46, Norm Vilmer (Contractor) <[hidden email]> wrote:
******* CONFIDENTIALITY NOTICE *******
This e-mail message and all attachments transmitted with it may contain legally privileged and confidential information intended solely for the use of the addressee. If the reader of this message is not the intended recipient, you are hereby notified that any reading, dissemination, distribution, copying, or other use of this message or its attachments is strictly prohibited. If you have received this message in error, please notify the sender immediately and delete this message from your system. Thank you. |
Free forum by Nabble | Edit this page |