PyFlink LIST type problem

PyFlink LIST type problem

László Ciople
Hello,
While trying to use the PyFlink DataStream API in Flink 1.13, I have encountered an error regarding list types. I am trying to read data from a Kafka topic that contains events in JSON format. For example:
{
    "timestamp": 1614259940,
    "harvesterID": "aws-harvester",
    "clientID": "aws-client-id",
    "deviceID": "aws-devid",
    "payload": {
        "Version": {
            "PolicyVersion": {
                "Document": {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Action": "ec2:*",
                            "Effect": "Allow",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "elasticloadbalancing:*",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "cloudwatch:*",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "autoscaling:*",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "iam:CreateServiceLinkedRole",
                            "Resource": "*",
                            "Condition": {
                                "StringEquals": {
                                    "iam:AWSServiceName": [
                                        "autoscaling.amazonaws.com",
                                        "ec2scheduled.amazonaws.com",
                                        "elasticloadbalancing.amazonaws.com",
                                        "spot.amazonaws.com",
                                        "spotfleet.amazonaws.com",
                                        "transitgateway.amazonaws.com"
                                    ]
                                }
                            }
                        }
                    ]
                },
                "VersionId": "v5",
                "IsDefaultVersion": true,
                "CreateDate": "2018-11-27 02:16:56+00:00"
            },
            "ResponseMetadata": {
                "RequestId": "6d32c946-1273-4bc5-b465-e5549dc4f515",
                "HTTPStatusCode": 200,
                "HTTPHeaders": {
                    "x-amzn-requestid": "6d32c946-1273-4bc5-b465-e5549dc4f515",
                    "content-type": "text/xml",
                    "content-length": "2312",
                    "vary": "accept-encoding",
                    "date": "Thu, 25 Feb 2021 15:32:18 GMT"
                },
                "RetryAttempts": 0
            }
        },
        "Policy": {
            "Policy": {
                "PolicyName": "AmazonEC2FullAccess",
                "PolicyId": "ANPAI3VAJF5ZCRZ7MCQE6",
                "Arn": "arn:aws:iam::aws:policy/AmazonEC2FullAccess",
                "Path": "/",
                "DefaultVersionId": "v5",
                "AttachmentCount": 2,
                "PermissionsBoundaryUsageCount": 0,
                "IsAttachable": true,
                "Description": "Provides full access to Amazon EC2 via the AWS Management Console.",
                "CreateDate": "2015-02-06 18:40:15+00:00",
                "UpdateDate": "2018-11-27 02:16:56+00:00"
            },
            "ResponseMetadata": {
                "RequestId": "a7e9f175-a757-4215-851e-f3d001083631",
                "HTTPStatusCode": 200,
                "HTTPHeaders": {
                    "x-amzn-requestid": "a7e9f175-a757-4215-851e-f3d001083631",
                    "content-type": "text/xml",
                    "content-length": "866",
                    "date": "Thu, 25 Feb 2021 15:32:18 GMT"
                },
                "RetryAttempts": 0
            }
        }
    }
}

I have tried to map this JSON to Flink data types as follows:
input_type = Types.ROW_NAMED(
        ['timestamp', 'harvesterID', 'clientID', 'deviceID', 'payload'],
        [
            Types.LONG(),  # timestamp
            Types.STRING(),  # harvesterID
            Types.STRING(),  # clientID
            Types.STRING(),  # deviceID
            Types.ROW_NAMED(  # Payload
                ['Version', 'Policy'],
                [
                    Types.ROW_NAMED(  # Version
                        ['PolicyVersion', 'ResponseMetadata'],
                        [
                            Types.ROW_NAMED(  # PolicyVersion
                                ['Document', 'VersionId', 'IsDefaultVersion', 'CreateDate'],
                                [
                                    Types.ROW_NAMED(  # Document
                                        ['Version', 'Statement'],
                                        [
                                            Types.STRING(),  # Version
                                            Types.LIST(  # Statement
                                                Types.ROW_NAMED(
                                                    ['Action', 'Effect', 'Resource', 'Condition'],
                                                    [
                                                        Types.STRING(),  # Action
                                                        Types.STRING(),  # Effect
                                                        Types.STRING(),  # Resource
                                                        Types.ROW_NAMED(  # Condition
                                                            ['StringEquals'],
                                                            [
                                                                Types.ROW_NAMED(  # StringEquals
                                                                    ['iam:AWSServiceName'],
                                                                    [
                                                                        Types.LIST(Types.STRING())  # iam:AWSServiceName
                                                                    ])
                                                            ])
                                                    ])
                                            )
                                        ]),
                                    Types.STRING(),  # VersionId
                                    Types.BOOLEAN(),  # IsDefaultVersion
                                    Types.STRING()  # CreateDate
                                ]),
                            Types.ROW_NAMED(  # ResponseMetadata
                                ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
                                [
                                    Types.STRING(),  # RequestId
                                    Types.INT(),  # HTTPStatusCode
                                    Types.MAP(  # HTTPHeaders
                                        Types.STRING(),
                                        Types.STRING()
                                    ),
                                    Types.INT()  # RetryAttempts
                                ])
                        ]),
                    Types.ROW_NAMED(  # Policy
                        ['Policy', 'ResponseMetadata'],
                        [
                            Types.ROW_NAMED(  # Policy
                                ['PolicyName', 'PolicyId', 'Arn', 'Path', 'DefaultVersionId', 'AttachmentCount',
                                    'PermissionsBoundaryUsageCount', 'IsAttachable', 'Description', 'CreateDate', 'UpdateDate'],
                                [
                                    Types.STRING(),  # PolicyName
                                    Types.STRING(),  # PolicyId
                                    Types.STRING(),  # Arn
                                    Types.STRING(),  # Path
                                    Types.STRING(),  # DefaultVersionId
                                    Types.INT(),  # AttachmentCount
                                    Types.INT(),  # PermissionsBoundaryUsageCount
                                    Types.BOOLEAN(),  # IsAttachable
                                    Types.STRING(),  # Description
                                    Types.STRING(),  # CreateDate
                                    Types.STRING()  # UpdateDate
                                ]),
                            Types.ROW_NAMED(  # ResponseMetadata
                                ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
                                [
                                    Types.STRING(),  # RequestId
                                    Types.INT(),  # HTTPStatusCode
                                    Types.MAP(  # HTTPHeaders
                                        Types.STRING(),
                                        Types.STRING()
                                    ),
                                    Types.INT()  # RetryAttempts
                                ])
                        ])
                ])
        ])
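
For context, this type is wired into the Kafka consumer roughly like this (a reconstruction of my setup; the topic name and broker address are placeholders):

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# Deserialize each Kafka record into a Row according to input_type.
deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(input_type) \
    .build()

consumer = FlinkKafkaConsumer(
    topics='events',  # placeholder topic name
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092'})  # placeholder broker

ds = env.add_source(consumer)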

But when I submit the application to Flink, I receive the following error as soon as it tries to read data from the Kafka topic:
java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to org.apache.flink.types.Row
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:102)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)

I think the problem is caused by the Statement list in the event type. I counted the RowSerializer calls in the stack trace, and the ListSerializer appears to be invoked exactly for the Statement field of the Document row. Is there a bug in Flink, or am I not using the LIST type correctly? I really need a list whose element type is composite rather than primitive. (I have also tried the BASIC_ARRAY and PRIMITIVE_ARRAY types, but with those the job fails even before it is submitted, which I expected.)

Best regards,
Laszlo

Re: PyFlink LIST type problem

Dian Fu
Hi Laszlo,

It seems this is because the JSON format supports the object array type but does not support the list type.

However, the object array type is not yet provided in the PyFlink DataStream API [1]. I have created a ticket as a follow-up.
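
Once that is supported, the fix should be local to the Statement field, presumably something like the following, mirroring the Java-side Types.OBJECT_ARRAY (hypothetical: no such constructor exists in PyFlink 1.13, and swapping the inner iam:AWSServiceName list for a string array is also an assumption):

Types.OBJECT_ARRAY(  # Statement: an array whose elements are composite (hypothetical API)
    Types.ROW_NAMED(
        ['Action', 'Effect', 'Resource', 'Condition'],
        [
            Types.STRING(),  # Action
            Types.STRING(),  # Effect
            Types.STRING(),  # Resource
            Types.ROW_NAMED(  # Condition
                ['StringEquals'],
                [Types.ROW_NAMED(
                    ['iam:AWSServiceName'],
                    [Types.BASIC_ARRAY(Types.STRING())])])  # array instead of LIST
        ]))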

For now, I guess you could implement it yourself; the basic array type [2] could serve as an example.
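
Alternatively, if you do not need the typed schema end to end, you could side-step the problem by consuming the records as plain strings and parsing the JSON on the Python side. A minimal sketch, assuming a placeholder topic name, broker address, and extraction logic:

import json

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# Consume raw JSON strings instead of deserializing into a typed Row.
consumer = FlinkKafkaConsumer(
    topics='events',  # placeholder topic name
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092'})  # placeholder broker


def extract_statements(raw):
    # The Statement array becomes a plain Python list of dicts here,
    # so no Flink list/array type info is needed for it.
    event = json.loads(raw)
    statements = event['payload']['Version']['PolicyVersion']['Document']['Statement']
    return json.dumps(statements)  # re-serialize so the output type stays STRING


ds = env.add_source(consumer).map(extract_statements, output_type=Types.STRING())

The trade-off is that downstream operators see strings (or pickled Python objects) rather than typed Rows, so this only helps if the nested structure does not need to cross back into typed Flink types.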

Regards,
Dian

