Error While Initializing S3A FileSystem


Manish Bellani

Hey friends,

Thanks for all the work you have been doing on Flink. I have been trying to use BucketingSink (backed by S3AFileSystem) to write data to S3, and I'm running into some issues (which I suspect could be dependency/packaging related) that I'll try to describe here.

The data pipeline is quite simple:

Kafka -> KafkaConsumer (Source) -> BucketingSink (S3AFileSystem) -> S3
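
For reference, a minimal sketch of this pipeline in the Flink 1.7 DataStream API would look roughly like the following (the broker address, topic, and bucket path are hypothetical placeholders, not the actual job):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaToS3Job {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical Kafka connection settings.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "s3-writer");

        // BucketingSink stages part files under the base path; the s3a://
        // scheme routes the writes through the Hadoop S3A filesystem.
        BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/output");
        sink.setBatchSize(1024L * 1024L * 128L); // roll part files at ~128 MB

        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
           .addSink(sink);

        env.execute("kafka-to-s3");
    }
}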

Environment:

Kubernetes
Debian
DockerImage: flink:1.7.2-hadoop28-scala_2.11
Java 1.8
Hadoop Version: 2.8.5

I followed this dependency section, https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#flink-for-hadoop-27, to place the dependencies under /opt/flink/lib (with the exception that the Hadoop version and its dependencies I pull in are different).

Here are the dependencies I'm pulling in (excerpt from my Dockerfile):

RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.7.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar
RUN wget -O /opt/flink/lib/hadoop-aws-2.8.5.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.5/hadoop-aws-2.8.5.jar
RUN wget -O /opt/flink/lib/aws-java-sdk-s3-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
#Transitive Dependency of aws-java-sdk-s3
RUN wget -O /opt/flink/lib/aws-java-sdk-core-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.10.6/aws-java-sdk-core-1.10.6.jar
RUN wget -O /opt/flink/lib/aws-java-sdk-kms-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.10.6/aws-java-sdk-kms-1.10.6.jar
RUN wget -O /opt/flink/lib/jackson-annotations-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.3/jackson-annotations-2.5.3.jar
RUN wget -O /opt/flink/lib/jackson-core-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.3/jackson-core-2.5.3.jar
RUN wget -O /opt/flink/lib/jackson-databind-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.3/jackson-databind-2.5.3.jar
RUN wget -O /opt/flink/lib/joda-time-2.8.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
RUN wget -O /opt/flink/lib/httpcore-4.3.3.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.3.3/httpcore-4.3.3.jar
RUN wget -O /opt/flink/lib/httpclient-4.3.6.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.3.6/httpclient-4.3.6.jar


But when I submit the job, it throws this error during initialization of BucketingSink/S3AFileSystem:

java.beans.IntrospectionException: bad write method arg count: public final void org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)
    at java.beans.PropertyDescriptor.findPropertyType(PropertyDescriptor.java:657)
    at java.beans.PropertyDescriptor.setWriteMethod(PropertyDescriptor.java:327)
    at java.beans.PropertyDescriptor.<init>(PropertyDescriptor.java:139)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.createFluentPropertyDescritor(FluentPropertyBeanIntrospector.java:178)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.introspect(FluentPropertyBeanIntrospector.java:141)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.fetchIntrospectionData(PropertyUtilsBean.java:2245)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getIntrospectionData(PropertyUtilsBean.java:2226)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getPropertyDescriptor(PropertyUtilsBean.java:954)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.isWriteable(PropertyUtilsBean.java:1478)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.isPropertyWriteable(BeanHelper.java:521)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initProperty(BeanHelper.java:357)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBeanProperties(BeanHelper.java:273)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBean(BeanHelper.java:192)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper$BeanCreationContextImpl.initBean(BeanHelper.java:669)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.initBeanInstance(DefaultBeanFactory.java:162)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.createBean(DefaultBeanFactory.java:116)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:459)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:479)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:492)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.createResultInstance(BasicConfigurationBuilder.java:447)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.createResult(BasicConfigurationBuilder.java:417)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.getConfiguration(BasicConfigurationBuilder.java:285)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig.loadFirst(MetricsConfig.java:119)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig.create(MetricsConfig.java:98)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.configure(MetricsSystemImpl.java:478)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.start(MetricsSystemImpl.java:188)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.init(MetricsSystemImpl.java:163)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem(S3AInstrumentation.java:251)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.registerAsMetricsSource(S3AInstrumentation.java:264)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.<init>(S3AInstrumentation.java:243)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:244)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1224)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

Some googling about "bad write method arg count" reveals that it could potentially be related to a beanutils issue, but I'm not entirely sure. I've hunted through all the jars that are on the classpath:

/usr/lib/jvm/java-8-openjdk-amd64/bin/java -Djavax.net.ssl.trustStore=/etc/ssl/java-certs/cacerts -XX:+UseG1GC -Xms5530M -Xmx5530M -XX:MaxDirectMemorySize=8388607T -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath /opt/flink/lib/aws-java-sdk-core-1.10.6.jar:/opt/flink/lib/aws-java-sdk-kms-1.10.6.jar:/opt/flink/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink/lib/flink-metrics-datadog-1.6.2.jar:/opt/flink/lib/flink-python_2.11-1.7.2.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.2.jar:/opt/flink/lib/hadoop-aws-2.8.5.jar:/opt/flink/lib/httpclient-4.3.6.jar:/opt/flink/lib/httpcore-4.3.3.jar:/opt/flink/lib/jackson-annotations-2.5.3.jar:/opt/flink/lib/jackson-core-2.5.3.jar:/opt/flink/lib/jackson-databind-2.5.3.jar:/opt/flink/lib/joda-time-2.8.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.7.2.jar::/opt/flink/conf: org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/flink/conf

and I see that `FluentPropertyBeanIntrospector` is contained in the following two jars:

flink-s3-fs-hadoop-1.7.2.jar:org/apache/flink/fs/shaded/hadoop3/org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
flink-shaded-hadoop2-uber-1.7.2.jar:org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class

Both of those jars are packaged as part of the Flink distribution I'm using. I can't think of anything else at the moment, other than this potentially being an incompatible transitive dependency issue. I would love to get some advice from y'all on whether this is a packaging bug or something else on my side.
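
For reference, the scan above can be reproduced with a small throwaway program, sketched below (the directory and class name match this setup; the helper itself is hypothetical):

import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

public class FindClassInJars {
    public static void main(String[] args) throws IOException {
        String needle = "FluentPropertyBeanIntrospector.class";
        File[] jars = new File("/opt/flink/lib").listFiles((dir, name) -> name.endsWith(".jar"));
        if (jars == null) {
            return; // directory missing or unreadable
        }
        for (File f : jars) {
            try (JarFile jar = new JarFile(f)) {
                // Print every entry, including shaded/relocated copies,
                // whose path ends with the class name.
                jar.stream()
                   .filter(entry -> entry.getName().endsWith(needle))
                   .forEach(entry -> System.out.println(f.getName() + ": " + entry.getName()));
            }
        }
    }
}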


Thanks

Manish

Re: Error While Initializing S3A FileSystem

Ken Krugler
Hi Manish,

Are you sure this is an exception that’s actually killing the job?

Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 talks about Commons Beanutils logging this exception, but it’s a warning vs. something being thrown up the stack.

— Ken


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Re: Error While Initializing S3A FileSystem

Manish Bellani
Hi Ken,

Thanks for the quick response. You're actually right: the job keeps running even after that error appears. It was crashing earlier (due to fs.s3a.multipart.size being set too high), and I conflated that crash with this error, since this was the first error to pop out and the OOM wasn't immediately apparent.

I do have a follow-up question, if you don't mind me asking it in the same thread. If I'm reading the BucketingSink code correctly, then if I supply a core-site.xml with the following contents, would it not pick up the S3RecoverableWriter code path?

<configuration>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <property>
        <name>fs.s3a.fast.upload</name>
        <value>true</value>
        <description>
            Use the incremental block upload mechanism with
            the buffering mechanism set in fs.s3a.fast.upload.buffer.
            The number of threads performing uploads in the filesystem is defined
            by fs.s3a.threads.max; the queue of waiting uploads limited by
            fs.s3a.max.total.tasks.
            The size of each buffer is set by fs.s3a.multipart.size.
        </description>
    </property>

    <property>
        <name>fs.s3a.fast.upload.buffer</name>
        <value>array</value>
        <description>
            The buffering mechanism to use when using S3A fast upload
            (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
            This configuration option has no effect if fs.s3a.fast.upload is false.

            "disk" will use the directories listed in fs.s3a.buffer.dir as
            the location(s) to save data prior to being uploaded.

            "array" uses arrays in the JVM heap

            "bytebuffer" uses off-heap memory within the JVM.

            Both "array" and "bytebuffer" will consume memory in a single stream up to the number
            of blocks set by:

            fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.

            If using either of these mechanisms, keep this value low

            The total number of threads performing work across all threads is set by
            fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
            work items.
        </description>
    </property>

    <property>
        <name>fs.s3a.multipart.size</name>
        <value>10M</value>
        <description>How big (in bytes) to split upload or copy operations up into.
            A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
        </description>
    </property>

    <property>
        <name>fs.s3a.fast.upload.active.blocks</name>
        <value>8</value>
        <description>
            Maximum number of blocks a single output stream can have
            active (uploading, or queued to the central FileSystem
            instance's pool of queued operations).

            This stops a single stream overloading the shared thread pool.
        </description>
    </property>

    <property>
      <name>fs.s3a.aws.credentials.provider</name>
      <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
    </property>

</configuration>

I say that because I don't see any files being written under the /tmp directory with a pattern like ".tmp_UUID", which is what RefCountedTmpFileCreator is supposed to create for staging writes to S3 (it is wired in by org.apache.flink.fs.s3.common.FlinkS3FileSystem):

    public RefCountedFile apply(File file) throws IOException {
        File directory = this.tempDirectories[this.nextIndex()];

        while (true) {
            try {
                if (file == null) {
                    // Fresh write: stage the data in a new ".tmp_<UUID>" file.
                    File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
                    OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
                    return RefCountedFile.newFile(newFile, out);
                }

                // Recovery: reopen an existing staging file for append.
                OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
                return RefCountedFile.restoredFile(file, out, file.length());
            } catch (FileAlreadyExistsException e) {
                // UUID name collision: loop and retry with a fresh name.
            }
        }
    }


Is the S3RecoverableWriter path even supported for BucketingSink?
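
One way to probe that question is to ask Flink directly which FileSystem class it resolves for the scheme; a minimal sketch, assuming Flink's public FileSystem.get API and a hypothetical bucket name:

import java.net.URI;

import org.apache.flink.core.fs.FileSystem;

public class WhichFileSystem {
    public static void main(String[] args) throws Exception {
        // If the flink-s3-fs-hadoop factory wins, this should print
        // org.apache.flink.fs.s3.common.FlinkS3FileSystem (the class noted
        // above as wiring in the staging-file machinery); a Hadoop wrapper
        // class here would indicate a fallback code path instead.
        FileSystem fs = FileSystem.get(new URI("s3a://my-bucket/"));
        System.out.println(fs.getClass().getName());
    }
}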

Manish



Re: Error While Initializing S3A FileSystem

Ken Krugler
Hi Manish,

It’s best to start a new thread if you have a new question - see https://home.apache.org/~hossman/#threadhijack for reasons why…

Regards,

— Ken


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Re: Error While Initializing S3A FileSystem

Manish Bellani
Thanks, Ken. That makes sense! I'll start a new thread.

On Wed, May 15, 2019 at 7:12 PM Ken Krugler <[hidden email]> wrote:
Hi Manish,

It’s best to start a new thread if you have a new question - see https://home.apache.org/~hossman/#threadhijack for reasons why…

Regards,

— Ken


On May 15, 2019, at 4:46 PM, Manish Bellani <[hidden email]> wrote:

Hi Ken,

Thanks for the quick response, you are actually right, the job seems to be running even after that error appears. It was crashing earlier (due to fs.s3a.multipart.size being too high) and I confused it with this error since that was the first one popping out and OOM wasn't apparent immediately.

I do have a subsequent question though if you don't mind me asking this question in the same thread. So... if I'm reading the  BucketingSink code correctly then if I supply the core-site.xml with following contents, would it not pick the S3RecoverableWriter code path?:

<configuration>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <property>
        <name>fs.s3a.fast.upload</name>
        <value>true</value>
        <description>
            Use the incremental block upload mechanism with
            the buffering mechanism set in fs.s3a.fast.upload.buffer.
            The number of threads performing uploads in the filesystem is defined
            by fs.s3a.threads.max; the queue of waiting uploads limited by
            fs.s3a.max.total.tasks.
            The size of each buffer is set by fs.s3a.multipart.size.
        </description>
    </property>

    <property>
        <name>fs.s3a.fast.upload.buffer</name>
        <value>array</value>
        <description>
            The buffering mechanism to use when using S3A fast upload
            (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
            This configuration option has no effect if fs.s3a.fast.upload is false.

            "disk" will use the directories listed in fs.s3a.buffer.dir as
            the location(s) to save data prior to being uploaded.

            "array" uses arrays in the JVM heap

            "bytebuffer" uses off-heap memory within the JVM.

            Both "array" and "bytebuffer" will consume memory in a single stream up to the number
            of blocks set by:

            fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.

            If using either of these mechanisms, keep this value low

            The total number of threads performing work across all threads is set by
            fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
            work items.
        </description>
    </property>

    <property>
        <name>fs.s3a.multipart.size</name>
        <value>10M</value>
        <description>How big (in bytes) to split upload or copy operations up into.
            A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
        </description>
    </property>

    <property>
        <name>fs.s3a.fast.upload.active.blocks</name>
        <value>8</value>
        <description>
            Maximum Number of blocks a single output stream can have
            active (uploading, or queued to the central FileSystem
            instance's pool of queued operations.

            This stops a single stream overloading the shared thread pool.
        </description>
    </property>

    <property>
      <name>fs.s3a.aws.credentials.provider</name>
      <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
    </property>

</configuration>

I say that because I don't see any files being written under `/tmp` directory with the pattern like ".tmp_UUID", which what RefCountedTmpFileCreator is supposed to create for staging writes to s3 (which is wired in by org.apache.flink.fs.s3.common.FlinkS3FileSystem):

    public RefCountedFile apply(File file) throws IOException {
       File directory = this.tempDirectories[this.nextIndex()];

        while(true) {
            try {
                if (file == null) {
                    File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
                    OutputStream out = Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
                    return RefCountedFile.newFile(newFile, out);
                }

                OutputStream out = Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
                return RefCountedFile.restoredFile(file, out, file.length());
            } catch (FileAlreadyExistsException var5) {
            }
        }
    }


Is S3RecoverableWriter path even supported for BucketingSink?

Manish


On Wed, May 15, 2019 at 6:05 PM Ken Krugler <[hidden email]> wrote:
Hi Manish,

Are you sure this is an exception that’s actually killing the job?

Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 talks about Commons Beanutils logging this exception, but it’s a warning vs. something being thrown up the stack.

— Ken

On May 15, 2019, at 3:50 PM, Manish Bellani <[hidden email]> wrote:

hey Friends,

Thanks for all the work you have been doing on Flink. I have been trying to use BucketingSink (backed by S3AFileSystem) to write data to S3, and I'm running into some issues (which I suspect could be dependency/packaging related) that I'll try to describe here.

The data pipeline is quite simple:

Kafka -> KafkaConsumer (Source) -> BucketingSink (S3AFileSystem) -> S3

Environment:

Kubernetes
Debian
DockerImage: flink:1.7.2-hadoop28-scala_2.11
Java 1.8
Hadoop Version: 2.8.5

I followed this dependency section: https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#flink-for-hadoop-27 to place the dependencies under /opt/flink/lib (with the exception that my Hadoop version and the dependencies I pull in are different).

Here are the dependencies I'm pulling in (excerpt from my Dockerfile)

RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.7.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar
RUN wget -O /opt/flink/lib/hadoop-aws-2.8.5.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.5/hadoop-aws-2.8.5.jar
RUN wget -O /opt/flink/lib/aws-java-sdk-s3-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
#Transitive Dependency of aws-java-sdk-s3
RUN wget -O /opt/flink/lib/aws-java-sdk-core-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.10.6/aws-java-sdk-core-1.10.6.jar
RUN wget -O /opt/flink/lib/aws-java-sdk-kms-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.10.6/aws-java-sdk-kms-1.10.6.jar
RUN wget -O /opt/flink/lib/jackson-annotations-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.3/jackson-annotations-2.5.3.jar
RUN wget -O /opt/flink/lib/jackson-core-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.3/jackson-core-2.5.3.jar
RUN wget -O /opt/flink/lib/jackson-databind-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.3/jackson-databind-2.5.3.jar
RUN wget -O /opt/flink/lib/joda-time-2.8.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
RUN wget -O /opt/flink/lib/httpcore-4.3.3.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.3.3/httpcore-4.3.3.jar
RUN wget -O /opt/flink/lib/httpclient-4.3.6.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.3.6/httpclient-4.3.6.jar

But when I submit the job, it throws this error during initialization of BucketingSink/S3AFileSystem:

java.beans.IntrospectionException: bad write method arg count: public final void org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)
    at java.beans.PropertyDescriptor.findPropertyType(PropertyDescriptor.java:657)
    at java.beans.PropertyDescriptor.setWriteMethod(PropertyDescriptor.java:327)
    at java.beans.PropertyDescriptor.<init>(PropertyDescriptor.java:139)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.createFluentPropertyDescritor(FluentPropertyBeanIntrospector.java:178)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.introspect(FluentPropertyBeanIntrospector.java:141)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.fetchIntrospectionData(PropertyUtilsBean.java:2245)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getIntrospectionData(PropertyUtilsBean.java:2226)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getPropertyDescriptor(PropertyUtilsBean.java:954)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.isWriteable(PropertyUtilsBean.java:1478)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.isPropertyWriteable(BeanHelper.java:521)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initProperty(BeanHelper.java:357)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBeanProperties(BeanHelper.java:273)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBean(BeanHelper.java:192)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper$BeanCreationContextImpl.initBean(BeanHelper.java:669)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.initBeanInstance(DefaultBeanFactory.java:162)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.createBean(DefaultBeanFactory.java:116)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:459)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:479)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:492)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.createResultInstance(BasicConfigurationBuilder.java:447)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.createResult(BasicConfigurationBuilder.java:417)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.getConfiguration(BasicConfigurationBuilder.java:285)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig.loadFirst(MetricsConfig.java:119)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig.create(MetricsConfig.java:98)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.configure(MetricsSystemImpl.java:478)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.start(MetricsSystemImpl.java:188)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.init(MetricsSystemImpl.java:163)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem(S3AInstrumentation.java:251)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.registerAsMetricsSource(S3AInstrumentation.java:264)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.<init>(S3AInstrumentation.java:243)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:244)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1224)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

Some googling about "bad write method arg count" suggests it could be related to a Commons BeanUtils issue, but I'm not entirely sure. I've hunted through all the jars on the classpath:

/usr/lib/jvm/java-8-openjdk-amd64/bin/java -Djavax.net.ssl.trustStore=/etc/ssl/java-certs/cacerts -XX:+UseG1GC -Xms5530M -Xmx5530M -XX:MaxDirectMemorySize=8388607T -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath /opt/flink/lib/aws-java-sdk-core-1.10.6.jar:/opt/flink/lib/aws-java-sdk-kms-1.10.6.jar:/opt/flink/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink/lib/flink-metrics-datadog-1.6.2.jar:/opt/flink/lib/flink-python_2.11-1.7.2.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.2.jar:/opt/flink/lib/hadoop-aws-2.8.5.jar:/opt/flink/lib/httpclient-4.3.6.jar:/opt/flink/lib/httpcore-4.3.3.jar:/opt/flink/lib/jackson-annotations-2.5.3.jar:/opt/flink/lib/jackson-core-2.5.3.jar:/opt/flink/lib/jackson-databind-2.5.3.jar:/opt/flink/lib/joda-time-2.8.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.7.2.jar::/opt/flink/conf: org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/flink/conf

and I see that `FluentPropertyBeanIntrospector` is contained within the following two jars:

flink-s3-fs-hadoop-1.7.2.jar:org/apache/flink/fs/shaded/hadoop3/org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
flink-shaded-hadoop2-uber-1.7.2.jar:org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
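
(That search was just a loop over the jars on the classpath — a rough sketch, assuming unzip is available in the image:

for j in /opt/flink/lib/*.jar; do
  unzip -l "$j" 2>/dev/null | grep -q FluentPropertyBeanIntrospector && echo "$j"
done
)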

Both of those jars are packaged as part of the Flink distribution I'm using. I can't think of any other explanation at the moment beyond some incompatible transitive dependency. I would love to get some advice from y'all on whether this is a packaging bug or something else on my side.


Thanks

Manish


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra