you for the answer. In our case we did change the name of
the Kafka source, so we expected that state would not be
restored for that Kafka source operator.
Shouldn’t FlinkKafkaConsumerBase have a safeguard that does
not allow restoring KafkaTopicPartitions from topics that
are different from the currently consumed one?
I think this is somewhat expected behaviour. Flink, in order
to provide proper processing semantics, keeps track of
partition offsets internally and checkpoints those offsets.
It also supports discovery of new partitions. Given both of those
features, if you restart your job from a savepoint/checkpoint
but with a changed topic, it will restore the old partitions with
their offsets from the checkpoint, and will also discover partitions
from the new topic. This is why it consumes from both the old and
the new topic. If you defined your source manually (you were not
using Kafka010TableSource), what you can do is set a new uid for
the source and enable allowNonRestoredState. This way you will
keep the state for all other operators, but you will lose the
information about offsets in Kafka.
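
To make the mechanism concrete, here is a toy model in plain Java. It is
not Flink's actual implementation: the class KafkaRestoreModel, the method
subscribedPartitions, and the uid and partition names are all hypothetical
and only illustrate the idea that restored state is matched by operator uid
and then merged with freshly discovered partitions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified, hypothetical model of the behaviour described above.
// It does NOT use or reproduce Flink's real classes.
class KafkaRestoreModel {

    // checkpoint: operator uid -> (partition name -> checkpointed offset)
    // Returns the partitions the source would end up reading from.
    static Set<String> subscribedPartitions(
            Map<String, Map<String, Long>> checkpoint,
            String sourceUid,
            List<String> discoveredPartitions) {
        Set<String> result = new TreeSet<>();
        // State is restored only if the checkpoint holds an entry for this uid.
        Map<String, Long> restored = checkpoint.get(sourceUid);
        if (restored != null) {
            result.addAll(restored.keySet());
        }
        // Partition discovery adds the partitions of the configured topic.
        result.addAll(discoveredPartitions);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> oldOffsets = new HashMap<>();
        oldOffsets.put("old-topic-0", 42L);
        oldOffsets.put("old-topic-1", 17L);
        Map<String, Map<String, Long>> checkpoint = new HashMap<>();
        checkpoint.put("kafka-source", oldOffsets);

        List<String> discovered = Arrays.asList("new-topic-0", "new-topic-1");

        // Same uid: old partitions are restored AND new ones are discovered.
        System.out.println(subscribedPartitions(checkpoint, "kafka-source", discovered));
        // prints [new-topic-0, new-topic-1, old-topic-0, old-topic-1]

        // New uid: nothing is restored for the source, only the new topic is read.
        System.out.println(subscribedPartitions(checkpoint, "kafka-source-v2", discovered));
        // prints [new-topic-0, new-topic-1]
    }
}
```

In a real job the second case corresponds to calling .uid(...) with a new
value on the source stream and resuming with the allowNonRestoredState
option, so that the checkpointed state of the old source is skipped rather
than causing the restore to fail.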
I also cc @Gordon, who might want to add something to this.
On 12/09/18 18:03, Juan Gentile wrote:
We have found a weird issue while replacing
the source in one of our Flink SQL Jobs.
We have a job which was reading from a Kafka
topic (with externalized checkpoints) and we needed to
change the topic while keeping the same logic for the
After we restarted the job, instead of
consuming from the new Kafka topic, it consumed from both,
duplicating the input of our job.
We were able to reproduce the issue, but we
don’t understand whether this is a bug or expected behavior,
in which case we should have restarted from a clean state.
We are using Flink 1.4 at the moment and