Re: Go SDK: Teardown() not being called with dataflow runner


Bundle boundaries are unspecified: they depend on the runner and on the circumstances of a particular execution, and are generally unrelated to windowing or to the contents of the data. They have no semantic meaning. Everything would still work exactly the same way if every element were in its own bundle, or if the entire contents of a PCollection were in a single bundle. Bundle boundaries are only the runner's way of telling you where amortized work is allowed to begin and end. For practical purposes, all you need to know is: "if you do batching, flush it in finishBundle".
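
To make that rule concrete, here is a minimal sketch in Go, matching the code quoted further down. It buffers in ProcessElement, flushes in FinishBundle, and treats Teardown as best-effort cleanup only. It deliberately does not import the Beam SDK: WriterPool, its Written and Closed fields, and the runBundles driver are hypothetical stand-ins so the example runs on its own, and runBundles only imitates how a runner might split the input into bundles (assumed here to never reach Teardown).

```go
package main

import (
	"context"
	"fmt"
)

// Elem stands in for the element type from the original post.
type Elem string

// WriterPool is a hypothetical in-memory stand-in for the poster's pool.
// Each call to Write records the buffered elements as one batch in Written.
type WriterPool struct {
	pending []Elem
	Written [][]Elem
	Closed  bool
}

// Add buffers an element without writing it.
func (p *WriterPool) Add(e Elem) { p.pending = append(p.pending, e) }

// Write flushes everything buffered so far as a single batch.
func (p *WriterPool) Write() error {
	if len(p.pending) == 0 {
		return nil
	}
	p.Written = append(p.Written, p.pending)
	p.pending = nil
	return nil
}

// Close releases the pool's resources.
func (p *WriterPool) Close() { p.Closed = true }

// Writer uses the lifecycle method names discussed in this thread.
type Writer struct {
	pool *WriterPool
}

// Setup acquires the long-lived resource (connections and the like).
func (w *Writer) Setup() { w.pool = &WriterPool{} }

// ProcessElement only buffers the element.
func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
	w.pool.Add(elem)
}

// FinishBundle flushes the batch, so all work for the bundle is complete
// by the time it returns.
func (w *Writer) FinishBundle(ctx context.Context) error {
	return w.pool.Write()
}

// Teardown is cleanup only; correctness must not depend on it running.
func (w *Writer) Teardown() { w.pool.Close() }

// runBundles imitates a runner that chooses arbitrary bundle boundaries
// and never calls Teardown.
func runBundles(w *Writer, bundles [][]Elem) error {
	ctx := context.Background()
	w.Setup()
	for _, bundle := range bundles {
		for _, e := range bundle {
			w.ProcessElement(ctx, e)
		}
		if err := w.FinishBundle(ctx); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	w := &Writer{}
	if err := runBundles(w, [][]Elem{{"a", "b"}, {"c"}}); err != nil {
		panic(err)
	}
	fmt.Println(len(w.pool.Written), "batches written without Teardown")
}
```

However the driver splits the input, every element is written by the time its bundle finishes, which is the only property the batching code should rely on.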

On Tue, Jul 3, 2018 at 2:29 PM eduardo.morales@xxxxxxxxx <eduardo.morales@xxxxxxxxx> wrote:
Thanks. It is much clearer now...

However, the code comments don't mention how often {Start,Finish}Bundle are called. What constitutes a batch?

If I am using a window of 1 minute, can I expect {Start,Finish}Bundle to be called every minute? In other words, will the window produce a batch of my data?

On 2018/07/03 20:31:56, Eugene Kirpichov <kirpichov@xxxxxxxxxx> wrote:
> Hi Eduardo,
> Henning is right - the specific guarantees around Setup/Teardown vs.
> StartBundle/FinishBundle are currently described best in the Java SDK
> documentation:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
> (see
> documentation to @Setup, @Teardown, @StartBundle, @FinishBundle).
> For what you are doing, StartBundle/FinishBundle is 100% the proper
> abstraction - in the current code you are going to see data loss or
> corruption because the code is violating the fundamental guarantee that by
> the time FinishBundle returns, all work associated with the bundle must be
> completed. Pooling or batching mutations is in fact the primary use case
> for Start/FinishBundle. Setup/Teardown are for managing volatile resources
> like connections.
>
> On Tue, Jul 3, 2018 at 1:10 PM Henning Rohde <herohde@xxxxxxxxxx> wrote:
>
> > Teardown has very loose guarantees on when it's called and you essentially
> > can't rely on it. Currently, for Go on non-direct runners, we hang on to
> > the bundle descriptors forever and never destroy them (and in turn never
> > call Teardown). Even if we didn't, failures/restarts could cause Teardown
> > to not be called.
> >
> > If something _must_ happen, FinishBundle is the right method.
> >
> > Thanks,
> >  Henning
> >
> > On Tue, Jul 3, 2018 at 10:37 AM eduardo.morales@xxxxxxxxx <
> > eduardo.morales@xxxxxxxxx> wrote:
> >
> >> Essentially I have the following code:
> >>
> >> type Writer struct {
> >>   Pool WriterPool
> >> }
> >>
> >> func (w *Writer) Setup() {
> >>  w.Pool = Init()
> >> }
> >>
> >> func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
> >>   w.Pool.Add(elem)
> >> }
> >>
> >> func (w *Writer) Teardown() {
> >>   w.Pool.Write()
> >>   w.Pool.Close()
> >> }
> >>
> >> beam.ParDo0(scope, &Writer{}, elemCollection)
> >>
> >> The above code runs fine with the direct runner but not with dataflow.
> >>
> >>  I added log lines to the above methods, and the ones in Teardown() never
> >> appear in the logs.
> >> If I change my code as follows:
> >>
> >> func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
> >>   w.Pool.Add(elem)
> >>   w.Pool.Write()
> >> }
> >>
> >> Then I see the data being written, but I lose the ability to pool, plus I
> >> am leaking connections.
> >>
> >> Is this a known issue, or am I doing something wrong?
> >>
> >> Thanks again for the help.
> >>
> >
>