[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Conceptual question

Hi Piotrek,

It seems that this was implemented by `Operator` API, which is a more low level api compared to `Function` API.
Since in `Function` API level we can only migrate state by event triggered, it is more convenient in this way to migrate state by foreach all keys in `open()` method.
If I was implemented state operator by `ProcessFunction` API, is it possible to port it to `KeyedProcessOperator` and do the state migration that you mentioned?
And are there something concerned and difficulties that will leads to restored state failed or other problems? Thank you!

Best Regards,
Tony Wei

2018-06-07 16:10 GMT+08:00 Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx>:

General solution for state/schema migration is under development and it might be released with Flink 1.6.0.

Before that, you need to manually handle the state migration in your operator’s open method. Lets assume that your OperatorV1 has a state field “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible with previous version. What you can do, is to add a logic in open method, to check:
1. If “stateV2” is non empty, do nothing
2. If there is no “stateV2”, iterate over all of the keys and manually migrate “stateV1” to “stateV2”

In your OperatorV3 you could drop the support for “stateV1”.

I have once implemented something like that here:

Hope that helps!


On 6 Jun 2018, at 17:04, TechnoMage <mlatta@xxxxxxxxxxxxxx> wrote:

We are still pretty new to Flink and I have a conceptual / DevOps question.

When a job is modified and we want to deploy the new version, what is the preferred method?  Our jobs have a lot of keyed state.

If we use snapshots we have old state that may no longer apply to the new pipeline.
If we start a new job we can reprocess historical data from Kafka, but that can be very resource heavy for a while.

Is there an option I am missing?  Are there facilities to “patch” or “purge” selectively the keyed state?


( ! ) Warning: include(msgfooter.php): failed to open stream: No such file or directory in /var/www/git/apache-flink-users/msg09728.html on line 69
Call Stack
10.0007368648{main}( ).../msg09728.html:0

( ! ) Warning: include(): Failed opening 'msgfooter.php' for inclusion (include_path='.:/var/www/git') in /var/www/git/apache-flink-users/msg09728.html on line 69
Call Stack
10.0007368648{main}( ).../msg09728.html:0