Re: Missing MapState when Timer fires after restored state


Hi Juho,
Thanks for trying this out. I'm running out of ideas now... Let's do a brief summary.

- Have we reached a consensus that the map state is lost when restoring? (I ask because you can restore successfully without rescaling.)
- The timer state is restored correctly, because for timers, restoring from a checkpoint is no different from restoring from a savepoint. (@Stefan, please correct me if I'm wrong.)

And @Juho, have you tried rescaling the job to a different parallelism (not always 16)?

Best, Sihua




On 05/18/2018 17:14, Juho Autio <juho.autio@xxxxxxxxx> wrote:

It hits the same problem.

Btw, why is this error logged on INFO level?

2018-05-18 09:03:52,595 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
2018-05-18 09:03:52,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
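
Note on the trace above: the root cause line, `NumberFormatException` thrown from `Long.parseLong` inside `onTimer`, is what one would expect if the timer fires for a key whose map state entry is no longer there, so the lookup yields null and the parse fails. The following is a minimal sketch of that failure mode and of a defensive guard, written in plain Java without Flink on the classpath. The class name `TimerStateGuard`, the helper `parseOrDefault`, the key `"lastSeen"`, and the use of a `HashMap` as a stand-in for the restored map state are all assumptions for illustration; they are not taken from the job's actual `EnrichEventProcessFunction`.

```java
import java.util.HashMap;
import java.util.Map;

public class TimerStateGuard {

    /**
     * Parses a stored timestamp string.
     * Returns the fallback when the entry is missing (null) or not a valid long.
     */
    public static long parseOrDefault(String raw, long fallback) {
        if (raw == null) {
            return fallback; // entry missing, e.g. state was not restored for this key
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            return fallback; // entry present but malformed
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the restored map state; empty simulates the lost entries.
        Map<String, String> restoredState = new HashMap<>();

        // Unguarded access, as the stack trace suggests: a null value reaches Long.parseLong.
        try {
            Long.parseLong(restoredState.get("lastSeen"));
        } catch (NumberFormatException e) {
            System.out.println("Unguarded parse failed: " + e);
        }

        // Guarded access: the missing entry is detected instead of failing the task.
        long lastSeen = parseOrDefault(restoredState.get("lastSeen"), -1L);
        if (lastSeen < 0) {
            System.out.println("State missing for this key when the timer fired");
        }
    }
}
```

A guard like this only keeps the task from failing and makes the missing entries visible (for example by logging or counting them); it does not address why the state is absent after the restore.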

On Fri, May 18, 2018 at 11:06 AM, Juho Autio <juho.autio@xxxxxxxxx> wrote:
Thanks Sihua, I'll give that RC a try.

On Fri, May 18, 2018 at 10:58 AM, sihua zhou <summerleafs@xxxxxxx> wrote:
Hi Juho,
Would you like to try out the latest RC (http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescale the job from the "problematic" checkpoint? The latest RC includes a fix for a potential silent data loss. If that is the cause, you will see a different exception when you try to recover your job.

Best, Sihua



On 05/18/2018 15:02, Juho Autio <juho.autio@xxxxxxxxx> wrote:
I see. I appreciate keeping this option available even if it's "beta". The current situation could be documented better, though.

As long as rescaling from checkpoint is not officially supported, I would put it behind a flag similar to --allowNonRestoredState. The flag could be called --allowRescalingRestoredCheckpointState, for example. This would make sure that users are aware that what they're using is experimental and might have unexpected effects.

As for the bug I faced, indeed I was able to reproduce it consistently. And I have provided TRACE-level logs personally to Stefan. If there is no Jira ticket for this yet, would you like me to create one?

On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s.richter@xxxxxxxxxxxxxxxxx> wrote:
Hi,

>
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.

If there is a problem, I would still consider it a bug because it should work correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?

It should work properly, but I would prefer to keep this at the level of a "hidden feature" until it gets some more exposure and some questions about the future differences between savepoints and checkpoints are resolved.

> - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints are still working because they don't timeout as long as the state is growing linearly. In that case if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is to restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with a smaller cluster but on the other hand can't restore a checkpoint to a bigger cluster, if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan





