Is it possible to change 'connector.startup-mode' option in the flink job

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Is it possible to change 'connector.startup-mode' option in the flink job

Thomas Huang
Hi guys,

I'm using hive to store kafka topic metadata as follows::

CREATE TABLE orders (
	user_id    BIGINT,
	product    STRING,
	order_time TIMESTAMP(3),
	WATERMARK FOR order_time AS order_time - '5' SECONDS
) WITH (
	'connector.type'    	 = 'kafka',
	'connector.version' 	 = 'universal',
	'connector.topic'   	 = 'orders',
	'connector.startup-mode' = 'earliest-offset',
	'connector.properties.bootstrap.servers' = 'localhost:9092',
	'format.type' = 'json' 
);
However, sometimes for incident handle, I have to set 'connector.startup-mode' to a specific timestamp, but I don't want to change the option setting in the hive table. Is there any way (api or ddl) to change  'connector.startup-mode'  option in the flink jobs, but not impact the option stores in hive?


Best Wishes.
Reply | Threaded
Open this post in threaded view
|

Re: Is it possible to change 'connector.startup-mode' option in the flink job

Jingsong Li
Hi Thomas,

Good to hear from you. This is a very common problem.
In 1.11, we have two FLIP to solve your problem. [1][2] You can take a look.
I think dynamic table options (table hints) is enough for your requirement.

On Tue, May 19, 2020 at 10:37 AM Thomas Huang <[hidden email]> wrote:
Hi guys,

I'm using hive to store kafka topic metadata as follows::

CREATE TABLE orders (
	user_id    BIGINT,
	product    STRING,
	order_time TIMESTAMP(3),
	WATERMARK FOR order_time AS order_time - '5' SECONDS
) WITH (
	'connector.type'    	 = 'kafka',
	'connector.version' 	 = 'universal',
	'connector.topic'   	 = 'orders',
	'connector.startup-mode' = 'earliest-offset',
	'connector.properties.bootstrap.servers' = 'localhost:9092',
	'format.type' = 'json' 
);
However, sometimes for incident handle, I have to set 'connector.startup-mode' to a specific timestamp, but I don't want to change the option setting in the hive table. Is there any way (api or ddl) to change  'connector.startup-mode'  option in the flink jobs, but not impact the option stores in hive?


Best Wishes.


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: Is it possible to change 'connector.startup-mode' option in the flink job

Thomas Huang
Hi Jingsong,

Cool, Thanks for your reply.


Best wishes.

From: Jingsong Li <[hidden email]>
Sent: Tuesday, May 19, 2020 10:46
To: Thomas Huang <[hidden email]>
Cc: Flink <[hidden email]>
Subject: Re: Is it possible to change 'connector.startup-mode' option in the flink job
 
Hi Thomas,

Good to hear from you. This is a very common problem.
In 1.11, we have two FLIP to solve your problem. [1][2] You can take a look.
I think dynamic table options (table hints) is enough for your requirement.

[1]<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A&#43;Supports&#43;Dynamic&#43;Table&#43;Options&#43;for&#43;Flink&#43;SQL">https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
[2]<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-110%3A&#43;Support&#43;LIKE&#43;clause&#43;in&#43;CREATE&#43;TABLE">https://cwiki.apache.org/confluence/display/FLINK/FLIP-110%3A+Support+LIKE+clause+in+CREATE+TABLE

Best,
Jingsong Lee

On Tue, May 19, 2020 at 10:37 AM Thomas Huang <[hidden email]> wrote:
Hi guys,

I'm using hive to store kafka topic metadata as follows::

CREATE TABLE orders (
	user_id    BIGINT,
	product    STRING,
	order_time TIMESTAMP(3),
	WATERMARK FOR order_time AS order_time - '5' SECONDS
) WITH (
	'connector.type'    	 = 'kafka',
	'connector.version' 	 = 'universal',
	'connector.topic'   	 = 'orders',
	'connector.startup-mode' = 'earliest-offset',
	'connector.properties.bootstrap.servers' = 'localhost:9092',
	'format.type' = 'json' 
);
However, sometimes for incident handle, I have to set 'connector.startup-mode' to a specific timestamp, but I don't want to change the option setting in the hive table. Is there any way (api or ddl) to change  'connector.startup-mode'  option in the flink jobs, but not impact the option stores in hive?


Best Wishes.


--
Best, Jingsong Lee