Keeping only latest row by key?

Porritt, James

In Spark, if I want to get a set of unique rows by id, keeping for each id the row with the latest timestamp, I would do the following:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# df: the input DataFrame, with "id" and "timestamp" columns
latest = df \
    .withColumn("rn",
                F.row_number().over(
                    Window.partitionBy("id")
                          .orderBy(F.col("timestamp").desc()))) \
    .where(F.col("rn") == 1)

I see that Flink has windowing functionality, but I don’t see any row enumeration. How would I best achieve the above in Flink?
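For comparison, the Spark snippet numbers the rows inside each id partition from newest to oldest and keeps row number 1. A minimal sketch of the same semantics in plain Java (not Spark or Flink API; `Row` and `latestPerId` are hypothetical names) could look like this:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowNumberDedup {
    // Hypothetical input row: an id plus an event timestamp (epoch millis).
    static final class Row {
        final String id;
        final long timestamp;
        Row(String id, long timestamp) { this.id = id; this.timestamp = timestamp; }
        @Override public String toString() { return id + "@" + timestamp; }
    }

    // Emulates row_number() OVER (PARTITION BY id ORDER BY timestamp DESC) ... WHERE rn = 1.
    static List<Row> latestPerId(List<Row> rows) {
        // PARTITION BY id (LinkedHashMap keeps first-seen id order for stable output).
        Map<String, List<Row>> partitions = new LinkedHashMap<>();
        for (Row r : rows) {
            partitions.computeIfAbsent(r.id, k -> new ArrayList<>()).add(r);
        }
        List<Row> result = new ArrayList<>();
        for (List<Row> partition : partitions.values()) {
            // ORDER BY timestamp DESC within the partition.
            partition.sort(Comparator.comparingLong((Row r) -> r.timestamp).reversed());
            // row_number() gives 1 to the first row in that order, so rn == 1 is index 0.
            result.add(partition.get(0));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("a", 10), new Row("b", 5), new Row("a", 30), new Row("b", 7));
        System.out.println(latestPerId(rows)); // [a@30, b@7]
    }
}
```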

Thanks,

James.

######################################################################
The information contained in this communication is confidential and
intended only for the individual(s) named above. If you are not a named
addressee, please notify the sender immediately and delete this email
from your system and do not disclose the email or any part of it to any
person. The views expressed in this email are the views of the author
and do not necessarily represent the views of Millennium Capital Partners
LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
communications of MCP LLP and its affiliates, including telephone
communications, may be electronically archived and subject to review
and/or disclosure to someone other than the recipient. MCP LLP is
authorized and regulated by the Financial Conduct Authority. Millennium
Capital Partners LLP is a limited liability partnership registered in
England & Wales with number OC312897 and with its registered office at
50 Berkeley Street, London, W1J 8HD.
######################################################################

Re: Keeping only latest row by key?

Andrey Zagrebin
Hi James,

There are over windows in the Flink Table API:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#over-windows
It should be possible to implement this behaviour using them.
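Over windows compute, for each input row, an aggregate over a range of rows in the same partition, rather than collapsing the partition into one row. The plain-Java sketch below (not Flink API; `Event`, `Enriched` and `runningCount` are hypothetical names) emulates `COUNT(*) OVER (PARTITION BY id ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)`, assuming the input already arrives in ascending timestamp order:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OverWindowSketch {
    // Hypothetical input row: a key and an event timestamp.
    static final class Event {
        final String id;
        final long timestamp;
        Event(String id, long timestamp) { this.id = id; this.timestamp = timestamp; }
    }

    // Output row: the input event plus the aggregate computed for it.
    static final class Enriched {
        final Event event;
        final long runningCount;
        Enriched(Event event, long runningCount) { this.event = event; this.runningCount = runningCount; }
        @Override public String toString() { return event.id + "@" + event.timestamp + " #" + runningCount; }
    }

    // Emulates COUNT(*) OVER (PARTITION BY id ORDER BY timestamp
    //   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
    // Assumes eventsInTimeOrder is already sorted by ascending timestamp.
    static List<Enriched> runningCount(List<Event> eventsInTimeOrder) {
        Map<String, Long> seenPerId = new HashMap<>();
        List<Enriched> out = new ArrayList<>();
        for (Event e : eventsInTimeOrder) {
            // One output row per input row: the partition is not collapsed.
            long count = seenPerId.merge(e.id, 1L, Long::sum);
            out.add(new Enriched(e, count));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(new Event("a", 1), new Event("b", 2), new Event("a", 3));
        System.out.println(runningCount(events)); // [a@1 #1, b@2 #1, a@3 #2]
    }
}
```

Note that this numbers rows oldest-first, so filtering on a count of 1 would keep the earliest row per id. Picking the latest row needs to know when a partition is complete, which is straightforward in a batch job but open-ended on a stream.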

Cheers,
Andrey

On 17 Jul 2018, at 18:27, Porritt, James <[hidden email]> wrote:

Re: Keeping only latest row by key?

Timo Walther
Hi James,

The easiest solution for this behavior is to use a user-defined LAST_VALUE aggregate function, as discussed here [1].
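A LAST_VALUE aggregate only needs to remember one value per group. The sketch below is plain Java: its method names (`createAccumulator`, `accumulate`, `getValue`, `merge`) mirror the ones Flink's Table API `AggregateFunction` uses, but the class deliberately does not extend it, so it runs without Flink. It also orders by an explicit timestamp argument instead of arrival order, which is an assumption here (a plain LAST_VALUE would simply keep the most recently received value). `LastValueByTime` and `Acc` are hypothetical names.

```java
public class LastValueByTime<T> {
    // Accumulator: the newest value seen so far and its timestamp.
    public static final class Acc<V> {
        V value;
        long timestamp = Long.MIN_VALUE;
    }

    public Acc<T> createAccumulator() {
        return new Acc<>();
    }

    // Keep the incoming value if it is at least as new as the stored one;
    // on equal timestamps the later arrival wins, as with a plain LAST_VALUE.
    public void accumulate(Acc<T> acc, T value, long timestamp) {
        if (timestamp >= acc.timestamp) {
            acc.value = value;
            acc.timestamp = timestamp;
        }
    }

    // Combine partial accumulators (e.g. from parallel instances) by timestamp.
    public void merge(Acc<T> acc, Iterable<Acc<T>> others) {
        for (Acc<T> other : others) {
            if (other.timestamp > acc.timestamp) {
                acc.value = other.value;
                acc.timestamp = other.timestamp;
            }
        }
    }

    public T getValue(Acc<T> acc) {
        return acc.value;
    }

    public static void main(String[] args) {
        LastValueByTime<String> lastValue = new LastValueByTime<>();
        LastValueByTime.Acc<String> acc = lastValue.createAccumulator();
        lastValue.accumulate(acc, "v1", 10);
        lastValue.accumulate(acc, "v3", 30);
        lastValue.accumulate(acc, "v2", 20); // older than v3, so ignored
        System.out.println(lastValue.getValue(acc)); // v3
    }
}
```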

I hope this helps.

Regards,
Timo

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Using-SQL-with-dynamic-tables-where-rows-are-updated-td20519.html


On 18.07.18 at 12:54, Andrey Zagrebin wrote:

RE: Keeping only latest row by key?

Porritt, James

Hi Timo,

Thanks for this. I’ve been looking into creating this in Java, using MaxAggFunction.scala as a basis. Is it correct that I’d be creating a version for each type I want to use it with (albeit using generics) and registering each function separately for use with the matching type of table field?
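On the per-type question, one option is a single generic base class holding the logic, plus a thin concrete subclass per column type to register. Java erases type arguments from instances, but a subclass that binds T in its extends clause keeps that binding readable via reflection, which is the kind of information a framework's type extraction can rely on. Whether Flink 1.5 actually requires per-type subclasses here is an assumption to check against its UDF documentation; `LastValueBase`, `LastStringValue` and `LastLongValue` are hypothetical names.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class PerTypeLastValue {
    // Hypothetical generic base: the shared "keep the newest value" logic.
    abstract static class LastValueBase<T> {
        private T value;
        private long timestamp = Long.MIN_VALUE;

        void accumulate(T v, long ts) {
            if (ts >= timestamp) {
                value = v;
                timestamp = ts;
            }
        }

        T getValue() {
            return value;
        }

        // Reads the type argument a subclass binds in its extends clause.
        // Erasure removes T from instances, but not from a declared superclass.
        // Assumes the direct superclass is LastValueBase<...>.
        Type valueType() {
            return ((ParameterizedType) getClass().getGenericSuperclass())
                    .getActualTypeArguments()[0];
        }
    }

    // One thin subclass per column type that needs its own registration.
    static final class LastStringValue extends LastValueBase<String> {}
    static final class LastLongValue extends LastValueBase<Long> {}

    public static void main(String[] args) {
        LastStringValue s = new LastStringValue();
        s.accumulate("old", 1);
        s.accumulate("new", 2);
        System.out.println(s.getValue() + " " + s.valueType().getTypeName()); // new java.lang.String
    }
}
```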

Thanks,

James.

From: Timo Walther <[hidden email]>
Sent: 18 July 2018 12:21
To: Porritt, James <[hidden email]>
Cc: [hidden email]
Subject: Re: Keeping only latest row by key?

RE: Keeping only latest row by key?

Porritt, James

It looks like the following gives me the result I’m interested in:

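import org.apache.flink.api.common.operators.Order;

// For each id, sort the group by timestamp (newest first) and keep one row.
batchEnv
        .createInput(dataset)
        .groupBy("id")
        .sortGroup("timestamp", Order.DESCENDING)
        .first(1);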

Is there anything I’ve misunderstood with this?
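The grouped sort followed by first(1) keeps, per id, the row with the greatest timestamp. Because only one row per group survives, the same result can also be reached by a pairwise reduction that keeps the newer of two rows, without sorting each group. A plain-Java sketch of that idea follows; the `Row` class is hypothetical, and the claim that a grouped `reduce` in the DataSet API would be the analogous operator is an untested assumption.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

public class LatestByReduce {
    // Hypothetical row type with the two fields used for grouping and ordering.
    static final class Row {
        final String id;
        final long timestamp;
        Row(String id, long timestamp) { this.id = id; this.timestamp = timestamp; }
    }

    // Per id, repeatedly keep the newer of two rows. maxBy keeps its first
    // argument (the row already in the map) when timestamps are equal.
    static Map<String, Row> latestPerId(List<Row> rows) {
        BinaryOperator<Row> newer =
                BinaryOperator.maxBy(Comparator.comparingLong((Row r) -> r.timestamp));
        return rows.stream().collect(Collectors.toMap(r -> r.id, r -> r, newer));
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row("a", 10), new Row("b", 5), new Row("a", 30));
        Map<String, Row> latest = latestPerId(rows);
        System.out.println(latest.get("a").timestamp + " " + latest.get("b").timestamp); // 30 5
    }
}
```

This sketch keeps the first-seen row on a timestamp tie; how ties resolve in either Flink variant is not specified in this thread.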

From: Porritt, James <[hidden email]>
Sent: 19 July 2018 09:21
To: 'Timo Walther' <[hidden email]>
Cc: [hidden email]
Subject: RE: Keeping only latest row by key?

Re: Keeping only latest row by key?

Fabian Hueske-2
Hi James,

Yes, that should also do the trick.

Best, Fabian

2018-07-19 16:06 GMT+02:00 Porritt, James <[hidden email]>: