Hi Yun,
It is confusing, but the UI now shows the expected value "At Least Once" (and checkpointCfg#checkpointingMode shows AT_LEAST_ONCE as well). Clearly I either looked in the wrong place, or the job was not upgraded when I changed the checkpointing mode ...
Sorry for the noise, and thank you for your help.
Alexey
Hi Alexey,
Sorry, I also do not see any problems in the attached code. Could you add
a breakpoint at `see.execute(name)` and have a look at the value of
see#checkpointCfg#checkpointingMode?
Best,
Yun
------------------Original Mail ------------------
Send Date:Tue Mar 9 07:25:31 2021
Subject:Re: How to check checkpointing mode
Hi Yun,
Thank you for looking. The job creation code is quite big, so I've truncated the helper methods dealing with command-line parameters etc. Below are the two major methods:
@Override
public Void call() throws Exception {
    LOGGER.info("{}", new Info().toLog());

    if (!allParameters.isEmpty()) {
        // We don't expect any parameters, but Flink 1.12 adds JVM options to job args, since we add
        // -- after jobs argument, this unnecessary for us arguments will be treated as positional
        // parameters, which we ignore but log warning
        LOGGER.warn("Unexpected parameters: {}", allParameters);
    }
    try {
        final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
        see.execute(name);
        return null;
    } catch (InterruptedException e) {
        LOGGER.error("Stream Processor was interrupted", e);
        Thread.currentThread().interrupt();
        throw e;
    } catch (Exception e) {
        LOGGER.error("Stream Processor is terminated due to exception", e);
        throw e;
    }
}
private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws IOException {
    initDefaultKafkaSource();
    final long deviationMillis = deviation.toMillis();
    final GlobalAppConfig globalAppConfig = config();
    final StreamExecutionEnvironment see = StreamExecutionEnvironment
        .getExecutionEnvironment()
        .enableCheckpointing(checkpointInterval.toMillis(), CheckpointingMode.AT_LEAST_ONCE)
        .setMaxParallelism(1024)
        .setParallelism(parallelism);
    if (externalizedCheckpoints) {
        see.getCheckpointConfig()
            .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
    see.getConfig().disableGenericTypes();
    see.getConfig().disableAutoGeneratedUIDs();
    configureStateBackend(see);

    final Properties producerProperties = new PropertiesBuilder()
        .putAll(kafkaCommonOptions)
        .putAll(kafkaProducerOptions)
        .varFiles(valueFiles)
        .build();

    final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
        .semantic(Semantic.AT_LEAST_ONCE)
        .config(producerProperties)
        .build();

    final AutoTopic autoTopic = AutoTopic.builder()
        .config(producerProperties)
        .partitions(autoCreateTopicsPartitions)
        .replicationFactor(autoCreateTopicsReplicationFactor)
        .doNotCreateTopics(ImmutableSet.of(
            gspCfg,
            gspCustom,
            gspIxn,
            gspOutbound,
            gspSm
        ))
        .build();

    see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1)));
    // since Flink 1.12 default stream characteristic is event time,
    // so we don't need to set streamTimeCharacteristic, furthermore whole TimeCharacteristic enum
    // is deprecated.
    // If needed explicitly using processing-time windows and timers works in event-time mode.

    addHeartbeats(see);
    final TStateCleanupOnTimeout.Factory cleanupFactory = new TStateCleanupOnTimeout.Factory(
        maxCallDuration,
        postmortemCallDuration,
        globalAppConfig.timerGranularity()
    );
    @Nullable
    final SingleOutputStreamOperator<PbCfgDatum> cfgXform;
    @Nullable
    final DataStream<PbCfgDatum> cfgSource = addSources(see,
        SourceTopic.GCA_CFG,
        new CfgJsonDeserializationSchema(),
        (event, timestamp) -> event.getBatchId(),
        it -> !it.getHeartbeat());

    if (cfgSource != null) {
        cfgXform = cfgSource
            .keyBy(PbCfgDatum::getCcId)
            .process(new CfgTransform())
            .uid("xform-cfg")
            .name("XForm Config");

        if (!isNullOrEmpty(gspCfg)) {
            cfgXform.addSink(producerFactory.create(gspCfg,
                    autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg))))
                .uid("uid-" + gspCfg)
                .name(gspCfg);
        } else {
            cfgXform.addSink(new DiscardingSink<>())
                .uid("uid-gsp-cfg-null")
                .name("gsp-cfg-null");
        }
    } else {
        cfgXform = null;
    }
    final DataStream<PbTcmDatum> voiceCallThreadSource = addSources(see,
        SourceTopic.VOICE_CALL_THREAD,
        callThreadFormat == KafkaTopicFormat.JSON
            ? new TJsonDeserializationSchema()
            : new CallEventDeserializationSchema(),
        (event, timestamp) ->
            Instants.PROTO_TIMESTAMP_EPOCH.equals(event.getTimestamp())
                ? timestamp - deviationMillis
                : Instants.toMillis(event.getTimestamp()),
        event -> event.getType() != EventType.EVENT_UNKNOWN);

    final SingleOutputStreamOperator<PbTcmDatum> tcmDataStream1 = voiceCallThreadSource
        .keyBy(CallEventKey::new)
        .process(new TIntakeProcessFunction(cleanupFactory))
        .returns(PbTypes.TCM_DATUM)
        .uid("intake-voice-calls")
        .name("Intake voice calls");

    DataStream<PbAgentStateDatum> allAgentStateEventsDataStream =
        tcmDataStream1.getSideOutput(TIntakeProcessFunction.AGENT_STATE_EVENTS_TAG);

    final SingleOutputStreamOperator<PbDIxnDatum> sortedIxnEventStream;
    final SingleOutputStreamOperator<PbDIxnDatum> digitalPreTransform;

    if (!isNullOrEmpty(digitalItx) && !isNullOrEmpty(digitalAgentStateTopic)) {
        final DataStream<PbDIxnEnvelope> ixnIntake = addSources(see,
            SourceTopic.DIGITAL_ITX,
            new InteractionEventDeserializationSchema(),
            (event, timestamp) -> Instants
                .toMillis(event.getOrder().getTimestamp()),
            event -> event.getId() != InteractionEventMessageName.EventHeartbeat.ordinal());

        sortedIxnEventStream = ixnIntake
            .keyBy(DixnEventKey::new)
            .process(new UnwrapEnvelopeAndSort())
            .uid("unwrap-sort")
            .name("Sort Digital Interactions");

        digitalPreTransform = sortedIxnEventStream
            .keyBy(DixnEventKey::new)
            .process(new DIxnPreTransformSplit())
            .uid("split-digital-busy")
            .name("Split Digital busy states");
        allAgentStateEventsDataStream = union(
            allAgentStateEventsDataStream,
            digitalPreTransform.getSideOutput(DIxnPreTransformSplit.AGENT_STATE_EVENTS_TAG));
    } else {
        sortedIxnEventStream = null;
        digitalPreTransform = null;
    }
    // --------
    allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
        addSources(see,
            SourceTopic.VOICE_AGENT_STATE,
            callThreadFormat == KafkaTopicFormat.JSON
                ? new AsdJsonDeserializationSchema()
                : new AsdAvroDeserializationSchema(),
            this::timestamp,
            event -> event.getId() != Id.UNKNOWN));

    allAgentStateEventsDataStream = union(allAgentStateEventsDataStream,
        addSources(see,
            SourceTopic.DIGITAL_AGENT_STATE,
            new AsdAvroDeserializationSchema(),
            this::timestamp,
            event -> event.getId() != Id.UNKNOWN));

    @NotNull
    final SingleOutputStreamOperator<PbOcsDatum> ocsDataStream = Nullables
        .requireNonNullElseGet(
            addSources(see,
                SourceTopic.VOICE_OUTBOUND,
                new OcsJsonDeserializationSchema(),
                this::timestamp,
                event -> event.getEventCode() != EventCode.EVENT_OCS_NONE),
            () -> see.addSource(new EmptyGenerator<PbOcsDatum>(Instant.MAX)))
        .keyBy(OcsEventKey::new)
        .process(new OcsIntakeProcessFunction())
        .returns(PbTypes.OCS_DATUM)
        .uid("intake-outbound")
        .name("Intake Outbound");

    final SingleOutputStreamOperator<PbAgentStateDatum> agentStatesIntake;
    if (allAgentStateEventsDataStream != null) {
        agentStatesIntake = allAgentStateEventsDataStream
            .keyBy(PersonKey::new)
            .process(new AgentStateIntakeProcessFunction())
            .uid("intake-agent-states")
            .name("Intake Agent States");
    } else {
        agentStatesIntake = null;
    }
    @Nullable
    final SingleOutputStreamOperator<PbAsdSubjectInfo> agentInfo;
    if (cfgXform != null && agentStatesIntake != null) {
        DataStream<PbPersonQuery> personQueryStream = union(
            agentStatesIntake.getSideOutput(AgentStateIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG),
            ocsDataStream.getSideOutput(OcsIntakeProcessFunction.PERSON_QUERY_OUTPUT_TAG)
        );

        agentInfo = cfgXform.getSideOutput(CfgTransform.PERSON_UPDATE_OUTPUT_TAG)
            .connect(personQueryStream)
            .keyBy(PersonKey::new, PersonKey::new)
            .process(new CfgAgentInfoRegistry(globalAppConfig))
            .uid("cfg-agent-info-provider")
            .name("Agent Info");
    } else {
        agentInfo = null;
    }

    final SingleOutputStreamOperator<AgentLoginSession> allAgentLoginSession;
    if (agentStatesIntake != null) {
        allAgentLoginSession = agentStatesIntake.connect(agentInfo)
            .keyBy(PersonKey::new, PersonKey::new)
            .process(new AgentStateTransformation(
                globalAppConfig,
                glsAcwMode,
                agentStateIdleTimeout))
            .uid("xform-agent-states")
            .name("Xform Agent States");
    } else {
        allAgentLoginSession = see.addSource(new EmptyGenerator<>());
    }
    if (!gspSm.isEmpty()) {
        allAgentLoginSession
            .addSink(producerFactory.create(
                gspSm,
                autoTopic.decorate(new AgentStatesSerializationSchema(outputTopicFormat, gspSm))))
            .uid("uid-" + gspSm)
            .name(gspSm);
    } else {
        allAgentLoginSession
            .addSink(new DiscardingSink<>())
            .uid("uid-gsp-sm-null")
            .name("gsp-sm-null");
    }

    DataStream<Interaction> interactionDataStream = tcmDataStream1
        .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.VOICE_IRF_ACW))
        .keyBy(CallEventKey::new, CallEventKey::new)
        .process(new TInteractionsProcessFunction(globalAppConfig, cleanupFactory))
        .uid("xform-voice-interactions")
        .name("Xform Voice interactions");

    if (digitalPreTransform != null) {
        final DataStream<Interaction> ixnOutputStream = digitalPreTransform
            .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.DIGITAL_IRF))
            .keyBy(DixnEventKey::new, DixnEventKey::new)
            .process(new DixnTransform(globalAppConfig))
            .uid("xform-digital-interactions")
            .name("Xform Digital Interactions");
        interactionDataStream = interactionDataStream.union(ixnOutputStream);
    }

    interactionDataStream = AsyncDataStream.orderedWait(
            interactionDataStream,
            new ResourceInfo(cfgCacheUrl, provideCfgDictionarySpec()),
            cfgCacheTimeout.toMillis(),
            TimeUnit.MILLISECONDS,
            (ResourceInfo.IO_CAPACITY + see.getConfig().getParallelism() - 1)
                / see.getConfig().getParallelism())
        .uid("uid-config-async-io")
        .name("Enrich interactions with config");

    if (!gspIxn.isEmpty()) {
        interactionDataStream.addSink(
                producerFactory.create(
                    gspIxn,
                    autoTopic.decorate(new InteractionSerializationSchema(
                        outputTopicFormat,
                        gspIxn,
                        producerFactory.config()))))
            .uid("uid-" + gspIxn)
            .name(gspIxn);
    } else {
        interactionDataStream
            .addSink(new DiscardingSink<>())
            .uid("uid-gsp-ixn-null")
            .name("gsp-ixn-null");
    }
    if (!isNullOrEmpty(gspCustom)) {
        final DataStream<PbUxDatum> uxEventsVoice = tcmDataStream1
            .getSideOutput(TIntakeProcessFunction.UX_EVENTS_TAG);
        final SingleOutputStreamOperator<CustomFact> customFactsVoice = uxEventsVoice
            .keyBy(UxKey::new)
            .process(new UxToCustomFact(globalAppConfig))
            .uid("xform-voice-ux")
            .name("Xform Voice Ux");

        SingleOutputStreamOperator<CustomFact> customFactsDigital = null;
        if (sortedIxnEventStream != null) {
            DataStream<PbUxDatum> uxEventsDigital = sortedIxnEventStream
                .getSideOutput(UnwrapEnvelopeAndSort.UX_EVENTS_TAG);
            customFactsDigital = uxEventsDigital
                .keyBy(UxKey::new)
                .process(new UxToCustomFact(globalAppConfig))
                .uid("xform-digital-ux")
                .name("Xform Digital Ux");
        }

        final DataStream<CustomFact> customFacts = customFactsDigital == null
            ? customFactsVoice
            : customFactsVoice.union(customFactsDigital);
        customFacts.addSink(
                producerFactory.create(
                    globalAppConfig.gspCustom(),
                    autoTopic.decorate(
                        new CustomFactSerializationSchema(
                            outputTopicFormat,
                            globalAppConfig.gspCustom()))))
            .uid("uid-" + globalAppConfig.gspCustom())
            .name(globalAppConfig.gspCustom());
    }
    if (!isNullOrEmpty(voiceOutbound)) {
        final OcsStateCleanupOnTimeout.Factory ocsStateCleanupFactory =
            new OcsStateCleanupOnTimeout.Factory(
                maxCampaignGroupSessionDuration,
                globalAppConfig.timerGranularity()
            );

        @Nullable
        final DataStream<PbCafSubjectInfo> ocsAgentDetails = agentInfo != null
            ? agentInfo.getSideOutput(CfgAgentInfoRegistry.CAF_SUBJECT_INFO_OUTPUT_TAG)
            : null;

        final SingleOutputStreamOperator<CampaignGroupSessionFact> outboundDataStream = ocsDataStream
            .connect(ocsAgentDetails)
            .keyBy(OcsEventKey::new, OcsEventKey::new)
            .process(new OcsProcessFunction(globalAppConfig, ocsStateCleanupFactory))
            .uid("xform-outbound")
            .name("Xform Outbound");

        if (!gspOutbound.isEmpty()) {
            outboundDataStream.addSink(
                    producerFactory.create(
                        gspOutbound,
                        autoTopic.decorate(
                            new OutboundSerializationSchema(outputTopicFormat, gspOutbound))))
                .uid("uid-" + globalAppConfig.gspOutbound())
                .name(globalAppConfig.gspOutbound());
        } else {
            outboundDataStream
                .addSink(new DiscardingSink<>())
                .uid("uid-gsp-outbound-null")
                .name("gsp-outbound-null");
        }
    }

    TApp.main(see);
    return see;
}
Thanks,
Alexey
Hi Alexey,
Logically, the setting in the code has the highest
priority.
Could you show the complete code for the job creation?
It seems unusual to enable checkpointing
on an anonymous StreamExecutionEnvironment.
Best,
Yun
------------------------------------------------------------------
Send Time:2021 Mar. 6 (Sat.) 01:02
Subject:How to check checkpointing mode
Hello,
My job sets checkpointing mode to at-least-once:
StreamExecutionEnvironment
    .getExecutionEnvironment()
    .enableCheckpointing(checkpointInterval.toMillis(), CheckpointingMode.AT_LEAST_ONCE)
but the Flink UI shows Checkpointing Mode: Exactly Once.
Why is that? Does Flink for some reason decide to ignore my setting (btw, flink-conf.yaml also has execution.checkpointing.mode: AT_LEAST_ONCE)? Is there any other way to check what the actual checkpointing mode is?
Thanks,
Alexey
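
On the last question in this message: besides the web UI, the checkpoint configuration of a running job can be read from the JobManager REST endpoint /jobs/<jobid>/checkpoints/config, whose JSON response carries a "mode" field. The following is only a rough sketch, not anything from the job above: the class name CheckpointModeProbe and both helper methods are hypothetical, the REST base URL and job id are supplied by the caller, and the regex lookup of "mode" is a stand-in for a proper JSON parser.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: asks the JobManager REST API for a job's checkpointing mode. */
public class CheckpointModeProbe {

    // Matches the first "mode" string field in the body, e.g. "mode":"at_least_once".
    // Crude on purpose; a JSON library would be the more robust choice.
    private static final Pattern MODE =
        Pattern.compile("\"mode\"\\s*:\\s*\"([^\"]+)\"");

    /** Builds the checkpoint-config URI for one job, tolerating a trailing slash. */
    public static URI configUri(String restBase, String jobId) {
        String base = restBase.endsWith("/")
            ? restBase.substring(0, restBase.length() - 1)
            : restBase;
        return URI.create(base + "/jobs/" + jobId + "/checkpoints/config");
    }

    /** Extracts the value of the "mode" field, or empty if the field is absent. */
    public static Optional<String> extractMode(String json) {
        Matcher m = MODE.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("usage: CheckpointModeProbe <rest-base-url> <job-id>");
            return;
        }
        HttpRequest request = HttpRequest.newBuilder(configUri(args[0], args[1]))
            .GET()
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP " + response.statusCode() + ", checkpointing mode: "
            + extractMode(response.body()).orElse("<not found>"));
    }
}
```

Run it with the REST address of the cluster and the id of the running job (both are placeholders here), for example `java CheckpointModeProbe.java http://localhost:8081 <jobid>`. Because it queries the job that is actually running, it should also reveal the case where an older version of the job is still deployed.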