Avro serialization problem after updating to flink 1.6.0

mark.harris
Hi, 

I recently tried to update a Flink job from 1.3.2 to 1.6.0. It deploys successfully as usual, but logs the following exception shortly after starting: 

Caused by: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class uk.co.test.serde.AlertEvent 
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227) 
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.initializeAvro(AvroSerializer.java:367) 
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.checkAvroInitialized(AvroSerializer.java:357) 
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.snapshotConfiguration(AvroSerializer.java:269) 
    at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.computeSnapshot(RegisteredKeyValueStateBackendMetaInfo.java:241) 
    at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.snapshot(RegisteredKeyValueStateBackendMetaInfo.java:226) 
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.getMetaInfoSnapshot(CopyOnWriteStateTableSnapshot.java:173) 
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates(HeapKeyedStateBackend.java:880) 
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.performSnapshot(HeapKeyedStateBackend.java:719) 
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:355) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:383) 
    ... 13 more 
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class uk.co.test.serde.AlertEvent 
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234) 
    at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965) 
    at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969) 
    at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829) 
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225) 
    ... 23 more 
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class uk.co.AlertEvent 
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285) 
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218) 
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215) 
    at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568) 
    at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350) 
    at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313) 
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228) 
    ... 27 more 


AlertEvent is a Scala case class generated using sbt-avrohugger (https://github.com/julianpeeters/sbt-avrohugger), and it definitely extends SpecificRecordBase. 

There has been an Avro version jump between Flink 1.3.2 and 1.6.0, from 1.7.7 to 1.8.2, but I've rebuilt the Avro model against Avro 1.8.2. I also had a brief look at the code in SpecificData.createSchema, and it seems like it would still have performed the getDeclaredField("SCHEMA$") check that's throwing. 
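For readers following along, the failing check can be sketched in plain Java. This is a simplified stand-in, not Avro's actual source: it assumes, as the stack trace and the discussion suggest, that SpecificData.createSchema looks up a static field named SCHEMA$ via getDeclaredField and throws "Not a Specific class" when that field is missing. The class name SchemaFieldCheck and both nested record classes are hypothetical: one imitates an Avro-generated Java class (static field), the other imitates a class that only exposes a SCHEMA$() method.

```java
import java.lang.reflect.Field;

public class SchemaFieldCheck {

    // Hypothetical stand-in for an Avro-generated Java class:
    // the schema is exposed as a static field named SCHEMA$.
    static class JavaStyleRecord {
        public static final String SCHEMA$ =
            "{\"type\":\"record\",\"name\":\"AlertEvent\",\"fields\":[]}";
    }

    // Hypothetical stand-in for the generated Scala class: the schema is
    // only reachable through a static method SCHEMA$(), not a field.
    static class ScalaStyleRecord {
        public static String SCHEMA$() {
            return "{\"type\":\"record\",\"name\":\"AlertEvent\",\"fields\":[]}";
        }
    }

    // Simplified reflective lookup: find a declared field called SCHEMA$
    // and read it as a static field (hence the null receiver).
    static Object lookupSchema(Class<?> c) {
        try {
            Field field = c.getDeclaredField("SCHEMA$");
            return field.get(null);
        } catch (NoSuchFieldException e) {
            // Same message shape as the exception in the stack trace above.
            throw new IllegalStateException("Not a Specific class: " + c);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupSchema(JavaStyleRecord.class));
        try {
            lookupSchema(ScalaStyleRecord.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is that a class can carry a SCHEMA$() method and still fail a lookup that insists on a field.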

Any advice on how to figure out what's causing the problem, or how to work around it, would be gratefully received. 

Best regards, 

Mark Harris

hivehome.com



Hive | London | Cambridge | Houston | Toronto
The information contained in or attached to this email is confidential and intended only for the use of the individual(s) to which it is addressed. It may contain information which is confidential and/or covered by legal professional or other privilege. The views expressed in this email are not necessarily the views of Centrica plc, and the company, its directors, officers or employees make no representation or accept any liability for their accuracy or completeness unless expressly stated to the contrary. 
Centrica Hive Limited (company no: 5782908), registered in England and Wales with its registered office at Millstream, Maidenhead Road, Windsor, Berkshire SL4 5GD.

Re: Avro serialization problem after updating to flink 1.6.0

Aljoscha Krettek
Hi,

Can you check whether AlertEvent actually has a field called "SCHEMA$"? You can do that via
javap path/to/AlertEvent.class
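As a complement to javap, a small reflection probe can report whether a class declares SCHEMA$ as a field, a method, or neither. This is a minimal sketch; the class name SchemaMemberProbe, its nested Example class, and the output format are made up for illustration. It loads classes with Class.forName(name, false, loader) so that static initializers do not run. (By default javap omits private members; javap -p lists them too.)

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class SchemaMemberProbe {

    // Hypothetical sample class so the probe also runs without arguments.
    static class Example {
        public static final String SCHEMA$ = "{}";
    }

    // Collects every declared field or method named SCHEMA$ on the class.
    static List<String> report(Class<?> c) {
        List<String> found = new ArrayList<>();
        for (Field f : c.getDeclaredFields()) {
            if (f.getName().equals("SCHEMA$")) {
                found.add(describe("field", f.getModifiers()));
            }
        }
        for (Method m : c.getDeclaredMethods()) {
            if (m.getName().equals("SCHEMA$")) {
                found.add(describe("method", m.getModifiers()));
            }
        }
        return found;
    }

    private static String describe(String kind, int modifiers) {
        return (Modifier.isStatic(modifiers) ? "static " : "instance ") + kind + " SCHEMA$";
    }

    public static void main(String[] args) throws ClassNotFoundException {
        if (args.length == 0) {
            System.out.println(Example.class.getName() + ": " + report(Example.class));
            return;
        }
        ClassLoader loader = SchemaMemberProbe.class.getClassLoader();
        for (String name : args) {
            // initialize = false: inspect the class without running static initializers.
            Class<?> c = Class.forName(name, false, loader);
            System.out.println(name + ": " + report(c));
        }
    }
}
```

Run it with the job's classpath and both class names, for example uk.co.test.serde.AlertEvent and 'uk.co.test.serde.AlertEvent$' (quoted so the shell keeps the $), to compare the case class with its companion.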

Best,
Aljoscha

(In reply to Mark Harris, 27 Sep 2018, 10:03)

Re: Avro serialization problem after updating to flink 1.6.0

mark.harris
Hi,

It does not. Looking at the generated code, the SCHEMA$ value is created in the case class's companion object (which plays the role of a static field in Java). 

The companion object compiles down to a separate class file with a $ suffix, so in this case "AlertEvent.SCHEMA$" doesn't exist; to get the schema, AlertEvent$.MODULE$.SCHEMA$() would have to be called. The javap output also shows a SCHEMA$() method on AlertEvent.
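To make the distinction concrete, here is a minimal Java sketch of reading a schema the way described above: through the companion object's MODULE$ singleton rather than a static field. The nested classes Event and Event$ are hypothetical stand-ins that imitate the naming scalac uses for a companion object (a class with a trailing $ holding its instance in MODULE$); they are not generated code, and CompanionSchemaLookup is an illustrative name.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

public class CompanionSchemaLookup {

    // Hypothetical stand-in for the case class itself: no SCHEMA$ field.
    static class Event {
    }

    // Hypothetical stand-in for the compiled companion object: a class named
    // with a trailing $ that holds its singleton instance in MODULE$.
    static class Event$ {
        public static final Event$ MODULE$ = new Event$();

        public String SCHEMA$() {
            return "{\"type\":\"record\",\"name\":\"AlertEvent\",\"fields\":[]}";
        }
    }

    // Reflective equivalent of Event$.MODULE$.SCHEMA$(), starting from the record class.
    static Object schemaViaCompanion(Class<?> recordClass) throws ReflectiveOperationException {
        // The companion's binary name is the record's binary name plus "$".
        Class<?> companion =
            Class.forName(recordClass.getName() + "$", true, recordClass.getClassLoader());
        Object instance = companion.getField("MODULE$").get(null);
        Method schemaMethod = companion.getMethod("SCHEMA$");
        return schemaMethod.invoke(instance);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println(schemaViaCompanion(Event.class));
    }
}
```

The SCHEMA$() method that javap shows on AlertEvent itself is plausibly a static forwarder that scalac generates to delegate to the companion; either way, a lookup that requires a field will not find one.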

I wonder whether the field itself is a bit of a red herring, though, since the code we use for that event hasn't changed between Flink 1.3.2 and 1.6.1.

Best regards,

Mark


On Thu, 4 Oct 2018 at 14:03, Aljoscha Krettek <[hidden email]> wrote:
Hi,

can you check whether AlertEvent actually has a field called "SCHEMA$"? You can do that via
javap path/to/AlertEvent.class

Best,
Aljoscha

On 27. Sep 2018, at 10:03, Mark Harris <[hidden email]> wrote:

Hi, 

I recently tried to update a flink job from 1.3.2 to 1.6.0. It deploys successfully as usual, but logs the following exception shortly after starting: 

Caused by: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class uk.co.test.serde.AlertEvent 
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227) 
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.initializeAvro(AvroSerializer.java:367) 
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.checkAvroInitialized(AvroSerializer.java:357) 
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.snapshotConfiguration(AvroSerializer.java:269) 
    at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.computeSnapshot(RegisteredKeyValueStateBackendMetaInfo.java:241) 
    at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.snapshot(RegisteredKeyValueStateBackendMetaInfo.java:226) 
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.getMetaInfoSnapshot(CopyOnWriteStateTableSnapshot.java:173) 
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates(HeapKeyedStateBackend.java:880) 
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.performSnapshot(HeapKeyedStateBackend.java:719) 
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:355) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:383) 
    ... 13 more 
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class uk.co.test.serde.AlertEvent 
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234) 
    at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965) 
    at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969) 
    at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829) 
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225) 
    ... 23 more 
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class uk.co.AlertEvent 
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285) 
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218) 
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215) 
    at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568) 
    at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350) 
    at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313) 
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228) 
    ... 27 more 


AlertEvent is a scala case class generated using sbt avrohugger (https://github.com/julianpeeters/sbt-avrohugger) that definitely implements SpecificRecordBase. 

There has been an Avro verion jump betwen 1.3.2 and 1.6.0, from 1.7.7 to 1.8.2 but I've rebuilt the avro model against Avro 1.8.2 and had a brief look at the code in SpecificData.create - it seems like it would still have tried the getDeclaredField("$SCHEMA") check that's throwing. 

Any advice on how to figure out what's causing the problem, or work around it would be gratefully received. 

Best regards, 

Mark Harris

hivehome.com



Hive | London | Cambridge | Houston | Toronto
The information contained in or attached to this email is confidential and intended only for the use of the individual(s) to which it is addressed. It may contain information which is confidential and/or covered by legal professional or other privilege. The views expressed in this email are not necessarily the views of Centrica plc, and the company, its directors, officers or employees make no representation or accept any liability for their accuracy or completeness unless expressly stated to the contrary. 
Centrica Hive Limited (company no: 5782908), registered in England and Wales with its registered office at Millstream, Maidenhead Road, Windsor, Berkshire SL4 5GD.


hivehome.com



Hive | London | Cambridge | Houston | Toronto
The information contained in or attached to this email is confidential and intended only for the use of the individual(s) to which it is addressed. It may contain information which is confidential and/or covered by legal professional or other privilege. The views expressed in this email are not necessarily the views of Centrica plc, and the company, its directors, officers or employees make no representation or accept any liability for their accuracy or completeness unless expressly stated to the contrary. 
Centrica Hive Limited (company no: 5782908), registered in England and Wales with its registered office at Millstream, Maidenhead Road, Windsor, Berkshire SL4 5GD.
Reply | Threaded
Open this post in threaded view
|

Re: Avro serialization problem after updating to flink 1.6.0

Aljoscha Krettek
Hmm, so if there is code that requires the SCHEMA$ field, I wonder how it ever worked. Did you try the newly rebuilt Avro model against the older Flink version (1.3.2)?

If possible, could you send the code so we can look into it? Alternatively, could you run it in a debugger, set a breakpoint on the getDeclaredField("SCHEMA$") call, and see how this works when running with Flink 1.3.2?

Best,
Aljoscha

(In reply to Mark Harris, 23 Oct 2018, 12:53)