slot group indication per operator


slot group indication per operator

Sofer, Tovi

Hi all,

I am trying to use the slot sharing group feature, with the ‘default’ group and an additional ‘market’ group.

The purpose is to divide the resources equally between two sources and their downstream operators.

I’ve set the slot sharing group on the source of the market data.

Can I assume that all downstream operators created from this source will use the same ‘market’ slot group?

(The operators created for the market stream are fairly complex, with connect and split.)

In the Web UI I saw that there are 16 slots, but I didn’t see any indication of which group each operator was assigned to. How can I find out?

Relevant Code:
env.setParallelism(8);

conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 16); // to allow parallelism of 8 per group

// Market source and operators:
KeyedStream<SpotTickEvent, Tuple> windowedStreamA = sourceProvider.provide(env)
        .name(spotSourceProvider.getName())
        .slotSharingGroup(SourceMsgType.MARKET.slotGroup())
        .flatMap(new ParserMapper(new MarketMessageParser()))
        .name(ParserMapper.class.getSimpleName())
        .filter(new USDFilter())
        .name(USDFilter.class.getSimpleName())
        .keyBy(MarketEvent.CURRENCY_FIELD)
        .timeWindow(Time.of(windowSizeMs, TimeUnit.MILLISECONDS))
        .process(new LastInWindowPriceChangeFunction())
        .name(LastInWindowPriceChangeFunction.class.getSimpleName())
        .keyBy(SpotTickEvent.CURRENCY_FIELD);

marketConnectedStream = windowedStreamA.connect(windowedStreamB)
        .flatMap(new MarketCoMapper())
        .name(MarketCoMapper.class.getSimpleName());

SplitStream<MarketAWithMarketB> stocksWithSpotsStreams = marketConnectedStream
        .split(market -> ImmutableList.of("splitA", "splitB"));

DataStream<MarketAWithMarketB> splitA = stocksWithSpotsStreams.select("splitA");

 

 

Thanks and regards,

Tovi
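
On the inheritance question above: as far as the Flink documentation describes it, an operator without an explicit slotSharingGroup(...) inherits the group of its inputs only if all inputs are in the same group, and otherwise falls back to ‘default’. The sketch below is a plain-Java model of that rule, not Flink code; the class and method names are hypothetical and it is worth checking against the Flink version in use. It suggests that a connect of a ‘market’ stream with a ‘default’ stream would land in ‘default’ unless the group is set again on that operator.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone model of the documented inheritance rule; not part of Flink.
public class SlotGroupInheritance {

    static final String DEFAULT_GROUP = "default";

    // specified:   group set explicitly on the operator, or null if none was set
    // inputGroups: already-resolved groups of the operator's inputs (empty for a source)
    static String resolve(String specified, List<String> inputGroups) {
        if (specified != null) {
            return specified; // an explicit setting always wins
        }
        String inherited = null;
        for (String group : inputGroups) {
            if (inherited == null) {
                inherited = group;
            } else if (!inherited.equals(group)) {
                return DEFAULT_GROUP; // inputs disagree: fall back to the default group
            }
        }
        return inherited == null ? DEFAULT_GROUP : inherited;
    }

    public static void main(String[] args) {
        String source = resolve("market", Collections.<String>emptyList());
        String parser = resolve(null, Arrays.asList(source));
        String coMapSameGroups = resolve(null, Arrays.asList("market", "market"));
        String coMapMixedGroups = resolve(null, Arrays.asList("market", "default"));

        System.out.println("source: " + source);
        System.out.println("parser: " + parser);
        System.out.println("connect (both market): " + coMapSameGroups);
        System.out.println("connect (market + default): " + coMapMixedGroups);
    }
}
```

If windowedStreamB is not in the ‘market’ group, calling slotSharingGroup(...) explicitly on the operator after connect would remove the ambiguity.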

 

 


Re: slot group indication per operator

Timo Walther
Hi Tovi,

You are right, it is difficult to verify the correct behavior.

@Chesnay: Do you know if we can get this information? If not through the Web UI, maybe via REST? Do we have access to the full ExecutionGraph somewhere?

Otherwise it might make sense to open an issue for this.

Regards,
Timo
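
Until the group is visible per operator, one indirect check is to compare the number of slots the job occupies with what each possible assignment would require. The sketch below assumes the usual slot sharing arithmetic (each group needs as many slots as its highest operator parallelism, and groups do not share slots with each other). The class name and the four-operator layout are hypothetical, chosen only to mirror the parallelism of 8 from the original post.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for a back-of-the-envelope check; not part of Flink.
public class RequiredSlots {

    // groups[i] is the slot sharing group of operator i,
    // parallelism[i] is the parallelism of that same operator.
    static int requiredSlots(String[] groups, int[] parallelism) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (int i = 0; i < groups.length; i++) {
            // keep the highest parallelism seen so far for this group
            maxPerGroup.merge(groups[i], parallelism[i], Integer::max);
        }
        int total = 0;
        for (int slots : maxPerGroup.values()) {
            total += slots; // groups do not share slots, so their needs add up
        }
        return total;
    }

    public static void main(String[] args) {
        int[] parallelism = {8, 8, 8, 8};

        // Two operators in 'market', two in 'default'
        String[] twoGroups = {"market", "market", "default", "default"};
        // Everything ended up in 'default'
        String[] oneGroup = {"default", "default", "default", "default"};

        System.out.println(requiredSlots(twoGroups, parallelism)); // prints 16
        System.out.println(requiredSlots(oneGroup, parallelism));  // prints 8
    }
}
```

Under these assumptions, a job that occupies only 8 of the 16 slots would suggest that all operators ended up in a single group. This does not show which operator is in which group, only how many groups are effectively in use.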


On 12/5/17 at 4:25 PM, Sofer, Tovi wrote:



RE: slot group indication per operator

Sofer, Tovi

Hi.

 

Any update or suggestion on this?

 

Best regards,

Tovi

From: Timo Walther [mailto:[hidden email]]
Sent: Tuesday, December 05, 2017 18:55
To: [hidden email]
Cc: [hidden email]
Subject: Re: slot group indication per operator

 
