Hi,

I am trying to use RabbitMQ as a source. My source consumes messages from the queue, but the stateful function is not executed, and is not even created. This is my main function:

     1  public static void main(String[] args) throws Exception {
     2
     3      final var env = StreamExecutionEnvironment.getExecutionEnvironment();
     4
     5      env.registerTypeWithKryoSerializer(Any.class, ProtobufSerializer.class);
     6
     7      env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
     8      env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
     9      env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    10      env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    11
    12      final var statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
    13      statefunConfig.setFlinkJobName("test");
    14      statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
    15
    16      final var connectionConfig = new RMQConnectionConfig.Builder()
    17              .setHost("localhost")
    18              .setUserName("guest")
    19              .setPassword("guest")
    20              .setPort(5672)
    21              .setVirtualHost("test")
    22              .setPrefetchCount(5000)
    23              .build();
    24
    25      final var deserializationSchema = new TypeInformationSerializationSchema<>(
    26              new ProtobufTypeInformation<>(Any.class), env.getConfig());
    27      final var rmqSource = new RMQSource<>(connectionConfig, TEST_INGRESS, true, deserializationSchema);
    28
    29      final var source = env
    30              .addSource(rmqSource, TEST_INGRESS)
    31              .setParallelism(1)
    32              .map(msg -> {
    33                  return RoutableMessageBuilder
    34                          .builder()
    35                          .withTargetAddress(MyStatefun.TYPE, Utils.getUUID())
    36                          .withMessageBody(msg)
    37                          .build();
    38              });
    39
    40      StatefulFunctionDataStreamBuilder
    41              .builder("test")
    42              .withDataStreamAsIngress(source)
    43              .withFunctionProvider(MyStatefun.TYPE, unused -> {
    44                  return new MyStatefun();
    45              })
    46              .withEgressId(MyStatefun.EGRESS)
    47              .withConfiguration(statefunConfig)
    48              .build(env)
    49              .getDataStreamForEgressId(MyStatefun.EGRESS)
    50              .addSink(new PrintSinkFunction<>(true));
    51
    52      env.execute();
    53
    54  }

A breakpoint at line 33 shows me the consumed messages. A breakpoint at line 44 is never hit. The message is reportedly consumed but never acknowledged or processed. Before using RabbitMQ I used a custom SourceFunction to fake input data, and it worked well.

To set things up I use a local environment, but the logs do not show any errors. Before my current problem I had another error during message deserialization, and it wasn't reported either. Unfortunately I didn't manage to get the exception into the log/stdout; I had to use the debugger to find the cause of that earlier problem. In the current situation the debugger shows no thrown or caught exceptions. That's why I'm stuck.

Of course I would like to know what the problem with my code is, but I guess it is not obvious. Maybe someone can give me a hint on how to turn on exception logging, which might help me get closer to the origin of the phenomenon.

Thanks in advance,
Stephan
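
While the logging question is open, one way to make a silent failure visible is to wrap the body of a processing step so that any exception is printed before it is rethrown. The following is a minimal sketch in plain Java, not a Flink API: `LoggedStage`, `ThrowingFunction`, and `logged` are hypothetical names introduced here for illustration only. In the job above, the same try/catch could go directly inside the `map` lambda, since Flink's `MapFunction.map` is declared with `throws Exception`. This only helps when an exception is actually thrown inside the wrapped step.

```java
import java.io.PrintStream;

/** Hypothetical debugging helper: prints any exception from a step, then rethrows it. */
public final class LoggedStage {

    /** Like java.util.function.Function, but allowed to throw checked exceptions. */
    @FunctionalInterface
    public interface ThrowingFunction<T, R> {
        R apply(T value) throws Exception;
    }

    private LoggedStage() {}

    /**
     * Wraps {@code delegate} so that a failure is written to {@code out}
     * (message plus stack trace) before the original exception is rethrown.
     */
    public static <T, R> ThrowingFunction<T, R> logged(
            String stage, ThrowingFunction<T, R> delegate, PrintStream out) {
        return value -> {
            try {
                return delegate.apply(value);
            } catch (Exception e) {
                out.println("[" + stage + "] failed for input " + value + ": " + e);
                e.printStackTrace(out);
                throw e; // keep the original failure semantics
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ThrowingFunction<String, Integer> parse =
                logged("parse", s -> Integer.parseInt(s), System.err);

        System.out.println(parse.apply("42"));
        try {
            parse.apply("not a number");
        } catch (NumberFormatException expected) {
            System.out.println("rethrown after logging");
        }
    }
}
```

Passing the `PrintStream` explicitly keeps the sketch independent of whichever logging backend is on the classpath. In a real job, a logger call would take its place.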
I found the reason: there is a class incompatibility because I changed from StateFun 2.2.1 + Flink 1.11.1 to StateFun 2.2.1 + Flink 1.12.0. But even the newest StateFun version, 2.2.2, refers to Flink 1.11.3.

Is there a way to use the newest version of Flink in combination with the newest version of StateFun? I'm wondering why there is no StateFun version matching the current stable version of Flink.

Stephan
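
Since the mismatch between the Flink version StateFun was built against (1.11.x) and the Flink version on the classpath (1.12.0) failed silently here, a fail-fast check at startup is one possible safeguard. The sketch below is plain Java with hypothetical names (`FlinkVersionCheck`, `requireCompatible`). It assumes, as a heuristic only, that two versions are compatible when their major.minor parts match. The version strings are passed in as plain arguments, and how they are obtained is left open.

```java
/** Hypothetical startup guard comparing two Flink version strings by major.minor. */
public final class FlinkVersionCheck {

    private FlinkVersionCheck() {}

    /** Returns "major.minor" of a version string such as "1.11.3" or "1.12-SNAPSHOT". */
    public static String majorMinor(String version) {
        String[] parts = version.split("[.\\-]");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected version: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    /** Heuristic assumption: the same major.minor line is treated as compatible. */
    public static boolean sameMinorLine(String builtAgainst, String runtime) {
        return majorMinor(builtAgainst).equals(majorMinor(runtime));
    }

    /** Throws with an explicit message instead of letting the job fail silently. */
    public static void requireCompatible(String builtAgainst, String runtime) {
        if (!sameMinorLine(builtAgainst, runtime)) {
            throw new IllegalStateException(
                    "StateFun was built against Flink " + builtAgainst
                            + " but Flink " + runtime + " is on the classpath");
        }
    }

    public static void main(String[] args) {
        System.out.println(sameMinorLine("1.11.1", "1.11.3")); // true
        System.out.println(sameMinorLine("1.11.3", "1.12.0")); // false
    }
}
```

Calling `requireCompatible("1.11.3", "1.12.0")` at the top of `main` would turn the situation described above into an immediate `IllegalStateException` with a readable message.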
Hi,

StateFun releases are not made in lock-step with Flink releases. StateFun and Flink have their own release schemes and schedules.

Usually, for a new major StateFun release, we upgrade its Flink dependency to the latest available version. We are currently targeting mid February for the next major StateFun release, by which time the Flink dependency will have been upgraded to 1.12.x.

In the meantime, if you'd like to work against Flink 1.12.x with StateFun, you might have to resort to building the artifacts yourself.

Cheers,
Gordon

On Tue, Jan 12, 2021 at 3:57 PM Stephan Pelikan <[hidden email]> wrote:
> currently targeting mid February for the next major StateFun release

Thank you, Gordon, for sharing this information. As I can now see on the Flink blog, you release new versions frequently. I'm a newbie to Flink and had assumed that the StateFun and Flink docs would match. But of course it is not a problem; I was just surprised.

Thanks,
Stephan