How to check checkpointing mode


Alexey Trenikhun
Hello,

My job sets checkpointing mode to at-least-once:

StreamExecutionEnvironment
    .getExecutionEnvironment()
    .enableCheckpointing(checkpointInterval.toMillis(),
        CheckpointingMode.AT_LEAST_ONCE)

but the Flink UI shows "Checkpointing Mode: Exactly Once".

Why is that? Does Flink for some reason ignore my setting (by the way, flink-conf.yaml also has execution.checkpointing.mode: AT_LEAST_ONCE)? Is there any other way to check what the actual checkpointing mode is?

Thanks,
Alexey
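
One way to check the mode of a running job without the UI is to ask the JobManager REST API, which is also what the UI reads from. The sketch below is a minimal illustration, not an official client. It assumes the `/jobs/<jobId>/checkpoints/config` endpoint returns a JSON body with a `mode` field (for example `at_least_once` or `exactly_once`); please verify the response shape against the REST documentation for your Flink version. The class name `CheckpointModeProbe`, the regex-based parsing, and the example base URL are hypothetical choices made for this example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reads the checkpointing mode of a running job over REST.
final class CheckpointModeProbe {

  // Matches a "mode" field in the response body, e.g. "mode":"at_least_once".
  // Assumption: the checkpoint config response exposes the mode under this key.
  private static final Pattern MODE = Pattern.compile("\"mode\"\\s*:\\s*\"([^\"]+)\"");

  /** Extracts the value of the "mode" field from a JSON body, if present. */
  static Optional<String> extractMode(String json) {
    Matcher m = MODE.matcher(json);
    return m.find() ? Optional.of(m.group(1)) : Optional.empty();
  }

  /** Fetches the checkpoint config of one job and returns its mode, if any. */
  static Optional<String> fetchMode(String restBaseUrl, String jobId)
      throws IOException, InterruptedException {
    HttpRequest request = HttpRequest.newBuilder(
            URI.create(restBaseUrl + "/jobs/" + jobId + "/checkpoints/config"))
        .GET()
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // Anything other than 200 (unknown job, checkpointing disabled, ...) yields empty.
    return response.statusCode() == 200 ? extractMode(response.body()) : Optional.empty();
  }

  public static void main(String[] args) throws Exception {
    // args[0]: REST base URL (assumed, e.g. http://localhost:8081); args[1]: job ID.
    System.out.println(fetchMode(args[0], args[1]).orElse("unknown"));
  }
}
```

Because this queries the job that is actually running, it also shows whether a redeployed job really picked up a changed setting.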
Re: How to check checkpointing mode

Yun Gao
Hi Alexey,

In principle, the setting in the code has the highest
priority.

Could you show the complete job creation code?
I think it is unusual to enable checkpointing
on an anonymous StreamExecutionEnvironment.


Best,
Yun

Re: How to check checkpointing mode

Alexey Trenikhun
Hi Yun,
Thank you for looking. The job creation code is quite big, so I've removed the helper methods dealing with command-line parameters etc. Below are the two major methods:

@Override
public Void call() throws Exception {
  LOGGER.info("{}", new Info().toLog());

  if (!allParameters.isEmpty()) {
    // We don't expect any parameters, but Flink 1.12 adds JVM options to the job args. Since we
    // add -- after the job arguments, these arguments, which we don't need, are treated as
    // positional parameters; we ignore them but log a warning.
    LOGGER.warn("Unexpected parameters: {}", allParameters);
  }
  try {
    final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
    see.execute(name);
    return null;
  } catch (InterruptedException e) {
    LOGGER.error("Stream Processor was interrupted", e);
    Thread.currentThread().interrupt();
    throw e;
  } catch (Exception e) {
    LOGGER.error("Stream Processor is terminated due to exception", e);
    throw e;
  }
}

private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws IOException {
  initDefaultKafkaSource();
  final long deviationMillis = deviation.toMillis();
  final GlobalAppConfig globalAppConfig = config();
  final StreamExecutionEnvironment see = StreamExecutionEnvironment
      .getExecutionEnvironment()
      .enableCheckpointing(checkpointInterval.toMillis(),
          CheckpointingMode.AT_LEAST_ONCE)
      .setMaxParallelism(1024)
      .setParallelism(parallelism);
  if (externalizedCheckpoints) {
    see.getCheckpointConfig()
        .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  }
  see.getConfig().disableGenericTypes();
  see.getConfig().disableAutoGeneratedUIDs();
  configureStateBackend(see);

  final Properties producerProperties = new PropertiesBuilder()
      .putAll(kafkaCommonOptions)
      .putAll(kafkaProducerOptions)
      .varFiles(valueFiles)
      .build();

  final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
      .semantic(Semantic.AT_LEAST_ONCE)
      .config(producerProperties)
      .build();

  final AutoTopic autoTopic = AutoTopic.builder()
      .config(producerProperties)
      .partitions(autoCreateTopicsPartitions)
      .replicationFactor(autoCreateTopicsReplicationFactor)
      .doNotCreateTopics(ImmutableSet.of(
          gspCfg, gspCustom, gspIxn, gspOutbound, gspSm
      ))
      .build();

  see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1)));
  // Since Flink 1.12 the default stream time characteristic is event time, so we don't need to
  // set streamTimeCharacteristic; furthermore, the whole TimeCharacteristic enum is deprecated.
  // If needed, explicitly using processing-time windows and timers works in event-time mode.

  addHeartbeats(see);
  final TStateCleanupOnTimeout.Factory cleanupFactory =
      new TStateCleanupOnTimeout.Factory(
          maxCallDuration,
          postmortemCallDuration,
          globalAppConfig.timerGranularity()
      );

  @Nullable final SingleOutputStreamOperator<PbCfgDatum> cfgXform;
  @Nullable final DataStream<PbCfgDatum> cfgSource = addSources(see,
      SourceTopic.GCA_CFG,
      new CfgJsonDeserializationSchema(),
      (event, timestamp) -> event.getBatchId(),
      it -> !it.getHeartbeat());

  if (cfgSource != null) {
    cfgXform = cfgSource
        .keyBy(PbCfgDatum::getCcId)
        .process(new CfgTransform())
        .uid("xform-cfg")
        .name("XForm Config");

    if (!isNullOrEmpty(gspCfg)) {
      cfgXform.addSink(producerFactory.create(gspCfg,
          autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg))))
          .uid("uid-" + gspCfg)
          .name(gspCfg);
    } else {
      cfgXform.addSink(new DiscardingSink<>())
          .uid("uid-gsp-cfg-null")
          .name("gsp-cfg-null");
    }
  } else {
    cfgXform = null;
  }

  final DataStream<PbTcmDatum> voiceCallThreadSource = addSources(see,
      SourceTopic.VOICE_CALL_THREAD,
      callThreadFormat == KafkaTopicFormat.JSON
          ? new TJsonDeserializationSchema()
          : new CallEventDeserializationSchema(),
      (event, timestamp) ->
          Instants.PROTO_TIMESTAMP_EPOCH.equals(event.getTimestamp())
              ? timestamp - deviationMillis
              : Instants.toMillis(event.getTimestamp()),
      event -> event.getType() != EventType.EVENT_UNKNOWN);

  final SingleOutputStreamOperator<PbTcmDatum> tcmDataStream1 = voiceCallThreadSource
      .keyBy(CallEventKey::new)
      .process(new TIntakeProcessFunction(cleanupFactory))
      .returns(PbTypes.TCM_DATUM)
      .uid("intake-voice-calls")
      .name("Intake voice calls");

  DataStream<PbAgentStateDatum> allAgentStateEventsDataStream =
      tcmDataStream1.getSideOutput(TIntakeProcessFunction.AGENT_STATE_EVENTS_TAG);

  final SingleOutputStreamOperator<PbDIxnDatum> sortedIxnEventStream;
  final SingleOutputStreamOperator<PbDIxnDatum> digitalPreTransform;

  if (!isNullOrEmpty(digitalItx) && !isNullOrEmpty(digitalAgentStateTopic)) {
    final DataStream<PbDIxnEnvelope> ixnIntake = addSources(see,
        SourceTopic.DIGITAL_ITX,
        new InteractionEventDeserializationSchema(),
        (event, timestamp) -> Instants
            .toMillis(event.getOrder().getTimestamp()),
        event -> event.getId() != InteractionEventMessageName.EventHeartbeat.ordinal());

    sortedIxnEventStream = ixnIntake
        .keyBy(DixnEventKey::new)
        .process(new UnwrapEnvelopeAndSort())
        .uid("unwrap-sort")
        .name("Sort Digital Interactions");

    digitalPreTransform = sortedIxnEventStream
        .keyBy(DixnEventKey::new)
        .process(new DIxnPreTransformSplit())
        .uid("split-digital-busy")
        .name("Split Digital busy states");
    allAgentStateEventsDataStream = union(
        allAgentStateEventsDataStream,
        digitalPreTransform.getSideOutput(DIxnPreTransformSplit.AGENT_STATE_EVENTS_TAG));
  } else {
    sortedIxnEventStream = null;
    digitalPreTransform = null;
  }

  // --------
  allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
      addSources(see,
          SourceTopic.VOICE_AGENT_STATE,
          callThreadFormat == KafkaTopicFormat.JSON
              ? new AsdJsonDeserializationSchema() : new AsdAvroDeserializationSchema(),
          this::timestamp,
          event -> event.getId() != Id.UNKNOWN));

  allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
      addSources(see,
          SourceTopic.DIGITAL_AGENT_STATE,
          new AsdAvroDeserializationSchema(),
          this::timestamp,
          event -> event.getId() != Id.UNKNOWN));

  @NotNull final SingleOutputStreamOperator<PbOcsDatum> ocsDataStream = Nullables
      .requireNonNullElseGet(
          addSources(see,
              SourceTopic.VOICE_OUTBOUND,
              new OcsJsonDeserializationSchema(),
              this::timestamp,
              event -> event.getEventCode() != EventCode.EVENT_OCS_NONE),
          () -> see.addSource(new EmptyGenerator<PbOcsDatum>(Instant.MAX)))
      .keyBy(OcsEventKey::new)
      .process(new OcsIntakeProcessFunction())
      .returns(PbTypes.OCS_DATUM)
      .uid("intake-outbound")
      .name("Intake Outbound");

  final SingleOutputStreamOperator<PbAgentStateDatum> agentStatesIntake;
  if (allAgentStateEventsDataStream != null) {
    agentStatesIntake = allAgentStateEventsDataStream
        .keyBy(PersonKey::new)
        .process(new AgentStateIntakeProcessFunction())
        .uid("intake-agent-states")
        .name("Intake Agent States");
  } else {
    agentStatesIntake = null;
  }

  @Nullable final SingleOutputStreamOperator<PbAsdSubjectInfo> agentInfo;
  if (cfgXform != null && agentStatesIntake != null) {
    DataStream<PbPersonQuery> personQueryStream = union(
        agentStatesIntake.getSideOutput(AgentStateIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG),
        ocsDataStream.getSideOutput(OcsIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG)
    );

    agentInfo = cfgXform.getSideOutput(CfgTransform.PERSON_UPDATE_OUTPUT_TAG)
        .connect(personQueryStream)
        .keyBy(PersonKey::new, PersonKey::new)
        .process(new CfgAgentInfoRegistry(globalAppConfig))
        .uid("cfg-agent-info-provider")
        .name("Agent Info");
  } else {
    agentInfo = null;
  }

  final SingleOutputStreamOperator<AgentLoginSession> allAgentLoginSession;
  if (agentStatesIntake != null) {
    allAgentLoginSession = agentStatesIntake.connect(agentInfo)
        .keyBy(PersonKey::new, PersonKey::new)
        .process(new AgentStateTransformation(
            globalAppConfig,
            glsAcwMode,
            agentStateIdleTimeout))
        .uid("xform-agent-states")
        .name("Xform Agent States");
  } else {
    allAgentLoginSession = see.addSource(new EmptyGenerator<>());
  }

  if (!gspSm.isEmpty()) {
    allAgentLoginSession
        .addSink(producerFactory.create(
            gspSm,
            autoTopic.decorate(new AgentStatesSerializationSchema(outputTopicFormat, gspSm))))
        .uid("uid-" + gspSm)
        .name(gspSm);
  } else {
    allAgentLoginSession
        .addSink(new DiscardingSink<>())
        .uid("uid-gsp-sm-null")
        .name("gsp-sm-null");
  }

  DataStream<Interaction> interactionDataStream = tcmDataStream1
      .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.VOICE_IRF_ACW))
      .keyBy(CallEventKey::new, CallEventKey::new)
      .process(new TInteractionsProcessFunction(globalAppConfig, cleanupFactory))
      .uid("xform-voice-interactions")
      .name("Xform Voice interactions");

  if (digitalPreTransform != null) {
    final DataStream<Interaction> ixnOutputStream = digitalPreTransform
        .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.DIGITAL_IRF))
        .keyBy(DixnEventKey::new, DixnEventKey::new)
        .process(new DixnTransform(globalAppConfig))
        .uid("xform-digital-interactions")
        .name("Xform Digital Interactions");
    interactionDataStream = interactionDataStream.union(ixnOutputStream);
  }

  interactionDataStream = AsyncDataStream.orderedWait(
      interactionDataStream,
      new ResourceInfo(cfgCacheUrl, provideCfgDictionarySpec()),
      cfgCacheTimeout.toMillis(),
      TimeUnit.MILLISECONDS,
      (ResourceInfo.IO_CAPACITY + see.getConfig().getParallelism() - 1) / see.getConfig()
          .getParallelism())
      .uid("uid-config-async-io")
      .name("Enrich interactions with config");

  if (!gspIxn.isEmpty()) {
    interactionDataStream.addSink(
        producerFactory.create(
            gspIxn,
            autoTopic.decorate(new InteractionSerializationSchema(
                outputTopicFormat, gspIxn, producerFactory.config()))))
        .uid("uid-" + gspIxn)
        .name(gspIxn);
  } else {
    interactionDataStream
        .addSink(new DiscardingSink<>())
        .uid("uid-gsp-ixn-null")
        .name("gsp-ixn-null");
  }

  if (!isNullOrEmpty(gspCustom)) {
    final DataStream<PbUxDatum> uxEventsVoice = tcmDataStream1
        .getSideOutput(TIntakeProcessFunction.UX_EVENTS_TAG);
    final SingleOutputStreamOperator<CustomFact> customFactsVoice =
        uxEventsVoice
            .keyBy(UxKey::new)
            .process(new UxToCustomFact(globalAppConfig))
            .uid("xform-voice-ux")
            .name("Xform Voice Ux");

    SingleOutputStreamOperator<CustomFact> customFactsDigital = null;
    if (sortedIxnEventStream != null) {
      DataStream<PbUxDatum> uxEventsDigital = sortedIxnEventStream
          .getSideOutput(UnwrapEnvelopeAndSort.UX_EVENTS_TAG);
      customFactsDigital =
          uxEventsDigital
              .keyBy(UxKey::new)
              .process(new UxToCustomFact(globalAppConfig))
              .uid("xform-digital-ux")
              .name("Xform Digital Ux");
    }

    final DataStream<CustomFact> customFacts = customFactsDigital == null ? customFactsVoice
        : customFactsVoice.union(customFactsDigital);
    customFacts.addSink(
        producerFactory.create(
            globalAppConfig.gspCustom(),
            autoTopic.decorate(
                new CustomFactSerializationSchema(
                    outputTopicFormat, globalAppConfig.gspCustom()))))
        .uid("uid-" + globalAppConfig.gspCustom())
        .name(globalAppConfig.gspCustom());
  }

  if (!isNullOrEmpty(voiceOutbound)) {
    final OcsStateCleanupOnTimeout.Factory ocsStateCleanupFactory =
        new OcsStateCleanupOnTimeout.Factory(
            maxCampaignGroupSessionDuration,
            globalAppConfig.timerGranularity()
        );

    @Nullable final DataStream<PbCafSubjectInfo> ocsAgentDetails = agentInfo != null
        ? agentInfo.getSideOutput(CfgAgentInfoRegistry.CAF_SUBJECT_INFO_OUTPUT_TAG) : null;

    final SingleOutputStreamOperator<CampaignGroupSessionFact> outboundDataStream = ocsDataStream
        .connect(ocsAgentDetails)
        .keyBy(OcsEventKey::new, OcsEventKey::new)
        .process(new OcsProcessFunction(globalAppConfig, ocsStateCleanupFactory))
        .uid("xform-outbound")
        .name("Xform Outbound");

    if (!gspOutbound.isEmpty()) {
      outboundDataStream.addSink(
          producerFactory.create(
              gspOutbound,
              autoTopic.decorate(
                  new OutboundSerializationSchema(outputTopicFormat, gspOutbound))))
          .uid("uid-" + globalAppConfig.gspOutbound())
          .name(globalAppConfig.gspOutbound());
    } else {
      outboundDataStream
          .addSink(new DiscardingSink<>())
          .uid("uid-gsp-outbound-null")
          .name("gsp-outbound-null");
    }
  }

  TApp.main(see);
  return see;
}

Thanks,
Alexey

Re: Re: How to check checkpointing mode

Yun Gao
Hi Alexey,

Sorry, I also do not see any problem in the attached code. Could you set
a breakpoint at `see.execute(name)` and have a look at the value of
see#checkpointCfg#checkpointingMode?

Best,
Yun
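
Where attaching a debugger is inconvenient, the same check can be done in code just before the job is submitted. The following is only a sketch of a fail-fast guard: `CheckpointModeGuard` and `requireMode` are hypothetical names introduced here, and the guard takes the effective mode as a plain string supplier so that the example stays free of Flink dependencies.

```java
import java.util.function.Supplier;

// Hypothetical helper: fails fast when the effective mode differs from the expected one.
final class CheckpointModeGuard {

  /**
   * Compares the expected mode name with the one reported by the supplier
   * and throws if they differ, so a misconfigured job never gets submitted.
   */
  static void requireMode(String expected, Supplier<String> actual) {
    String effective = actual.get();
    if (!expected.equals(effective)) {
      throw new IllegalStateException(
          "Expected checkpointing mode " + expected
              + " but the environment reports " + effective);
    }
  }
}
```

In the job above, it could be called right before `see.execute(name)`, for example as `CheckpointModeGuard.requireMode("AT_LEAST_ONCE", () -> see.getCheckpointConfig().getCheckpointingMode().name())`. Logging the same value instead of throwing would be a gentler variant.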

------------------Original Mail ------------------
Sender:Alexey Trenikhun <[hidden email]>
Send Date:Tue Mar 9 07:25:31 2021
Recipients:Flink User Mail List <[hidden email]>, Yun Gao <[hidden email]>
Subject:Re: How to check checkpointing mode
Hi Yun,
Thank you for looking, job creation is quite big, I've truncated helper methods dealing with command line parameters etc, below two major methods:

@Override

public Void call() throws Exception {
  LOGGER.info("{}", new Info().toLog());


  if (!allParameters.isEmpty()) {
    // We don't expect any parameters, but Flink 1.12 adds JVM options to job args, since we add
    // -- after jobs argument, this unnecessary for us arguments will be treated as positional
    // parameters, which we ignore but log warning
    LOGGER.warn("Unexpected parameters: {}", allParameters);
  }
  try {
    final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
    see.execute(name);
    return null;
  } catch (InterruptedException e) {
    LOGGER.error("Stream Processor was interrupted", e);
    Thread.currentThread().interrupt();
    throw e;
  } catch (Exception e) {
    LOGGER.error("Stream Processor is terminated due to exception", e);
    throw e;
  }
}

private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws IOException {
  initDefaultKafkaSource();
  final long deviationMillis = deviation.toMillis();
  final GlobalAppConfig globalAppConfig = config();
  final StreamExecutionEnvironment see = StreamExecutionEnvironment
      .getExecutionEnvironment()
      .enableCheckpointing(checkpointInterval.toMillis(),
          CheckpointingMode.AT_LEAST_ONCE)
      .setMaxParallelism(1024)
      .setParallelism(parallelism);
  if (externalizedCheckpoints) {
    see.getCheckpointConfig()
        .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  }
  see.getConfig().disableGenericTypes();
  see.getConfig().disableAutoGeneratedUIDs();
  configureStateBackend(see);

  final Properties producerProperties = new PropertiesBuilder()
      .putAll(kafkaCommonOptions)
      .putAll(kafkaProducerOptions)
      .varFiles(valueFiles)
      .build();

  final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
      .semantic(Semantic.AT_LEAST_ONCE)
      .config(producerProperties)
      .build();

  final AutoTopic autoTopic = AutoTopic.builder()
      .config(producerProperties)
      .partitions(autoCreateTopicsPartitions)
      .replicationFactor(autoCreateTopicsReplicationFactor)
      .doNotCreateTopics(ImmutableSet.of(
          gspCfg, gspCustom, gspIxn, gspOutbound, gspSm
      ))
      .build();

  see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1)));
  // since Flink 1.12 default stream characteristic is event time,
  // so we don't need to set streamTimeCharacteristic, furthermore whole TimeCharacteristic enum
  // is deprecated.
  // If needed explicitly using processing-time windows and timers works in event-time mode.

  addHeartbeats(see);
  final TStateCleanupOnTimeout.Factory cleanupFactory =
      new TStateCleanupOnTimeout.Factory(
          maxCallDuration,
          postmortemCallDuration,
          globalAppConfig.timerGranularity()
      );

  @Nullable final SingleOutputStreamOperator<PbCfgDatum> cfgXform;
  @Nullable final DataStream<PbCfgDatum> cfgSource = addSources(see,
      SourceTopic.GCA_CFG,
      new CfgJsonDeserializationSchema(),
      (event, timestamp) -> event.getBatchId(),
      it -> !it.getHeartbeat());

  if (cfgSource != null) {
    cfgXform = cfgSource
        .keyBy(PbCfgDatum::getCcId)
        .process(new CfgTransform())
        .uid("xform-cfg")
        .name("XForm Config");

    if (!isNullOrEmpty(gspCfg)) {
      cfgXform.addSink(producerFactory.create(gspCfg,
          autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg))))
          .uid("uid-" + gspCfg)
          .name(gspCfg);
    } else {
      cfgXform.addSink(new DiscardingSink<>())
          .uid("uid-gsp-cfg-null")
          .name("gsp-cfg-null");
    }
  } else {
    cfgXform = null;
  }

  final DataStream<PbTcmDatum> voiceCallThreadSource = addSources(see,
      SourceTopic.VOICE_CALL_THREAD,
      callThreadFormat == KafkaTopicFormat.JSON
          ? new TJsonDeserializationSchema()
          : new CallEventDeserializationSchema(),
      (event, timestamp) ->
          Instants.PROTO_TIMESTAMP_EPOCH.equals(event.getTimestamp())
              ? timestamp - deviationMillis
              : Instants.toMillis(event.getTimestamp()),
      event -> event.getType() != EventType.EVENT_UNKNOWN);

  final SingleOutputStreamOperator<PbTcmDatum> tcmDataStream1 = voiceCallThreadSource
      .keyBy(CallEventKey::new)
      .process(new TIntakeProcessFunction(cleanupFactory))
      .returns(PbTypes.TCM_DATUM)
      .uid("intake-voice-calls")
      .name("Intake voice calls");

  DataStream<PbAgentStateDatum> allAgentStateEventsDataStream =
      tcmDataStream1.getSideOutput(TIntakeProcessFunction.AGENT_STATE_EVENTS_TAG);

  final SingleOutputStreamOperator<PbDIxnDatum> sortedIxnEventStream;
  final SingleOutputStreamOperator<PbDIxnDatum> digitalPreTransform;

  if (!isNullOrEmpty(digitalItx) && !isNullOrEmpty(digitalAgentStateTopic)) {
    final DataStream<PbDIxnEnvelope> ixnIntake = addSources(see,
        SourceTopic.DIGITAL_ITX,
        new InteractionEventDeserializationSchema(),
        (event, timestamp) -> Instants
            .toMillis(event.getOrder().getTimestamp()),
        event -> event.getId() != InteractionEventMessageName.EventHeartbeat.ordinal());

    sortedIxnEventStream = ixnIntake
        .keyBy(DixnEventKey::new)
        .process(new UnwrapEnvelopeAndSort())
        .uid("unwrap-sort")
        .name("Sort Digital Interactions");

    digitalPreTransform = sortedIxnEventStream
        .keyBy(DixnEventKey::new)
        .process(new DIxnPreTransformSplit())
        .uid("split-digital-busy")
        .name("Split Digital busy states");
    allAgentStateEventsDataStream = union(
        allAgentStateEventsDataStream,
        digitalPreTransform.getSideOutput(DIxnPreTransformSplit.AGENT_STATE_EVENTS_TAG));
  } else {
    sortedIxnEventStream = null;
    digitalPreTransform = null;
  }

  // --------
  allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
      addSources(see,
          SourceTopic.VOICE_AGENT_STATE,
          callThreadFormat == KafkaTopicFormat.JSON
              ? new AsdJsonDeserializationSchema() : new AsdAvroDeserializationSchema(),
          this::timestamp,
          event -> event.getId() != Id.UNKNOWN));

  allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
      addSources(see,
          SourceTopic.DIGITAL_AGENT_STATE,
          new AsdAvroDeserializationSchema(),
          this::timestamp,
          event -> event.getId() != Id.UNKNOWN));

  @NotNull final SingleOutputStreamOperator<PbOcsDatum> ocsDataStream = Nullables
      .requireNonNullElseGet(
          addSources(see,
              SourceTopic.VOICE_OUTBOUND,
              new OcsJsonDeserializationSchema(),
              this::timestamp,
              event -> event.getEventCode() != EventCode.EVENT_OCS_NONE),
          () -> see.addSource(new EmptyGenerator<PbOcsDatum>(Instant.MAX)))
      .keyBy(OcsEventKey::new)
      .process(new OcsIntakeProcessFunction())
      .returns(PbTypes.OCS_DATUM)
      .uid("intake-outbound")
      .name("Intake Outbound");

  final SingleOutputStreamOperator<PbAgentStateDatum> agentStatesIntake;
  if (allAgentStateEventsDataStream != null) {
    agentStatesIntake = allAgentStateEventsDataStream
        .keyBy(PersonKey::new)
        .process(new AgentStateIntakeProcessFunction())
        .uid("intake-agent-states")
        .name("Intake Agent States");
  } else {
    agentStatesIntake = null;
  }

  @Nullable final SingleOutputStreamOperator<PbAsdSubjectInfo> agentInfo;
  if (cfgXform != null && agentStatesIntake != null) {
    DataStream<PbPersonQuery> personQueryStream = union(
        agentStatesIntake.getSideOutput(AgentStateIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG),
        ocsDataStream.getSideOutput(OcsIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG)
    );

    agentInfo = cfgXform.getSideOutput(CfgTransform.PERSON_UPDATE_OUTPUT_TAG)
        .connect(personQueryStream)
        .keyBy(PersonKey::new, PersonKey::new)
        .process(new CfgAgentInfoRegistry(globalAppConfig))
        .uid("cfg-agent-info-provider")
        .name("Agent Info");
  } else {
    agentInfo = null;
  }

  final SingleOutputStreamOperator<AgentLoginSession> allAgentLoginSession;
  if (agentStatesIntake != null) {
    allAgentLoginSession = agentStatesIntake.connect(agentInfo)
        .keyBy(PersonKey::new, PersonKey::new)
        .process(new AgentStateTransformation(
            globalAppConfig,
            glsAcwMode,
            agentStateIdleTimeout))
        .uid("xform-agent-states")
        .name("Xform Agent States");
  } else {
    allAgentLoginSession = see.addSource(new EmptyGenerator<>());
  }

  if (!gspSm.isEmpty()) {
    allAgentLoginSession
        .addSink(producerFactory.create(
            gspSm,
            autoTopic.decorate(new AgentStatesSerializationSchema(outputTopicFormat, gspSm))))
        .uid("uid-" + gspSm)
        .name(gspSm);
  } else {
    allAgentLoginSession
        .addSink(new DiscardingSink<>())
        .uid("uid-gsp-sm-null")
        .name("gsp-sm-null");
  }

  DataStream<Interaction> interactionDataStream = tcmDataStream1
      .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.VOICE_IRF_ACW))
      .keyBy(CallEventKey::new, CallEventKey::new)
      .process(new TInteractionsProcessFunction(globalAppConfig, cleanupFactory))
      .uid("xform-voice-interactions")
      .name("Xform Voice interactions");

  if (digitalPreTransform != null) {
    final DataStream<Interaction> ixnOutputStream = digitalPreTransform
        .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.DIGITAL_IRF))
        .keyBy(DixnEventKey::new, DixnEventKey::new)
        .process(new DixnTransform(globalAppConfig))
        .uid("xform-digital-interactions")
        .name("Xform Digital Interactions");
    interactionDataStream = interactionDataStream.union(ixnOutputStream);
  }

  interactionDataStream = AsyncDataStream.orderedWait(
      interactionDataStream,
      new ResourceInfo(cfgCacheUrl, provideCfgDictionarySpec()),
      cfgCacheTimeout.toMillis(),
      TimeUnit.MILLISECONDS,
      (ResourceInfo.IO_CAPACITY + see.getConfig().getParallelism() - 1) / see.getConfig()
          .getParallelism())
      .uid("uid-config-async-io")
      .name("Enrich interactions with config");

  if (!gspIxn.isEmpty()) {
    interactionDataStream.addSink(
        producerFactory.create(
            gspIxn,
            autoTopic.decorate(new InteractionSerializationSchema(
                outputTopicFormat, gspIxn, producerFactory.config()))))
        .uid("uid-" + gspIxn)
        .name(gspIxn);
  } else {
    interactionDataStream
        .addSink(new DiscardingSink<>())
        .uid("uid-gsp-ixn-null")
        .name("gsp-ixn-null");
  }

  if (!isNullOrEmpty(gspCustom)) {
    final DataStream<PbUxDatum> uxEventsVoice = tcmDataStream1
        .getSideOutput(TIntakeProcessFunction.UX_EVENTS_TAG);
    final SingleOutputStreamOperator<CustomFact> customFactsVoice =
        uxEventsVoice
            .keyBy(UxKey::new)
            .process(new UxToCustomFact(globalAppConfig))
            .uid("xform-voice-ux")
            .name("Xform Voice Ux");

    SingleOutputStreamOperator<CustomFact> customFactsDigital = null;
    if (sortedIxnEventStream != null) {
      DataStream<PbUxDatum> uxEventsDigital = sortedIxnEventStream
          .getSideOutput(UnwrapEnvelopeAndSort.UX_EVENTS_TAG);
      customFactsDigital =
          uxEventsDigital
              .keyBy(UxKey::new)
              .process(new UxToCustomFact(globalAppConfig))
              .uid("xform-digital-ux")
              .name("Xform Digital Ux");
    }

    final DataStream<CustomFact> customFacts = customFactsDigital == null ? customFactsVoice
        : customFactsVoice.union(customFactsDigital);
    customFacts.addSink(
        producerFactory.create(
            globalAppConfig.gspCustom(),
            autoTopic.decorate(
                new CustomFactSerializationSchema(
                    outputTopicFormat, globalAppConfig.gspCustom()))))
        .uid("uid-" + globalAppConfig.gspCustom())
        .name(globalAppConfig.gspCustom());
  }

  if (!isNullOrEmpty(voiceOutbound)) {
    final OcsStateCleanupOnTimeout.Factory ocsStateCleanupFactory =
        new OcsStateCleanupOnTimeout.Factory(
            maxCampaignGroupSessionDuration,
            globalAppConfig.timerGranularity()
        );

    @Nullable final DataStream<PbCafSubjectInfo> ocsAgentDetails = agentInfo != null
        ? agentInfo.getSideOutput(CfgAgentInfoRegistry.CAF_SUBJECT_INFO_OUTPUT_TAG) : null;

    final SingleOutputStreamOperator<CampaignGroupSessionFact> outboundDataStream = ocsDataStream
        .connect(ocsAgentDetails)
        .keyBy(OcsEventKey::new, OcsEventKey::new)
        .process(new OcsProcessFunction(globalAppConfig, ocsStateCleanupFactory))
        .uid("xform-outbound")
        .name("Xform Outbound");

    if (!gspOutbound.isEmpty()) {
      outboundDataStream.addSink(
          producerFactory.create(
              gspOutbound,
              autoTopic.decorate(
                  new OutboundSerializationSchema(outputTopicFormat, gspOutbound))))
          .uid("uid-" + globalAppConfig.gspOutbound())
          .name(globalAppConfig.gspOutbound());
    } else {
      outboundDataStream
          .addSink(new DiscardingSink<>())
          .uid("uid-gsp-outbound-null")
          .name("gsp-outbound-null");
    }
  }

  TApp.main(see);
  return see;
}

Thanks,
Alexey


Re: Re: How to check checkpointing mode

Alexey Trenikhun
Hi Yun,
It is confusing, but the UI now shows the expected value "At Least Once" (and checkpointCfg#checkpointingMode shows AT_LEAST_ONCE as well). Clearly I either looked in the wrong place, or the job was not upgraded when I changed the checkpointing mode.

Sorry for the noise, and thank you for your help.

Alexey


From: Yun Gao <[hidden email]>
Sent: Monday, March 8, 2021 7:14 PM
To: Alexey Trenikhun <[hidden email]>; Flink User Mail List <[hidden email]>
Subject: Re: Re: How to check checkpointing mode
 
Hi Alexey,

Sorry, I also do not see any problems in the attached code. Could you add
a breakpoint at `see.execute(name)` and have a look at the value of
see#checkpointCfg#checkpointingMode?

Best,
Yun
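
For anyone landing on this thread later: besides the web UI and a debugger breakpoint, the effective mode can probably also be read from the JobManager REST API. The sketch below assumes that GET /jobs/<jobid>/checkpoints/config returns a JSON object with a "mode" field (for example "at_least_once"); please verify this against your Flink version. The class name CheckpointModeProbe, its methods, and the regex-based parsing are made up for illustration and are not part of Flink.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: reads the checkpointing mode of a running job.
final class CheckpointModeProbe {

  // Matches the "mode" field of the checkpoint config JSON (assumed response shape).
  private static final Pattern MODE = Pattern.compile("\"mode\"\\s*:\\s*\"([^\"]+)\"");

  // Extracts the value of "mode" from the response body, if present.
  static Optional<String> parseMode(String json) {
    Matcher m = MODE.matcher(json);
    return m.find() ? Optional.of(m.group(1)) : Optional.empty();
  }

  // Fetches the checkpoint configuration of the given job from the JobManager REST endpoint.
  static String fetchConfig(String baseUrl, String jobId) throws Exception {
    HttpRequest request = HttpRequest
        .newBuilder(URI.create(baseUrl + "/jobs/" + jobId + "/checkpoints/config"))
        .GET()
        .build();
    return HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString())
        .body();
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("usage: CheckpointModeProbe <rest-base-url> <job-id>");
      return;
    }
    // Prints the reported mode, or "unknown" if the field is missing.
    System.out.println(parseMode(fetchConfig(args[0], args[1])).orElse("unknown"));
  }
}
```

It takes the REST base URL (for example http://localhost:8081 for a local cluster) and the job id as arguments. Inside the job itself, logging see.getCheckpointConfig().getCheckpointingMode() just before see.execute(name) gives the same information without a debugger.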
