Large rocksdb state restore/checkpoint duration behavior


Hi,

We are using Flink 1.6.1 on YARN with RocksDB as the state backend,
incrementally checkpointed to HDFS (for both data and timers).

The job reads events from Kafka (~1 billion events per day), builds user
sessions using an EventTimeSessionWindow coupled with a late-firing trigger
and a WindowFunction with AggregatingState (gap of a few minutes, 1 day of
allowed lateness, ~1 TB of state), and writes the results back to Kafka
(~200 million events per day).
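
For a sense of scale, the daily volumes above can be converted to average
per-second rates. This is only a back-of-the-envelope sketch: it assumes the
load is spread evenly over the day, which real traffic is not, and the class
and method names are made up for illustration.

```java
public class Throughput {
    // Number of seconds in one day.
    static final long SECONDS_PER_DAY = 24L * 60 * 60;

    // Average events per second, assuming a perfectly even load over the day.
    static double perSecond(long eventsPerDay) {
        return (double) eventsPerDay / SECONDS_PER_DAY;
    }

    public static void main(String[] args) {
        // Figures quoted above: ~1 billion events in, ~200 million events out.
        System.out.printf("input:  ~%.0f events/s%n", perSecond(1_000_000_000L));
        System.out.printf("output: ~%.0f events/s%n", perSecond(200_000_000L));
    }
}
```

That works out to roughly 11,574 events per second in and 2,315 events per
second out on average; peaks will be higher.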

When we restarted the job after maintenance (the cluster was stopped for 1
hour), the restore took several hours.

Task metrics showed that no new data was being read from Kafka, yet the job
was still producing output.

Also, the job sometimes seems to freeze (no data in or out) while performing
long checkpoints (~20 minutes).

When we try to cancel the job, it takes several minutes to stop, and the
logs show the following:
2018-10-09 11:53:53,348 WARN  org.apache.flink.runtime.taskmanager.Task                    
- Task 'window-function -> (Sink: kafka-sink, Sink: kafka-late-sink) (1/60)'
did not react to cancelling signal for 30 seconds, but is stuck in method:
 org.rocksdb.RocksDB.get(Native Method)
org.rocksdb.RocksDB.get(RocksDB.java:810)
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:120)
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:452)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
java.lang.Thread.run(Thread.java:745)

Any ideas on this?

Regards,
Amine
