git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Go SDK: Teardown() not being called with dataflow runner


Hi Eduardo,
Henning is right - the specific guarantees around Setup/Teardown vs. StartBundle/FinishBundle are currently described best in the Java SDK documentation: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java (see documentation to @Setup, @Teardown, @StartBundle, @FinishBundle).
For what you are doing, StartBundle/FinishBundle is 100% the proper abstraction - in the current code you are going to see data loss or corruption because the code is violating the fundamental guarantee that by the time FinishBundle returns, all work associated with the bundle must be completed. Pooling or batching mutations is in fact the primary use case for Start/FinishBundle. Setup/Teardown are for managing volatile resources like connections.

On Tue, Jul 3, 2018 at 1:10 PM Henning Rohde <herohde@xxxxxxxxxx> wrote:
Teardown has very loose guarantees on when it's called and you essentially can't rely on it. Currently, for Go on non-direct runners, we hang on to the bundle descriptors forever and never destroy them (and in turn never call Teardown). Even if we didn't, failures/restarts could cause Teardown to not be called.

If something _must_ happen, FinishBundle is the right method.

Thanks,
 Henning

On Tue, Jul 3, 2018 at 10:37 AM eduardo.morales@xxxxxxxxx <eduardo.morales@xxxxxxxxx> wrote:
Essentially I have the following code:

type Writer struct {
  Pool WriterPool
}

func (w *Writer) Setup() {
 w.Pool = Init()
}

func (w* Writer) ProcessElement(ctx, elem Elem) {
  w.Pool.Add(elem)
}

func (w* Writer) Teardown() {
  w.Pool.Write()
  w.Pool.Close()
}

beam.ParDo0(scope, &Writer{}, elemCollection)

The above code runs fine with the direct runner but not with dataflow.

 I added log lines to the above methods, and the ones in Teardown() never appear in the logs.
If I change my code as follows:

func (w* Writer) ProcessElement(ctx, elem Elem) {
  w.Pool.Add(elem)
  w.Pool.Write()
}

Then I see the data being written, but I lose the ability to pool, plus I am leaking connections.

Is this a known issue, or I am going something wrong?

Thanks again for the help.