Initializing mapstate hangs

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Initializing mapstate hangs

hassahma
Hi,

Initializing mapstate hangs in window function. However if i use valuestate then it is initialized succcessfully. I am using rocksdb to store the state.

public class MyWindowFunction extends RichWindowFunction<Event, Payload, Tuple, TimeWindow>
{
private transient MapStateDescriptor<String, String> productsDescriptor = new MapStateDescriptor<>(
"mapState", String.class, String.class);

@Override
public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
final Collector<Payload> out)
{
// do something
}

@Override
public void open(Configuration parameters) throws Exception
{
System.out.println("## open init window state ");
MapState<String, String> state = this.getRuntimeContext().getMapState(productsDescriptor); <<< program hangs here
System.out.println("## open window state " + state);
}
}

Thanks for the help.
Reply | Threaded
Open this post in threaded view
|

Re: Initializing mapstate hangs

vino yang
Hi Ahmad,

Which version of Flink do you use?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月19日周五 下午11:32写道:
Hi,

Initializing mapstate hangs in window function. However if i use valuestate then it is initialized succcessfully. I am using rocksdb to store the state.

public class MyWindowFunction extends RichWindowFunction<Event, Payload, Tuple, TimeWindow>
{
private transient MapStateDescriptor<String, String> productsDescriptor = new MapStateDescriptor<>(
"mapState", String.class, String.class);

@Override
public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
final Collector<Payload> out)
{
// do something
}

@Override
public void open(Configuration parameters) throws Exception
{
System.out.println("## open init window state ");
MapState<String, String> state = this.getRuntimeContext().getMapState(productsDescriptor); <<< program hangs here
System.out.println("## open window state " + state);
}
}

Thanks for the help.
Reply | Threaded
Open this post in threaded view
|

Re: Initializing mapstate hangs

hassahma
Flink 1.6.0. Valuestate initialises successful but mapstate hangs 

Regards 

On 20 Oct 2018, at 02:55, vino yang <[hidden email]> wrote:

Hi Ahmad,

Which version of Flink do you use?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月19日周五 下午11:32写道:
Hi,

Initializing mapstate hangs in window function. However if i use valuestate then it is initialized succcessfully. I am using rocksdb to store the state.

public class MyWindowFunction extends RichWindowFunction<Event, Payload, Tuple, TimeWindow>
{
private transient MapStateDescriptor<String, String> productsDescriptor = new MapStateDescriptor<>(
"mapState", String.class, String.class);

@Override
public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
final Collector<Payload> out)
{
// do something
}

@Override
public void open(Configuration parameters) throws Exception
{
System.out.println("## open init window state ");
MapState<String, String> state = this.getRuntimeContext().getMapState(productsDescriptor); <<< program hangs here
System.out.println("## open window state " + state);
}
}

Thanks for the help.
Reply | Threaded
Open this post in threaded view
|

Re: Initializing mapstate hangs

vino yang
Hi Ahmad,

Can you try to dump thread info from the Task Manager's JVM instance?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月20日周六 下午4:24写道:
Flink 1.6.0. Valuestate initialises successful but mapstate hangs 

Regards 

On 20 Oct 2018, at 02:55, vino yang <[hidden email]> wrote:

Hi Ahmad,

Which version of Flink do you use?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月19日周五 下午11:32写道:
Hi,

Initializing mapstate hangs in window function. However if i use valuestate then it is initialized succcessfully. I am using rocksdb to store the state.

public class MyWindowFunction extends RichWindowFunction<Event, Payload, Tuple, TimeWindow>
{
private transient MapStateDescriptor<String, String> productsDescriptor = new MapStateDescriptor<>(
"mapState", String.class, String.class);

@Override
public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
final Collector<Payload> out)
{
// do something
}

@Override
public void open(Configuration parameters) throws Exception
{
System.out.println("## open init window state ");
MapState<String, String> state = this.getRuntimeContext().getMapState(productsDescriptor); <<< program hangs here
System.out.println("## open window state " + state);
}
}

Thanks for the help.
Reply | Threaded
Open this post in threaded view
|

Re: Initializing mapstate hangs

hassahma

2018-10-22 13:46:31,944 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(SlidingProcessingTimeWindows(180000, 180000), TimeTrigger, MetricWindowFunction) -> Map -> Sink: Unnamed (1/1) (5677190a0d292df3ad8f3521519cd980) switched from RUNNING to FAILED.

java.lang.NullPointerException: The state properties must not be null

at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)

at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:174)

at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:168)

at com.sap.hybris.conversion.flink.processors.chain.MetricWindowFunction.open(MetricWindowFunction.java:62)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.api.java.operators.translation.WrappingFunction.open(WrappingFunction.java:45)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:219)

at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

at java.lang.Thread.run(Thread.java:745)





On Sat, 20 Oct 2018 at 11:29, vino yang <[hidden email]> wrote:
Hi Ahmad,

Can you try to dump thread info from the Task Manager's JVM instance?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月20日周六 下午4:24写道:
Flink 1.6.0. Valuestate initialises successful but mapstate hangs 

Regards 

On 20 Oct 2018, at 02:55, vino yang <[hidden email]> wrote:

Hi Ahmad,

Which version of Flink do you use?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月19日周五 下午11:32写道:
Hi,

Initializing mapstate hangs in window function. However if i use valuestate then it is initialized succcessfully. I am using rocksdb to store the state.

public class MyWindowFunction extends RichWindowFunction<Event, Payload, Tuple, TimeWindow>
{
private transient MapStateDescriptor<String, String> productsDescriptor = new MapStateDescriptor<>(
"mapState", String.class, String.class);

@Override
public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
final Collector<Payload> out)
{
// do something
}

@Override
public void open(Configuration parameters) throws Exception
{
System.out.println("## open init window state ");
MapState<String, String> state = this.getRuntimeContext().getMapState(productsDescriptor); <<< program hangs here
System.out.println("## open window state " + state);
}
}

Thanks for the help.
Reply | Threaded
Open this post in threaded view
|

Re: Initializing mapstate hangs

Alexander Smirnov
I think that's because you declared it as transient field.

Move the declaration inside of "open" function to resolve that

On Mon, Oct 22, 2018 at 3:48 PM Ahmad Hassan <[hidden email]> wrote:

2018-10-22 13:46:31,944 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(SlidingProcessingTimeWindows(180000, 180000), TimeTrigger, MetricWindowFunction) -> Map -> Sink: Unnamed (1/1) (5677190a0d292df3ad8f3521519cd980) switched from RUNNING to FAILED.

java.lang.NullPointerException: The state properties must not be null

at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)

at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:174)

at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:168)

at com.sap.hybris.conversion.flink.processors.chain.MetricWindowFunction.open(MetricWindowFunction.java:62)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.api.java.operators.translation.WrappingFunction.open(WrappingFunction.java:45)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:219)

at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

at java.lang.Thread.run(Thread.java:745)





On Sat, 20 Oct 2018 at 11:29, vino yang <[hidden email]> wrote:
Hi Ahmad,

Can you try to dump thread info from the Task Manager's JVM instance?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月20日周六 下午4:24写道:
Flink 1.6.0. Valuestate initialises successful but mapstate hangs 

Regards 

On 20 Oct 2018, at 02:55, vino yang <[hidden email]> wrote:

Hi Ahmad,

Which version of Flink do you use?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月19日周五 下午11:32写道:
Hi,

Initializing mapstate hangs in window function. However if i use valuestate then it is initialized succcessfully. I am using rocksdb to store the state.

public class MyWindowFunction extends RichWindowFunction<Event, Payload, Tuple, TimeWindow>
{
private transient MapStateDescriptor<String, String> productsDescriptor = new MapStateDescriptor<>(
"mapState", String.class, String.class);

@Override
public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
final Collector<Payload> out)
{
// do something
}

@Override
public void open(Configuration parameters) throws Exception
{
System.out.println("## open init window state ");
MapState<String, String> state = this.getRuntimeContext().getMapState(productsDescriptor); <<< program hangs here
System.out.println("## open window state " + state);
}
}

Thanks for the help.
Reply | Threaded
Open this post in threaded view
|

Re: Initializing mapstate hangs

Dawid Wysakowicz-2

Hi Ahmad,

I think Alexander is right. You've declared the state descriptor transient, which effectively makes it null at the worker node, when the state access is happening. Remove the transient modifier or instantiate the descriptor in the open method. The common pattern is to have the state itself as a transient field rather than the descriptor.

Best,

Dawid

On 22/10/2018 15:15, Alexander Smirnov wrote:
I think that's because you declared it as transient field.

Move the declaration inside of "open" function to resolve that

On Mon, Oct 22, 2018 at 3:48 PM Ahmad Hassan <[hidden email]> wrote:

2018-10-22 13:46:31,944 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(SlidingProcessingTimeWindows(180000, 180000), TimeTrigger, MetricWindowFunction) -> Map -> Sink: Unnamed (1/1) (5677190a0d292df3ad8f3521519cd980) switched from RUNNING to FAILED.

java.lang.NullPointerException: The state properties must not be null

at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)

at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:174)

at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:168)

at com.sap.hybris.conversion.flink.processors.chain.MetricWindowFunction.open(MetricWindowFunction.java:62)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.api.java.operators.translation.WrappingFunction.open(WrappingFunction.java:45)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:219)

at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

at java.lang.Thread.run(Thread.java:745)





On Sat, 20 Oct 2018 at 11:29, vino yang <[hidden email]> wrote:
Hi Ahmad,

Can you try to dump thread info from the Task Manager's JVM instance?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月20日周六 下午4:24写道:
Flink 1.6.0. Valuestate initialises successful but mapstate hangs 

Regards 

On 20 Oct 2018, at 02:55, vino yang <[hidden email]> wrote:

Hi Ahmad,

Which version of Flink do you use?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月19日周五 下午11:32写道:
Hi,

Initializing mapstate hangs in window function. However if i use valuestate then it is initialized succcessfully. I am using rocksdb to store the state.

public class MyWindowFunction extends RichWindowFunction<Event, Payload, Tuple, TimeWindow>
{
private transient MapStateDescriptor<String, String> productsDescriptor = new MapStateDescriptor<>(
"mapState", String.class, String.class);

@Override
public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
final Collector<Payload> out)
{
// do something
}

@Override
public void open(Configuration parameters) throws Exception
{
System.out.println("## open init window state ");
MapState<String, String> state = this.getRuntimeContext().getMapState(productsDescriptor); <<< program hangs here
System.out.println("## open window state " + state);
}
}

Thanks for the help.

signature.asc (849 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Initializing mapstate hangs

hassahma
Thank you. That worked.

Regards,

On Tue, 23 Oct 2018 at 09:08, Dawid Wysakowicz <[hidden email]> wrote:

Hi Ahmad,

I think Alexander is right. You've declared the state descriptor transient, which effectively makes it null at the worker node, when the state access is happening. Remove the transient modifier or instantiate the descriptor in the open method. The common pattern is to have the state itself as a transient field rather than the descriptor.

Best,

Dawid

On 22/10/2018 15:15, Alexander Smirnov wrote:
I think that's because you declared it as transient field.

Move the declaration inside of "open" function to resolve that

On Mon, Oct 22, 2018 at 3:48 PM Ahmad Hassan <[hidden email]> wrote:

2018-10-22 13:46:31,944 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(SlidingProcessingTimeWindows(180000, 180000), TimeTrigger, MetricWindowFunction) -> Map -> Sink: Unnamed (1/1) (5677190a0d292df3ad8f3521519cd980) switched from RUNNING to FAILED.

java.lang.NullPointerException: The state properties must not be null

at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)

at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:174)

at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:168)

at com.sap.hybris.conversion.flink.processors.chain.MetricWindowFunction.open(MetricWindowFunction.java:62)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.api.java.operators.translation.WrappingFunction.open(WrappingFunction.java:45)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:219)

at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

at java.lang.Thread.run(Thread.java:745)





On Sat, 20 Oct 2018 at 11:29, vino yang <[hidden email]> wrote:
Hi Ahmad,

Can you try to dump thread info from the Task Manager's JVM instance?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月20日周六 下午4:24写道:
Flink 1.6.0. Valuestate initialises successful but mapstate hangs 

Regards 

On 20 Oct 2018, at 02:55, vino yang <[hidden email]> wrote:

Hi Ahmad,

Which version of Flink do you use?

Thanks, vino.

Ahmad Hassan <[hidden email]> 于2018年10月19日周五 下午11:32写道:
Hi,

Initializing mapstate hangs in window function. However if i use valuestate then it is initialized succcessfully. I am using rocksdb to store the state.

public class MyWindowFunction extends RichWindowFunction<Event, Payload, Tuple, TimeWindow>
{
private transient MapStateDescriptor<String, String> productsDescriptor = new MapStateDescriptor<>(
"mapState", String.class, String.class);

@Override
public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
final Collector<Payload> out)
{
// do something
}

@Override
public void open(Configuration parameters) throws Exception
{
System.out.println("## open init window state ");
MapState<String, String> state = this.getRuntimeContext().getMapState(productsDescriptor); <<< program hangs here
System.out.println("## open window state " + state);
}
}

Thanks for the help.