Filtering lines in parquet

Filtering lines in parquet

Avi Levi
Hi all,
I am trying to filter lines from Parquet files. The problem is that they have different schemas; however, the field that I am filtering on exists in all of them.
In Spark this is quite straightforward:

val filtered = rawsDF.filter(col("id") =!= "123")

I tried to do it in Flink by extending the ParquetInputFormat, but in that case I need the schema (MessageType) and have to implement the convert method, which I want to avoid since I do not want to convert the line (I want to write it as is to another Parquet file).

Any ideas?

Cheers
Avi

Re: Filtering lines in parquet

Arvid Heise
Hi Avi,

I'm not entirely sure I understand the question. Let's say you have sources A, B, C, all with different schemas but all with an id. You could use the ParquetMapInputFormat, which provides a map of each record, and just do a map lookup.

However, I'm not sure how you want to write these records with different schemas into the same Parquet file. Maybe you just want to extract the common fields of A, B, C? Then you could also use the Table API and declare only the fields that are common.

Or do you have sinks A, B, C and actually three separate topologies?
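
A rough, untested sketch of the ParquetMapInputFormat idea above (assuming the legacy DataSet API and the flink-parquet module; the names FilterByIdSketch and filterById are just for illustration, and the MessageType still has to be supplied to the format):

import java.util.Map;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetMapInputFormat;
import org.apache.parquet.schema.MessageType;

public class FilterByIdSketch {

    // Reads one Parquet file as Map records and keeps every record whose id is not "123".
    public static DataSet<Map> filterById(ExecutionEnvironment env, Path path, MessageType schema) {
        ParquetMapInputFormat format = new ParquetMapInputFormat(path, schema);
        return env.createInput(format)
                  .filter(record -> !"123".equals(record.get("id")));
    }
}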

Re: Filtering lines in parquet

Avi Levi
Hi Arvid,
Assuming I have A0, B0, C0 Parquet files with different schemas and a common field ID, I want to write them to A1, B2, C3 respectively. My problem is that I do not want my code to know the full schema; I just want to filter using the ID field and write the surviving lines to the destination file. Each source file should have a matching destination file.
I tried to implement it using the ParquetInputFormat, but there I need to define the schema (MessageType) in advance:

class ParquetInput(path: Path, messageType: MessageType) extends ParquetInputFormat[Row](path, messageType) {
  override protected def convert(row: Row): Row = row // forced to implement convert even though rows pass through unchanged
}

I am looking for a way for my code to stay agnostic of the schema and only know the "id" field, just like in Spark, e.g. val filtered = rawsDF.filter(col("id") =!= "123")

Thanks
Avi

Re: Filtering lines in parquet

Arvid Heise
Hi Avi,

thanks for clarifying.

It seems like it's not possible to parse Parquet in Flink without knowing the schema. What I'd do is parse the metadata while setting up the job and then pass it to the input format:
ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path, fileSize);
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
MessageType fileSchema = fileMetaData.getSchema();
Quite possibly that's what Spark is doing under the hood. If you open a ticket with a feature request, we will add it in the future.
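
A minimal end-to-end sketch of that approach (untested; it assumes the DataSet API plus the flink-parquet and parquet-hadoop modules, and it reads the footer through the public ParquetFileReader API instead of MetadataReader, so exact signatures may differ between versions):

import java.util.Map;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.formats.parquet.ParquetMapInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class SchemaFromFooterSketch {

    public static DataSet<Map> readAndFilter(ExecutionEnvironment env, String file) throws Exception {
        // 1. Read the schema from the file footer while setting up the job,
        //    so the job code never hard-codes a MessageType.
        MessageType schema;
        try (ParquetFileReader reader =
                 ParquetFileReader.open(HadoopInputFile.fromPath(new Path(file), new Configuration()))) {
            schema = reader.getFooter().getFileMetaData().getSchema();
        }

        // 2. Hand the discovered schema to the input format and filter on "id".
        ParquetMapInputFormat format =
            new ParquetMapInputFormat(new org.apache.flink.core.fs.Path(file), schema);
        return env.createInput(format)
                  .filter(record -> !"123".equals(record.get("id")));
    }
}

Running this once per source file (A0, B0, C0) would keep the job agnostic of each concrete schema while still letting every source map to its own destination file.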

Re: Filtering lines in parquet

Avi Levi
Cool, thanks! 
