Flink custom stream source

Hi,

I am creating a custom source for reading streaming data that consists of several streams. I assign the streams to the task slots, and each slot reads its own streams. This works fine, but in some cases I have fewer streams than task slots. In that case some of the workers are not assigned any streams, yet they still call the snapshotState method. I want a way to say that if no stream is assigned, the worker should simply return.
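For reference, here is a simplified sketch of roughly how the assignment behaves, written as plain Java without the Flink classes. The names StreamAssignment, assignStreams and snapshot are only illustrative, and the round-robin scheme is an assumption for the example. In the real source the subtask index and parallelism would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StreamAssignment {

    // Round-robin assignment (assumed for illustration):
    // stream i is read by subtask (i % parallelism).
    static List<String> assignStreams(List<String> allStreams, int subtaskIndex, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < allStreams.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(allStreams.get(i));
            }
        }
        return mine;
    }

    // Stand-in for the body of snapshotState: returns what would be stored.
    // An idle subtask (no streams assigned) stores nothing and returns right away.
    static List<String> snapshot(List<String> assigned) {
        if (assigned.isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(assigned);
    }

    public static void main(String[] args) {
        // Two streams but four task slots: subtasks 2 and 3 get nothing.
        List<String> streams = Arrays.asList("stream-a", "stream-b");
        int parallelism = 4;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            List<String> mine = assignStreams(streams, subtask, parallelism);
            System.out.println("subtask " + subtask + " reads " + mine
                    + ", snapshot stores " + snapshot(mine));
        }
    }
}
```

With two streams and a parallelism of four, subtasks 2 and 3 end up with an empty list, which is the situation I am describing.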

Returning early works, but then snapshotState is no longer called on the other task slots either. If I leave things as they are, everything works fine. However, I see lots of calls to snapshotState that do nothing.

So how do I return from or stop a task slot without impacting anything else?

One more issue I observed: sometimes snapshotState is not called for a long time, even though my checkpoint interval is 1 second.