Hello,

While trying to use the PyFlink DataStream API in Flink 1.13, I have encountered an error regarding list types. I am trying to read data from a Kafka topic that contains events in JSON format. For example:

{
  "timestamp": 1614259940,
  "harvesterID": "aws-harvester",
  "clientID": "aws-client-id",
  "deviceID": "aws-devid",
  "payload": {
    "Version": {
      "PolicyVersion": {
        "Document": {
          "Version": "2012-10-17",
          "Statement": [
            {"Action": "ec2:*", "Effect": "Allow", "Resource": "*"},
            {"Effect": "Allow", "Action": "elasticloadbalancing:*", "Resource": "*"},
            {"Effect": "Allow", "Action": "cloudwatch:*", "Resource": "*"},
            {"Effect": "Allow", "Action": "autoscaling:*", "Resource": "*"},
            {
              "Effect": "Allow",
              "Action": "iam:CreateServiceLinkedRole",
              "Resource": "*",
              "Condition": {"StringEquals": {"iam:AWSServiceName": []}}
            }
          ]
        },
        "VersionId": "v5",
        "IsDefaultVersion": true,
        "CreateDate": "2018-11-27 02:16:56+00:00"
      },
      "ResponseMetadata": {
        "RequestId": "6d32c946-1273-4bc5-b465-e5549dc4f515",
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
          "x-amzn-requestid": "6d32c946-1273-4bc5-b465-e5549dc4f515",
          "content-type": "text/xml",
          "content-length": "2312",
          "vary": "accept-encoding",
          "date": "Thu, 25 Feb 2021 15:32:18 GMT"
        },
        "RetryAttempts": 0
      }
    },
    "Policy": {
      "Policy": {
        "PolicyName": "AmazonEC2FullAccess",
        "PolicyId": "ANPAI3VAJF5ZCRZ7MCQE6",
        "Arn": "arn:aws:iam::aws:policy/AmazonEC2FullAccess",
        "Path": "/",
        "DefaultVersionId": "v5",
        "AttachmentCount": 2,
        "PermissionsBoundaryUsageCount": 0,
        "IsAttachable": true,
        "Description": "Provides full access to Amazon EC2 via the AWS Management Console.",
        "CreateDate": "2015-02-06 18:40:15+00:00",
        "UpdateDate": "2018-11-27 02:16:56+00:00"
      },
      "ResponseMetadata": {
        "RequestId": "a7e9f175-a757-4215-851e-f3d001083631",
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
          "x-amzn-requestid": "a7e9f175-a757-4215-851e-f3d001083631",
          "content-type": "text/xml",
          "content-length": "866",
          "date": "Thu, 25 Feb 2021 15:32:18 GMT"
        },
        "RetryAttempts": 0
      }
    }
  }
}
This is the type information I defined for these events:

input_type = Types.ROW_NAMED(
    ['timestamp', 'harvesterID', 'clientID', 'deviceID', 'payload'],
    [
        Types.LONG(),    # timestamp
        Types.STRING(),  # harvesterID
        Types.STRING(),  # clientID
        Types.STRING(),  # deviceID
        Types.ROW_NAMED(  # payload
            ['Version', 'Policy'],
            [
                Types.ROW_NAMED(  # Version
                    ['PolicyVersion', 'ResponseMetadata'],
                    [
                        Types.ROW_NAMED(  # PolicyVersion
                            ['Document', 'VersionId', 'IsDefaultVersion', 'CreateDate'],
                            [
                                Types.ROW_NAMED(  # Document
                                    ['Version', 'Statement'],
                                    [
                                        Types.STRING(),  # Version
                                        Types.LIST(      # Statement
                                            Types.ROW_NAMED(
                                                ['Action', 'Effect', 'Resource', 'Condition'],
                                                [
                                                    Types.STRING(),   # Action
                                                    Types.STRING(),   # Effect
                                                    Types.STRING(),   # Resource
                                                    Types.ROW_NAMED(  # Condition
                                                        ['StringEquals'],
                                                        [
                                                            Types.ROW_NAMED(  # StringEquals
                                                                ['iam:AWSServiceName'],
                                                                [
                                                                    Types.LIST(Types.STRING())  # iam:AWSServiceName
                                                                ])
                                                        ])
                                                ])
                                        )
                                    ]),
                                Types.STRING(),   # VersionId
                                Types.BOOLEAN(),  # IsDefaultVersion
                                Types.STRING()    # CreateDate
                            ]),
                        Types.ROW_NAMED(  # ResponseMetadata
                            ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
                            [
                                Types.STRING(),  # RequestId
                                Types.INT(),     # HTTPStatusCode
                                Types.MAP(       # HTTPHeaders
                                    Types.STRING(),
                                    Types.STRING()
                                ),
                                Types.INT()      # RetryAttempts
                            ])
                    ]),
                Types.ROW_NAMED(  # Policy
                    ['Policy', 'ResponseMetadata'],
                    [
                        Types.ROW_NAMED(  # Policy
                            ['PolicyName', 'PolicyId', 'Arn', 'Path', 'DefaultVersionId',
                             'AttachmentCount', 'PermissionBoundaryUsageCount', 'IsAttachable',
                             'Description', 'CreateDate', 'UpdateDate'],
                            [
                                Types.STRING(),   # PolicyName
                                Types.STRING(),   # PolicyId
                                Types.STRING(),   # Arn
                                Types.STRING(),   # Path
                                Types.STRING(),   # DefaultVersionId
                                Types.INT(),      # AttachmentCount
                                Types.INT(),      # PermissionBoundaryUsageCount
                                Types.BOOLEAN(),  # IsAttachable
                                Types.STRING(),   # Description
                                Types.STRING(),   # CreateDate
                                Types.STRING()    # UpdateDate
                            ]),
                        Types.ROW_NAMED(  # ResponseMetadata
                            ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
                            [
                                Types.STRING(),  # RequestId
                                Types.INT(),     # HTTPStatusCode
                                Types.MAP(       # HTTPHeaders
                                    Types.STRING(),
                                    Types.STRING()
                                ),
                                Types.INT()      # RetryAttempts
                            ])
                    ])
            ])
    ])

But when I submit the application to Flink, I receive the following error when it tries to read data from the Kafka topic:

java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to org.apache.flink.types.Row
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:102)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)

Best regards,
Laszlo
Hi Laszlo,
It seems this is because the JSON format supports the object array type but does not support the list type. However, the object array type is still not available in the PyFlink DataStream API [1]. I have created a ticket as a follow-up. For now, I guess you could implement it yourself; you could take a look at the basic array type [2] as an example.

Regards,
Dian
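[Editor's note: until an object array type is available, one common stopgap is to declare the problematic nested field (or the whole payload) as Types.STRING() in the type info and parse the JSON yourself inside a map function. This is a sketch of only the plain-Python parsing step, using the standard json module; the helper name, the (action, effect, resource) tuple shape, and the trimmed sample event are illustrative, not part of the thread.]

```python
import json


def extract_statements(raw_event: str):
    """Parse a raw JSON event string and pull out the Statement entries
    as (action, effect, resource) tuples, so the list never has to be
    described in Flink type info."""
    event = json.loads(raw_event)
    doc = event["payload"]["Version"]["PolicyVersion"]["Document"]
    return [
        (s.get("Action"), s.get("Effect"), s.get("Resource"))
        for s in doc["Statement"]
    ]


# Example with a trimmed-down event:
sample = json.dumps({
    "payload": {"Version": {"PolicyVersion": {"Document": {
        "Version": "2012-10-17",
        "Statement": [
            {"Action": "ec2:*", "Effect": "Allow", "Resource": "*"},
            {"Action": "cloudwatch:*", "Effect": "Allow", "Resource": "*"},
        ],
    }}}}
})
print(extract_statements(sample))
# [('ec2:*', 'Allow', '*'), ('cloudwatch:*', 'Allow', '*')]
```

In a PyFlink job this helper would run inside a map() on a stream of Types.STRING() records, emitting rows whose types the serializer does support.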