Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

luisfaamaral
Hello,

I'm looking for a way to modify state inside an operator in Flink. I’m following State Processor API guide - https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#modifying-savepoints
However when I execute locally using macOS it throws me an error related to assertion failure - apparently when writing the Savepoint.

Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238.

Here are the project dependencies
<dependencies>
    <!--        Lombok-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
    <!--        Avro-->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
    <!--        Google Cloud-->
    <dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-2.0.0</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-nio</artifactId>
        <version>0.121.0</version>
    </dependency>
    <!--        Flink-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-state-processor-api_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop2</artifactId>
        <version>2.8.3-1.8.3</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>

And here is the code we are using to just update the field setBookingPeriod to "2020/01” in all states.
public class Migrator implements Serializable {

    private static final String SAVEPOINT = "<a href="gs://XXX-XXX-flink-state/savepoints/savepoint-292a66-d3fe1a6595c3" class="">gs://XXX-XXX-flink-state/savepoints/savepoint-292a66-d3fe1a6595c3";

    public static void main(String[] args) throws Exception {
        new Migrator().execute();
    }

    public void execute() throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(environment, SAVEPOINT, new RocksDBStateBackend(SAVEPOINT));
        BookingKeyStateMigrator.withOperationId("aggregateBookingLine")
                               .migrate(savepoint)
                               .write(SAVEPOINT.concat("-migrated"));

        environment.execute("State Migrate Job");
    }

}

public class BookingKeyStateMigrator implements Serializable {

    private final String operatorUid;
    private final ValueStateDescriptor<Aggregation> stateDescriptor;

    public static BookingKeyStateMigrator withOperationId(String operatorUid) {
        return new BookingKeyStateMigrator(operatorUid);
    }

    private BookingKeyStateMigrator(String operatorUid) {
        this.operatorUid = operatorUid;
        this.stateDescriptor = new ValueStateDescriptor<>("currentAggregation", Aggregation.class);
    }


    public ExistingSavepoint migrate(ExistingSavepoint savepoint) throws IOException {
        DataSet<KeyedState<BookingKey, Aggregation>> keyedStateDataSet = savepoint.readKeyedState(operatorUid, new BookingKeyStateReader(stateDescriptor));

        return savepoint.removeOperator(operatorUid)
                        .withOperator(operatorUid,
                                      OperatorTransformation.bootstrapWith(keyedStateDataSet)
                                                            .keyBy(keyedState -> BookingKeyTransformation.toBookingKeyWithNewBookingPeriod(keyedState.getKey()))
                                                            .transform(new BookingKeyStateBootstrap(stateDescriptor)));
    }

}

public class BookingKeyStateReader extends KeyedStateReaderFunction<BookingKey, KeyedState<BookingKey, Aggregation>> {

    private final ValueStateDescriptor<Aggregation> stateDescriptor;
    private transient ValueState<Aggregation> state;

    public BookingKeyStateReader(ValueStateDescriptor<Aggregation> stateDescriptor) {
        this.stateDescriptor = stateDescriptor;
    }

    @Override
    public void open(Configuration configuration) {
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void readKey(BookingKey key, Context context, Collector<KeyedState<BookingKey, Aggregation>> out) throws Exception {
        KeyedState<BookingKey, Aggregation> data = new KeyedState<>(key, state.value());
        out.collect(data);
    }

}

public class BookingKeyStateBootstrap extends KeyedStateBootstrapFunction<BookingKey, KeyedState<BookingKey, Aggregation>> {

    private final ValueStateDescriptor<Aggregation> stateDescriptor;
    private transient ValueState<Aggregation> state;

    public BookingKeyStateBootstrap(ValueStateDescriptor<Aggregation> stateDescriptor) {
        this.stateDescriptor = stateDescriptor;
    }

    @Override
    public void open(Configuration configuration) {
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(KeyedState<BookingKey, Aggregation> keyedState, Context context) throws Exception {
        state.update(keyedState.getValue());
    }

}

public class BookingKeyTransformation {

    private BookingKeyTransformation() {
    }

    static BookingKey toBookingKeyWithNewBookingPeriod(BookingKey bookingKey) {
        return BookingKey.newBuilder(bookingKey)
                         .setBookingPeriod("2020/01")
                         .build();
    }

}

Here is the debug messages that we have when running the code
2020/05/06 19:02 45 [DEBUG] [main] org.apache.hadoop.metrics2.lib.MutableMetricsFactory.newForField:42 - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, value=[Rate of successful kerberos logins and latency (milliseconds)], valueName=Time)
2020/05/06 19:02 45 [DEBUG] [main] org.apache.hadoop.metrics2.lib.MutableMetricsFactory.newForField:42 - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, value=[Rate of failed kerberos logins and latency (milliseconds)], valueName=Time)
2020/05/06 19:02 45 [DEBUG] [main] org.apache.hadoop.metrics2.lib.MutableMetricsFactory.newForField:42 - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, value=[GetGroups], valueName=Time)
2020/05/06 19:02 45 [DEBUG] [main] org.apache.hadoop.metrics2.lib.MutableMetricsFactory.newForField:42 - field private org.apache.hadoop.metrics2.lib.MutableGaugeLong org.apache.hadoop.security.UserGroupInformation$UgiMetrics.renewalFailuresTotal with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, value=[Renewal failures since startup], valueName=Time)
2020/05/06 19:02 45 [DEBUG] [main] org.apache.hadoop.metrics2.lib.MutableMetricsFactory.newForField:42 - field private org.apache.hadoop.metrics2.lib.MutableGaugeInt org.apache.hadoop.security.UserGroupInformation$UgiMetrics.renewalFailures with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, value=[Renewal failures since last successful login], valueName=Time)
2020/05/06 19:02 45 [DEBUG] [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl.register:231 - UgiMetrics, User and group related metrics
2020/05/06 19:02 46 [DEBUG] [main] org.apache.hadoop.security.Groups.getUserToGroupsMappingService:448 -  Creating new Groups object
2020/05/06 19:02 46 [DEBUG] [main] org.apache.hadoop.util.NativeCodeLoader.<clinit>:46 - Trying to load the custom-built native-hadoop library...
2020/05/06 19:02 46 [DEBUG] [main] org.apache.hadoop.util.NativeCodeLoader.<clinit>:55 - Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
2020/05/06 19:02 46 [ WARN] [main] org.apache.hadoop.util.NativeCodeLoader.<clinit>:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020/05/06 19:02 46 [DEBUG] [main] org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.<init>:45 - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
2020/05/06 19:02 46 [DEBUG] [main] org.apache.hadoop.security.Groups.<init>:152 - Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=300000; warningDeltaMs=5000
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.logging.InternalLoggerFactory.newDefaultFactory:47 - Using Log4J as the default logging framework
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.InternalThreadLocalMap.<clinit>:54 - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.InternalThreadLocalMap.<clinit>:57 - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.isOsx0:966 - Platform: MacOS
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.explicitNoUnsafeCause0:395 - -Dio.netty.noUnsafe: false
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.javaVersion0:871 - Java version: 8
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:120 - sun.misc.Unsafe.theUnsafe: available
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:144 - sun.misc.Unsafe.copyMemory: available
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:182 - java.nio.Buffer.address: available
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:243 - direct buffer constructor: available
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:313 - java.nio.Bits.unaligned: available, true
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:378 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:385 - java.nio.DirectByteBuffer.<init>(long, int): available
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.unsafeUnavailabilityCause0:992 - sun.misc.Unsafe: available
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.tmpdir0:1086 - -Dio.netty.tmpdir: /var/folders/xc/9tp7jyt532371thglfsyh3x1nj7_ry/T (java.io.tmpdir)
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.bitMode0:1165 - -Dio.netty.bitMode: 64 (sun.arch.data.model)
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.<clinit>:157 - -Dio.netty.maxDirectMemory: 3817865216 bytes
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.<clinit>:164 - -Dio.netty.uninitializedArrayAllocationThreshold: -1
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.CleanerJava6.<clinit>:92 - java.nio.ByteBuffer.cleaner(): available
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.<clinit>:184 - -Dio.netty.noPreferDirect: false
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.<clinit>:44 - -Dio.netty.eventLoopThreads: 24
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.<clinit>:104 - -Dio.netty.noKeySetOptimization: false
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.<clinit>:105 - -Dio.netty.selectorAutoRebuildThreshold: 512
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent$Mpsc.<clinit>:860 - org.jctools-core.MpscChunkedArrayQueue: available
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelId.<clinit>:79 - -Dio.netty.processId: 76289 (auto-detected)
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.NetUtil.<clinit>:139 - -Djava.net.preferIPv4Stack: false
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.NetUtil.<clinit>:140 - -Djava.net.preferIPv6Addresses: false
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.NetUtil.<clinit>:224 - Loopback interface: lo0 (lo0, 0:0:0:0:0:0:0:1%lo0)
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.NetUtil$1.run:289 - Failed to get SOMAXCONN from sysctl and file /proc/sys/net/core/somaxconn. Default: 128
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelId.<clinit>:101 - -Dio.netty.machineId: 3c:22:fb:ff:fe:18:a2:68 (auto-detected)
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector.<clinit>:129 - -Dorg.apache.flink.shaded.netty4.io.netty.leakDetection.level: simple
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector.<clinit>:130 - -Dorg.apache.flink.shaded.netty4.io.netty.leakDetection.targetRecords: 4
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:120 - -Dio.netty.allocator.numHeapArenas: 24
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:121 - -Dio.netty.allocator.numDirectArenas: 24
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:123 - -Dio.netty.allocator.pageSize: 8192
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:128 - -Dio.netty.allocator.maxOrder: 11
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:132 - -Dio.netty.allocator.chunkSize: 16777216
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:133 - -Dio.netty.allocator.tinyCacheSize: 512
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:134 - -Dio.netty.allocator.smallCacheSize: 256
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:135 - -Dio.netty.allocator.normalCacheSize: 64
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:136 - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:137 - -Dio.netty.allocator.cacheTrimInterval: 8192
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:138 - -Dio.netty.allocator.useCacheForAllThreads: true
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil.<clinit>:83 - -Dio.netty.allocator.type: pooled
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil.<clinit>:92 - -Dio.netty.threadLocalDirectBufferSize: 0
2020/05/06 19:02 51 [DEBUG] [main] org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil.<clinit>:95 - -Dio.netty.maxThreadLocalCharBufferSize: 16384
2020/05/06 19:02 52 [DEBUG] [DataSource (at unionOperatorStates(WritableSavepoint.java:109) (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1)] org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.<clinit>:61 - -Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true
2020/05/06 19:02 52 [DEBUG] [DataSource (at unionOperatorStates(WritableSavepoint.java:109) (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1)] org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.<clinit>:62 - -Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkBounds: true
2020/05/06 19:02 52 [DEBUG] [DataSource (at unionOperatorStates(WritableSavepoint.java:109) (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1)] org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory.newResourceLeakDetector:202 - Loaded default ResourceLeakDetector: org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@3ac57bf3
Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238.


Could you please help me out to understand why this assertion failure is happening?

Thanks,
Luis Amaral

Reply | Threaded
Open this post in threaded view
|

Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

luisfaamaral
Reply | Threaded
Open this post in threaded view
|

Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

Tzu-Li (Gordon) Tai
Hi,

The last time I saw this error, was that there was a mismatch in the used flink-state-processor-api version and other core Flink dependencies.
Could you confirm that?

Also, are you seeing this assertion error consistently, or only occasionally?
cc'ing Seth, maybe he has other clues on the cause.

Cheers,
Gordon

On Fri, May 8, 2020 at 3:06 PM luisfaamaral <[hidden email]> wrote:
No one? :)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

Seth Wiesman
Gordon is correct. Additionally, if you are using flink 1.10 you may be running into a known bug that has been resolved in 1.10.1 which will be released soon.

Seth


On Fri, May 8, 2020 at 5:19 AM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi,

The last time I saw this error, was that there was a mismatch in the used flink-state-processor-api version and other core Flink dependencies.
Could you confirm that?

Also, are you seeing this assertion error consistently, or only occasionally?
cc'ing Seth, maybe he has other clues on the cause.

Cheers,
Gordon

On Fri, May 8, 2020 at 3:06 PM luisfaamaral <[hidden email]> wrote:
No one? :)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

luisfaamaral
Thanks Gordon and Seth for the reply.

So.. the main project contains the below flink dependencies...



And the state processor project contains the following:
<flink.version>1.9.0</flink.version>



At the first sight I may say all the libraries match to 1.9.0 flink
libraries within both projects.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

Tzu-Li (Gordon) Tai
In that case, the most possible cause would be https://issues.apache.org/jira/browse/FLINK-16313, which is included in Flink 1.10.1 (to be released)

The release candidates for Flink 1.10.1 is currently ongoing, would it be possible for you to try that out and see if the error still occurs?

On Mon, May 11, 2020 at 4:11 PM luisfaamaral <[hidden email]> wrote:
Thanks Gordon and Seth for the reply.

So.. the main project contains the below flink dependencies...



And the state processor project contains the following:
<flink.version>1.9.0</flink.version>



At the first sight I may say all the libraries match to 1.9.0 flink
libraries within both projects.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/