Does operator uid() have to be unique across all jobs?

John Smith
When setting the uid() of an operator, does it have to be unique across all jobs, or just unique within a job?

For example, can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") in another job?

Re: Does operator uid() have to be unique across all jobs?

Dian Fu
Yes, you can use it in another job. The uid needs only to be unique within a job.

Re: Does operator uid() have to be unique across all jobs?

John Smith
Ok cool. Thanks

BTW this seems a bit cumbersome...

.map(....).uid("some-id").name("some-id");

RE: Does operator uid() have to be unique across all jobs?

min.tan

Hi,

I have some simple questions on the uid as well.

1) Do we add a uid for every operator, e.g. print(), addSink and addSource?
2) For chained operators, do we need to set uids for each operator, or just the last operator?

e.g. .map(....).uid("some-id").print().uid("print-id");

Regards,
Min

E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails.
The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice http://www.ubs.com/privacy-statement

Re: Does operator uid() have to be unique across all jobs?

Dian Fu
Hi Min,

The uid is used to match the operator state stored in a checkpoint/savepoint to an operator [1]. So you only need to specify the uid for stateful operators.
1) If you have not specified a uid for an operator, Flink generates one for it in a deterministic way [2]. The generated uid doesn't change as long as the job stays the same.
2) However, it's encouraged to set the uid for stateful operators to allow for job evolution. The generated uid is not guaranteed to remain the same if the job has changed, e.g. when operators are added to or removed from the job graph. If you want to reuse state after job evolution, you need to set the uid explicitly.

So for the example you give, I think you don't need to specify the uid for the map and print operators.
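
To make the point about job evolution concrete, here is a rough sketch in plain Java (no Flink dependency). It is a toy model under stated assumptions: the class `UidEvolutionSketch`, the `Op` type and the position-based "auto-..." IDs are all hypothetical stand-ins. Flink's real generated IDs are hashes computed over the job graph, not these strings. The sketch only illustrates why an ID derived from the job's structure can change when an operator is added, while an explicit uid does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: the "auto-<position>-<kind>" IDs are an assumption made for
// illustration, standing in for Flink's structure-dependent generated IDs.
class UidEvolutionSketch {

    // One operator in a linear pipeline; explicitUid is null when uid() was not called.
    static final class Op {
        final String kind;
        final String explicitUid;

        Op(String kind, String explicitUid) {
            this.kind = kind;
            this.explicitUid = explicitUid;
        }
    }

    // An explicit uid wins; otherwise the ID is derived from position and kind.
    static List<String> assignIds(List<Op> pipeline) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < pipeline.size(); i++) {
            Op op = pipeline.get(i);
            ids.add(op.explicitUid != null ? op.explicitUid : "auto-" + i + "-" + op.kind);
        }
        return ids;
    }

    // IDs that existed in the old job but have no matching operator in the new job.
    static List<String> orphanedIds(List<Op> oldJob, List<Op> newJob) {
        List<String> orphaned = new ArrayList<>(assignIds(oldJob));
        orphaned.removeAll(assignIds(newJob));
        return orphaned;
    }

    public static void main(String[] args) {
        // Without uid(): inserting a filter shifts the sink's derived ID.
        List<Op> v1 = Arrays.asList(new Op("source", null), new Op("sink", null));
        List<Op> v2 = Arrays.asList(
                new Op("source", null), new Op("filter", null), new Op("sink", null));
        System.out.println(orphanedIds(v1, v2)); // prints [auto-1-sink]

        // With uid(): the same change leaves source and sink matchable.
        List<Op> u1 = Arrays.asList(
                new Op("source", "kafka-source"), new Op("sink", "kafka-sink"));
        List<Op> u2 = Arrays.asList(
                new Op("source", "kafka-source"), new Op("filter", null),
                new Op("sink", "kafka-sink"));
        System.out.println(orphanedIds(u1, u2)); // prints []
    }
}
```

In this model the new stateless filter can keep its derived ID, since it has no state to restore; only the operators whose state must survive the upgrade need a stable, explicit uid.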


Re: Does operator uid() have to be unique across all jobs?

Dian Fu
Just adding one more point: changing the parallelism of operators may affect how they are chained, which in turn affects the generated uid. So the uid of stateful operators should be set explicitly in this case as well.

Re: Does operator uid() have to be unique across all jobs?

Victor Wong
In reply to this post by min.tan

Hi,

“uid” is mainly useful when you upgrade your application. It’s used to match the operator state stored in the savepoint.

As suggested in [1], “it is highly recommended to assign unique IDs to all operators of an application that might be upgraded in the future.”

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state

RE: Does operator uid() have to be unique across all jobs?

min.tan
In reply to this post by Dian Fu

Thank you very much for your helpful response.

Our new production release complains about a uid mismatch (we use exactly-once checkpoints).

I hope I understand you correctly: map and print are certainly stateless, therefore no uid is required. What about addSink and addSource? Do they need a uid? Or does name() have a similar function?

Regards,
Min

Re: Does operator uid() have to be unique across all jobs?

Dian Fu
Hi Min,

It depends on the source/sink implementation. If the source/sink implementation uses state, a uid should be set. To be safe, you can always set the uid on sources and sinks; then you don't need to care about the implementation details of the source/sink you use.

name() doesn't have such functionality.

Regarding the uid mismatch you encountered, could you share the exception log?

Regards,
Dian

RE: Does operator uid() have to be unique across all jobs?

min.tan

Thanks for your reply.

Our sources and sinks are connected to Kafka, therefore they are stateful.

We did not set uid on them, only name().

The log says:

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/var/flink/data-remote/savepoint-000000-dae014102550. Cannot map checkpoint/savepoint state for operator 484df1f961bd0cff95fd39b290ba9c03 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

Regards,
Min

Re: Does operator uid() have to be unique across all jobs?

Dian Fu
It means that there is an operator state which has no corresponding operator in the new job. It usually indicates that the uid of a stateful operator has changed.
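
As a rough illustration of what that exception is reporting, the check can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not Flink's implementation: the class `RestoreCheckSketch`, the `restore` method and the new-job IDs are hypothetical, and only the exception type, the operator ID and the `allowNonRestoredState` idea are taken from the log above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the restore-time check; names here are hypothetical.
class RestoreCheckSketch {

    // Walks the operator IDs found in the savepoint. An ID with no operator in the
    // new job either fails the restore or, if skipping is allowed, is recorded as
    // skipped (its state is dropped instead of restored).
    static List<String> restore(List<String> savepointOperatorIds,
                                Set<String> newJobOperatorIds,
                                boolean allowNonRestoredState) {
        List<String> skipped = new ArrayList<>();
        for (String id : savepointOperatorIds) {
            if (newJobOperatorIds.contains(id)) {
                continue; // state can be mapped to an operator in the new job
            }
            if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "Cannot map state for operator " + id + " to the new program");
            }
            skipped.add(id);
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Operator ID taken from the log; the new job's IDs are placeholders.
        List<String> savepoint = Arrays.asList("484df1f961bd0cff95fd39b290ba9c03");
        Set<String> newJob = new HashSet<>(Arrays.asList("new-source-id", "new-sink-id"));

        try {
            restore(savepoint, newJob, false);
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
        System.out.println("skipped: " + restore(savepoint, newJob, true));
    }
}
```

Note that skipping only gets the job to start; the state that belonged to the unmatched operator is not restored, so for a stateful source or sink it is usually better to fix the uid than to skip.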

在 2019年10月25日,下午4:12,<[hidden email]> <[hidden email]> 写道:

Thanks for your reply.
 
Our sources and sinks are connected to Kafka, therefore they are statful.
 
We did not set uid on them but only name().
 
The log says
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/var/flink/data-remote/savepoint-000000-dae014102550. Cannot map checkpoint/savepoint state for operator 484df1f961bd0cff95fd39b290ba9c03 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
 
Regards,
 
Min
 
 
From: Dian Fu [[hidden email]] 
Sent: Freitag, 25. Oktober 2019 10:04
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?
 
Hi Min,
 
It depends on the source/sink implementation. If the source/sink implementation uses state, uid should be set. So you can always set the uid in this case and then you don't need to care about the implementation details of the source/sink you used.
 
name() doesn't have such functionality.
 
Regarding to the uid mismatch you encountered, could you share the exception log? 
 
Regards,
Dian
 
 20191025日,下午3:38[hidden email] 写道:
 
Thank you very much for your helpful response.
 
Our new production release complains about the an uid mismatch (we use exactly once checkpoints).
I hope I understand  your correctly: map and print are certainly stateless, therefore no uid is required. What about addSink and addSoure? Do they need an uid? Or a name() has a similar function?
 
Regards,
 
Min
 
From: Dian Fu [[hidden email]] 
Sent: Freitag, 25. 
Oktober 2019 03:52
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?
 
Hi Min,
 
The uid is used to matching the operator state stored in the checkpoint/savepoint to an operator[1]. So you only need to specify the uid for stateful operators.
1) If you have not specified the uid for an operator, it will generate a uid for it in a deterministic way[2] for it. The generated uid doesn't change for the same job.
2) However, it's encouraged to set uid for stateful operators to allow for job evolution. The dynamically generated uid is not guaranteed to remain the same if the job has changed, i.e. adding/removing operators in the job graph. If you want to reuse state after job evolution, you need to set the uid explicitly.
 
So for the example you give, I think you don't need to specify the uid for the map and print operator.
 
 
 20191024日,下午11:22[hidden email] 写道:
 
Hi,
 
I have some simple questions on the uid as well.
 
1)      Do we add a uid for every operator e.g. print(), addSink and addSource?
2)      For chained operators, do we need to uids for each operator? Or just the last operator?
e.g. .map(....).uid("some-id").print().uid("print-id");
 
 
Regards,
 
Min
 
From: John Smith [[hidden email]] 
Sent: Donnerstag, 24. Oktober 2019 16:32
To: Dian Fu
Cc: user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?
 
Ok cool. Thanks

BTW this seems a bit cumbersome...

.map(....).uid("some-id").name("some-id");
 
On Wed, 23 Oct 2019 at 21:13, Dian Fu <[hidden email]> wrote:

Yes, you can use it in another job. The uid needs only to be unique within a job.

> 
 20191024日,上午5:42John Smith <[hidden email]> 写道:
> 
> When setting uid() of an operator does it have to be unique across all jobs or just unique within a job?
> 
> For example can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") in another job?


E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential manipulation of contents and/or sender's address, incorrect recipient (misdirection), viruses etc. Based on previous e-mail correspondence with you and/or an agreement reached with you, UBS considers itself authorized to contact you via e-mail. UBS assumes no responsibility for any loss or damage resulting from the use of e-mails. 
The recipient is aware of and accepts the inherent risks of using e-mails, in particular the risk that the banking relationship and confidential information relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain it, how we keep it secure and your data protection rights, please see our Privacy Notice 
http://www.ubs.com/privacy-statement
 



RE: Does operator uid() have to be unique across all jobs?

min.tan

Thank you for your reply.

Is there any tool that lets us statically inspect (list) all the operators in a jar, or all the operators that have a uid set?

Also, addSink and addSource are not on the operator list: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/

But they both have a uid() method. Are these two operators or not?

Regards,

Min

 

 

From: Dian Fu [mailto:[hidden email]]
Sent: Friday, 25 October 2019 10:24
To: Tan, Min
Cc: John Smith; [hidden email]
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

 

It means that there is an operator state which has no corresponding operator in the new job. It usually indicates that the uid of a stateful operator has changed.
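
The matching described here can be pictured with a small plain-Java sketch. This is a simplified model under stated assumptions, not Flink's restore code: savepoint state is modeled as a map from operator ID to an opaque string, and the new job as a set of operator IDs. The class RestoreSketch and its restore method are hypothetical; only the --allowNonRestoredState option name comes from the log message quoted in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy model only: NOT Flink's restore logic. State is keyed by operator ID,
// and every saved entry must find an operator with the same ID in the new job.
final class RestoreSketch {

    static Map<String, String> restore(Map<String, String> savepointState,
                                       Set<String> newJobOperatorIds,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> entry : savepointState.entrySet()) {
            String operatorId = entry.getKey();
            if (newJobOperatorIds.contains(operatorId)) {
                // The ID matches: hand the saved state to that operator.
                restored.put(operatorId, entry.getValue());
            } else if (!allowNonRestoredState) {
                // Orphaned state and no permission to drop it: fail the restore.
                throw new IllegalStateException(
                        "Cannot map savepoint state for operator " + operatorId
                                + " to the new program");
            }
            // Otherwise the orphaned state is silently skipped.
        }
        return restored;
    }

    public static void main(String[] args) {
        // State was saved under a generated ID; the new job only knows "kafka-consumer".
        Map<String, String> savepoint = Map.of("484df1f961bd0cff95fd39b290ba9c03", "offsets");
        Set<String> newJob = Set.of("kafka-consumer");
        System.out.println(restore(savepoint, newJob, true)); // orphaned state is dropped
        restore(savepoint, newJob, false);                    // throws IllegalStateException
    }
}
```

Note that skipping the orphaned state makes the restore succeed in this model, but the affected operator then starts without its previous state, which is usually not what you want for a Kafka source.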

 

On 25 Oct 2019, at 4:12 PM, <[hidden email]> <[hidden email]> wrote:

 

Thanks for your reply.

 

Our sources and sinks are connected to Kafka, therefore they are stateful.

We did not set a uid on them, only a name().

 

The log says

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/var/flink/data-remote/savepoint-000000-dae014102550. Cannot map checkpoint/savepoint state for operator 484df1f961bd0cff95fd39b290ba9c03 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

 

Regards,

 

Min

 

 

From: Dian Fu [[hidden email]] 
Sent: Friday, 25 October 2019 10:04
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

 

Hi Min,

 

It depends on the source/sink implementation. If the source/sink implementation uses state, the uid should be set. To be safe, you can always set the uid on sources and sinks; then you don't need to care about the implementation details of the source/sink you use.

name() does not serve this purpose.

Regarding the uid mismatch you encountered, could you share the exception log?

 

Regards,

Dian

 

On 25 Oct 2019, at 3:38 PM, [hidden email] wrote:

 

Thank you very much for your helpful response.

 

Our new production release complains about a uid mismatch (we use exactly-once checkpoints).

I hope I understand you correctly: map and print are certainly stateless, therefore no uid is required. What about addSink and addSource? Do they need a uid? Or does name() serve a similar function?

 

Regards,

 

Min

 

From: Dian Fu [[hidden email]] 
Sent: Friday, 25 October 2019 03:52
To: Tan, Min
Cc: John Smith; user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

 

Hi Min,

 

The uid is used to match the operator state stored in the checkpoint/savepoint to an operator[1]. So you only need to specify the uid for stateful operators.

1) If you have not specified the uid for an operator, Flink will generate one for it in a deterministic way[2]. The generated uid doesn't change for the same job.

2) However, it's encouraged to set the uid explicitly for stateful operators to allow for job evolution. The automatically generated uid is not guaranteed to remain the same once the job has changed, e.g. when operators are added to or removed from the job graph. If you want to reuse state after the job evolves, you need to set the uid explicitly.

So for the example you gave, I don't think you need to specify a uid for the map and print operators.

 

 
