Hi, using 1.8.0 I have the following job: https://pastebin.com/ibZUE8Qx So the job does the following steps... 1- Consume from Kafka and return JsonObject 2- Map JsonObject to MyPojo 3- Convert The stream to a table 4- Insert the table to JDBC sink table 5- Print the table. - The job seems to work with no errors and I can see the row print to the console and I see nothing in my database. - If I put invalid host for the database and restart the job, I get a connection SQLException error. So at least we know that works. - If I make a typo on the INSERT INTO statement like INSERTS INTO non_existing_table, there are no exceptions thrown, the print happens, the stream continues to work. - If I drop the table from the database, same thing, no exceptions thrown, the print happens, the stream continues to work. So am I missing something? |
Ok I think I found it. it's the batch interval setting. From what I see, if we want "realtime" stream to the database we have to set it to 1 other wise the sink will wait until, the batch interval count is reached. The batch interval mechanism doesn't see correct? If the default size is 5000 and you need to insert 5001 you will never get that 1 record? On Tue, 15 Oct 2019 at 15:54, John Smith <[hidden email]> wrote:
|
Hi John, You are right. IMO the batch interval setting is used for increasing the JDBC execution performance purpose. The reason why your INSERT INTO statement with a `non_existing_table` the exception doesn't happen is because the JDBCAppendableSink does not check table existence beforehand. That being said it should fail at the first batch execution. Also I think the `batchInterval` setting is local to the task , this means the default 5000 batchInterval is per-partition. -- Rong On Wed, Oct 16, 2019 at 7:21 AM John Smith <[hidden email]> wrote:
|
Yes correct, I set it to batch interval = 1 and it works fine. Anyways I think the JDBC sink could have some improvements like batchInterval + time interval execution. So if the batch doesn't fill up then execute what ever is left on that time interval. On Thu, 17 Oct 2019 at 12:22, Rong Rong <[hidden email]> wrote:
|
Yes, I think having a time interval execution (for the AppendableSink) should be a good idea. Can you please open a Jira issue[1] for further discussion. -- Rong On Thu, Oct 17, 2019 at 9:48 AM John Smith <[hidden email]> wrote:
|
I recorded two: Time interval: https://issues.apache.org/jira/browse/FLINK-14442Checkpointing: https://issues.apache.org/jira/browse/FLINK-14443 On Thu, 17 Oct 2019 at 14:00, Rong Rong <[hidden email]> wrote:
|
Splendid. Thanks for following up and moving the discussion forward :-) -- Rong On Thu, Oct 17, 2019 at 11:38 AM John Smith <[hidden email]> wrote:
|
So it looks like newer versions have this. The 1.8 branch you can have similar functionality if you enable checkpointing. There's a few things to look at that could be confusing if using 1.8. JDBCOuputFormat: Only works with batch size interval and works with Row. JDBCSinkFunction: Uses JDBCOuputFormat but will also sink on a time interval based on the checkpoint settings. AppendTableSink: Uses JDBCSinkFunction and works with table APIs. On Thu, 17 Oct 2019 at 23:57, Rong Rong <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |