I think this is more or less expected behaviour. To provide proper processing semantics, Flink keeps track of partition offsets internally and checkpoints those offsets. FlinkKafkaConsumer also supports discovery of new partitions. With both of those features in mind: if you restart your job from a savepoint/checkpoint but with a changed topic, it will restore the old partitions with their offsets from the checkpoint, and it will also discover the partitions of the new topic. This is why it consumes from both the old and the new topic.

If you defined your source manually (i.e. you were not using Kafka010TableSource), what you can do is set a new uid for the source and enable allowNonRestoredState (the -n / --allowNonRestoredState option of flink run). This way you will keep the state of all other operators, but you will lose the information about the offsets in Kafka.
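To make the restore behaviour above a bit more concrete, here is a toy model in plain Java. It is NOT Flink's code: the class KafkaRestoreModel, the method startOffsets, the sentinels EARLIEST / FROM_STARTUP_MODE, and the uids and topic names are all hypothetical. It only mimics what I described: restored partitions keep their offsets even if their topic is no longer subscribed, discovered partitions are added on top (as far as I know these start from the earliest offset after a restore, which the model assumes), and savepoint state that matches no operator uid fails the restore unless allowNonRestoredState is set.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model of a Kafka source restoring from a savepoint. NOT Flink's
 * implementation; all names here are hypothetical and only illustrate the idea.
 */
public class KafkaRestoreModel {

    // Hypothetical sentinels standing in for "where to start" a partition.
    public static final long EARLIEST = -1L;          // assumption: partitions found on top of restored state
    public static final long FROM_STARTUP_MODE = -2L; // no restored state: use the configured startup mode

    /**
     * Computes the start offset per partition ("topic-partition" -> offset).
     *
     * @param savepoint             operator uid -> (partition -> checkpointed offset)
     * @param sourceUid             uid of the Kafka source in the restarted job
     * @param discovered            partitions discovered for the currently subscribed topic
     * @param allowNonRestoredState whether savepoint state with no matching operator may be dropped
     */
    public static Map<String, Long> startOffsets(
            Map<String, Map<String, Long>> savepoint,
            String sourceUid,
            List<String> discovered,
            boolean allowNonRestoredState) {
        // In this model the Kafka source is the only stateful operator, so any other
        // uid in the savepoint is state that cannot be mapped to the new job.
        for (String uid : savepoint.keySet()) {
            if (!uid.equals(sourceUid) && !allowNonRestoredState) {
                throw new IllegalStateException("Cannot map savepoint state for operator uid " + uid);
            }
        }

        Map<String, Long> offsets = new TreeMap<>();
        Map<String, Long> restored = savepoint.get(sourceUid);
        if (restored != null) {
            // Restored partitions keep their offsets, even if their topic is no longer subscribed.
            offsets.putAll(restored);
        }
        // Discovered partitions that are not in restored state are added on top.
        long startForNew = (restored != null) ? EARLIEST : FROM_STARTUP_MODE;
        for (String partition : discovered) {
            offsets.putIfAbsent(partition, startForNew);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Savepoint taken while the source (uid "kafka-source") was reading topic "old".
        Map<String, Map<String, Long>> savepoint = new HashMap<>();
        savepoint.put("kafka-source", Map.of("old-0", 42L, "old-1", 17L));
        List<String> newTopic = List.of("new-0", "new-1");

        // Same uid, changed topic: old partitions are restored AND new ones are discovered.
        // prints {new-0=-1, new-1=-1, old-0=42, old-1=17}
        System.out.println(startOffsets(savepoint, "kafka-source", newTopic, false));

        // New uid plus allowNonRestoredState: old offsets are dropped, only "new" is read.
        // prints {new-0=-2, new-1=-2}
        System.out.println(startOffsets(savepoint, "kafka-source-v2", newTopic, true));
    }
}
```

In an actual job, the uid in this model corresponds to the one you pass to uid(...) on the stream returned by env.addSource(consumer); changing it is what makes the old offset state "non-restored".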
I'm also cc'ing @Gordon, who might want to add something to this.
On 12/09/18 18:03, Juan Gentile wrote: