Async + Broadcast?

Async + Broadcast?

Alex Cruise
Hi folks,

I have a somewhat complex Flink job that has a few async stages, and a few stateful stages. It currently loads its configuration on startup, and doesn't attempt to refresh it.

Now I'm working on dynamic reconfiguration. I've written a polling source which sends a configuration snapshot whenever anything has changed, I've set up a broadcast of that source, and I'm updating the operators in the data (i.e. not config) stream to be BroadcastProcessFunctions. But now I've reached the first async operator, and I recall that async functions aren't allowed to be stateful.

I've tried to find a best practice for this situation, without much luck. My best idea so far is to insert a new stage before the async one, which would tuple up each record with its corresponding config snapshot from the most recent broadcast state. This would increase the amount of data that needs to be serialized, and some of the configs are quite large, but would allow me to continue using async IO.

Any suggestions?

Thanks!

-0xe1a

Re: Async + Broadcast?

austin.ce
Hey Alex,

I'm not sure there's a best practice here, but I can tell you that I worked on a job that did exactly what you're suggesting: a non-async operator created a [record, config] tuple, which was then passed to the async stage. Our config objects were also not tiny (~500 KB) and our pipeline not huge (~1M records/day and ~1 GB of data/day), but this setup worked quite well. I'd say if latency isn't your most important metric, or if your pipeline is a similar size, the ease of async IO is worth it.

One thing you'll have to look out for (if you haven't already) is bootstrapping the config objects when the job starts, since the broadcast from the polling source can arrive later than the first record – we solved this by calling the polling source's service in the `open()` method of the non-async operator and storing the initial configs in memory.
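
In pseudocode, the bootstrap looked roughly like this – `ConfigClient`, `Record`, `Config`, and the `tenantId` key are stand-ins for our own types and client, not a real API:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class EnrichWithConfig : RichMapFunction<Record, Pair<Record, Config>>() {

  // in-memory fallback, used until the first broadcast snapshot arrives
  @Transient private var bootstrapConfigs: Map<Long, Config> = emptyMap()

  override fun open(parameters: Configuration) {
    // same service the polling source reads from; blocking is fine here,
    // since open() runs once per task before any records are processed
    bootstrapConfigs = ConfigClient.fetchAll()
  }

  override fun map(value: Record): Pair<Record, Config> {
    // our real operator was a BroadcastProcessFunction that preferred
    // broadcast state; this just shows the bootstrap fallback
    val config = bootstrapConfigs.getValue(value.tenantId)
    return value to config
  }
}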

Hope that helps a bit,
Austin

Re: Async + Broadcast?

Alex Cruise
Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :)

-0xe1a

Re: Async + Broadcast?

Arvid Heise-4
Hi Alex,

Your approach is completely valid. What you want to achieve is a chain between your state managing operator and the consuming async operators; that way you have no serialization overhead.

To achieve that you want to:
- use Flink 1.11+ [1]
- make sure that if you have a legacy source, you disableChaining before your state managing operator, since asyncIO cannot be (transitively) chained to legacy sources. The topology should be source -> ... -> (forward channel) -> (state managing operator -> async1 -> async2 -> ...) -> ... -> sink
- enableObjectReuse [2] to avoid copying objects between chained operators
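
In code, the last two points look roughly like this (a sketch – legacySource, configBroadcast, and StateManagingOperator are placeholders for your own topology):

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.config.enableObjectReuse() // [2]: no defensive copies between chained operators

val stateManaged = legacySource
  .disableChaining()              // break the chain to the legacy source
  .connect(configBroadcast)
  .process(StateManagingOperator())

// async operators applied to stateManaged can now chain to it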

Re: Async + Broadcast?

Alex Cruise
Thanks Arvid! I'm not completely clear on where to apply your suggestions.

I've included a sketch of my job below, and I have a couple of questions:

1. It looks like enableObjectReuse() is a global setting; should I worry about whether I'm using any mutable data between stages?
2. Should I disableChaining() on BOTH broadcast-dependent stages, or just the one immediately preceding the async one?

Thanks!

-0xe1a

Types:

/** all the configs for a given tenant, as of the time when a change was observed */
data class ConfigSnapshot(
  val tenantId: Long,
  val timestamp: Instant,
  val configs: Map<UUID, Config>
)

/** parse raw strings from input, rejecting those for unconfigured tenants */
class Parse(
  private val initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<String, ConfigSnapshot, Record>() {
  override fun processBroadcastElement(
    value: ConfigSnapshot,
    ctx: Context,
    out: Collector<Record>
  ) {
    // keep the latest snapshot per tenant in broadcast state
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
    snapshots.put(value.tenantId, value)
  }

  override fun processElement(value: String, ctx: ReadOnlyContext, out: Collector<Record>) {
    // fall back to the bootstrap configs until the first broadcast arrives
    val validTenantIds = ctx.getBroadcastState(configSnapshotDescriptor)
      .immutableEntries()
      .associate { it.key to it.value }
      .ifEmpty { initialConfigs }
      .keys

    val parsed = Record(value)
    if (validTenantIds.contains(parsed.tenantId)) {
      out.collect(parsed)
    }
  }
}

/** given a parsed record, identify which config(s) are interested in it, and emit the record tupled with each interested config */
class ValidateAndDistribute(
  private val initialConfigs: Map<Long, ConfigSnapshot>
) : BroadcastProcessFunction<Record, ConfigSnapshot, Pair<Record, Config>>() {
  override fun processBroadcastElement(
    value: ConfigSnapshot,
    ctx: Context,
    out: Collector<Pair<Record, Config>>
  ) {
    val snapshots = ctx.getBroadcastState(configSnapshotDescriptor)
    snapshots.put(value.tenantId, value)
  }

  override fun processElement(
    value: Record,
    ctx: ReadOnlyContext,
    out: Collector<Pair<Record, Config>>
  ) {
    val configsForThisTenant = ctx.getBroadcastState(configSnapshotDescriptor)
      .immutableEntries()
      .associate { it.key to it.value }
      .ifEmpty { initialConfigs }
      .get(value.tenantId)
      ?.configs
      .orEmpty()

    val configsInterestedInThisRecord = configsForThisTenant.values.filter {
      it.interestedIn(value)
    }

    // a record may match several configs; emit one pair per interested config
    for (config in configsInterestedInThisRecord) {
      out.collect(value to config)
    }
  }
}

/** given a pair of Record and Config, run the async operation and send an enriched record including the result */
class Enrich : RichAsyncFunction<Pair<Record, Config>, EnrichedRecord>()
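
Fleshed out, Enrich would look roughly like this – LookupClient, its lookupAsync() call (returning a CompletableFuture), and the EnrichedRecord constructor are stand-ins for the real service:

class Enrich : RichAsyncFunction<Pair<Record, Config>, EnrichedRecord>() {
  // hypothetical non-blocking client, connected once per task in open()
  @Transient private lateinit var client: LookupClient

  override fun open(parameters: Configuration) {
    client = LookupClient.connect()
  }

  override fun asyncInvoke(
    input: Pair<Record, Config>,
    resultFuture: ResultFuture<EnrichedRecord>
  ) {
    val (record, config) = input
    client.lookupAsync(record, config).thenAccept { result ->
      resultFuture.complete(listOf(EnrichedRecord(record, result)))
    }
  }

  override fun timeout(input: Pair<Record, Config>, resultFuture: ResultFuture<EnrichedRecord>) {
    // drop the record on timeout; the default would fail the job with a TimeoutException
    resultFuture.complete(emptyList())
  }
}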

Job Pseudocode:

val initialConfigs: Map<Long, ConfigSnapshot> = ???
val dataSource: DataStream<String> = ???
val configSource: DataStream<ConfigSnapshot> = ??? // from a legacy "while (true) { poll; sleep }" source

// the config-subscribing operators keep the broadcast state in a Map<tenantId: Long, ConfigSnapshot>
val configSnapshotDescriptor = MapStateDescriptor(
  "currentConfigSnapshots",
  Long::class.javaObjectType, // boxed java.lang.Long; Long::class.java would give the primitive class
  ConfigSnapshot::class.java
)

// Broadcast the snapshots
val configBroadcast: BroadcastStream<ConfigSnapshot> = configSource.broadcast(configSnapshotDescriptor)

val parsed: DataStream<Record> = dataSource
  .connect(configBroadcast)
  .process(Parse(initialConfigs))

// input records can be duplicated now, as there may be multiple Configs that are interested in a record
val validated: DataStream<Pair<Record, Config>> = parsed
  .connect(configBroadcast)
  .process(ValidateAndDistribute(initialConfigs))

val enriched: DataStream<EnrichedRecord> = AsyncDataStream.unorderedWait(
  validated,
  Enrich(),
  5L,
  TimeUnit.SECONDS
)

Re: Async + Broadcast?

Arvid Heise-4
Hi Alex,

The easiest way to verify that what you tried is working is to look at Flink's Web UI and check the topology.

The broadcast side of the input will always be... well, broadcast (= not chained). So you only need to disable chaining on the non-broadcast input:

val parsed: DataStream<Record> = dataSource
  .disableChaining() // break the chain to the legacy source here
  .connect(configBroadcast)
  .process(Parse(initialConfigs))

Regarding objectReuse, it's safe to enable as long as you don't do any dirty hacks on data that has already been output. For example, what you cannot do is store the last element in a plain field of your map function (outside managed state) and use it to calculate the next result.
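
A sketch of the kind of function that breaks under object reuse (assuming your Record type has a numeric amount field):

import org.apache.flink.api.common.functions.MapFunction

// UNSAFE with enableObjectReuse(): holds a reference to an input record
// that the runtime may mutate in place after map() returns
class DiffWithPrevious : MapFunction<Record, Long> {
  private var previous: Record? = null // plain field, not managed state

  override fun map(value: Record): Long {
    val delta = value.amount - (previous?.amount ?: 0L)
    previous = value // the reused "value" object may be overwritten later
    return delta
  }
}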
