Re: Go SDK: Teardown() not being called with dataflow runner


Thanks. It is much clearer now...

However, the code comments don't mention how often {Start,Finish}Bundle are called. What constitutes a batch?

If I am using a window of 1 minute, can I expect {Start,Finish}Bundle to be called every minute? In other words, will each window produce a batch of my data?

On 2018/07/03 20:31:56, Eugene Kirpichov <kirpichov@xxxxxxxxxx> wrote: 
> Hi Eduardo,
> Henning is right - the specific guarantees around Setup/Teardown vs.
> StartBundle/FinishBundle are currently described best in the Java SDK
> documentation:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
> (see the documentation for @Setup, @Teardown, @StartBundle and
> @FinishBundle).
> For what you are doing, StartBundle/FinishBundle is 100% the proper
> abstraction - in the current code you are going to see data loss or
> corruption because the code is violating the fundamental guarantee that by
> the time FinishBundle returns, all work associated with the bundle must be
> completed. Pooling or batching mutations is in fact the primary use case
> for Start/FinishBundle. Setup/Teardown are for managing volatile resources
> like connections.
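A minimal sketch of that pattern applied to the Writer quoted below, under stated assumptions: `Sink`, `fakeSink`, `newSink`, `BatchWriter` and `MaxBatch` are hypothetical names, and the file does not import Beam. It only reuses the DoFn lifecycle method names (`Setup`, `StartBundle`, `ProcessElement`, `FinishBundle`, `Teardown`) and drives them by hand in `runBundles`, so the exact signatures the Go SDK accepts should be checked against its documentation. The connection is opened once in `Setup` and reused across bundles, while the buffer is reset in `StartBundle` and always drained before `FinishBundle` returns.

```go
package main

import (
	"context"
	"fmt"
)

// Sink is a hypothetical stand-in for the real connection or WriterPool.
type Sink interface {
	WriteBatch(ctx context.Context, batch []string) error
	Close() error
}

// fakeSink records each batch it receives; illustration only.
type fakeSink struct{ batches [][]string }

// WriteBatch copies the batch because flush reuses the buffer's backing array.
func (s *fakeSink) WriteBatch(_ context.Context, batch []string) error {
	s.batches = append(s.batches, append([]string(nil), batch...))
	return nil
}

func (s *fakeSink) Close() error { return nil }

// newSink is a hypothetical factory; a real DoFn would dial its service here.
var newSink = func() Sink { return &fakeSink{} }

// BatchWriter uses the Beam Go DoFn lifecycle method names, but this file
// does not import Beam.
type BatchWriter struct {
	MaxBatch int // hypothetical: flush early once this many elements are buffered

	sink Sink     // long-lived resource: created in Setup, reused across bundles
	buf  []string // per-bundle state: reset in StartBundle, drained in FinishBundle
}

func (w *BatchWriter) Setup() { w.sink = newSink() }

func (w *BatchWriter) StartBundle(ctx context.Context) { w.buf = w.buf[:0] }

func (w *BatchWriter) ProcessElement(ctx context.Context, elem string) error {
	w.buf = append(w.buf, elem)
	if w.MaxBatch > 0 && len(w.buf) >= w.MaxBatch {
		return w.flush(ctx)
	}
	return nil
}

// FinishBundle must not return until everything buffered has been written.
func (w *BatchWriter) FinishBundle(ctx context.Context) error { return w.flush(ctx) }

// Teardown is best effort and may never run, so it only releases the resource.
func (w *BatchWriter) Teardown() {
	if w.sink != nil {
		w.sink.Close()
	}
}

func (w *BatchWriter) flush(ctx context.Context) error {
	if len(w.buf) == 0 {
		return nil
	}
	if err := w.sink.WriteBatch(ctx, w.buf); err != nil {
		return err
	}
	w.buf = w.buf[:0]
	return nil
}

// runBundles loosely imitates a runner driving one DoFn instance through
// several bundles. Teardown is deliberately never called.
func runBundles(w *BatchWriter, bundles [][]string) error {
	ctx := context.Background()
	for _, bundle := range bundles {
		w.StartBundle(ctx)
		for _, e := range bundle {
			if err := w.ProcessElement(ctx, e); err != nil {
				return err
			}
		}
		if err := w.FinishBundle(ctx); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	w := &BatchWriter{MaxBatch: 2}
	w.Setup()
	if err := runBundles(w, [][]string{{"a", "b", "c"}, {"d"}}); err != nil {
		panic(err)
	}
	fmt.Println(w.sink.(*fakeSink).batches) // [[a b] [c] [d]]
}
```

Even though `Teardown` never runs here, every element reaches the sink. In a real pipeline the struct would be passed to `beam.ParDo0` the same way as `&Writer{}`. As far as I know, the Go SDK only carries a DoFn's exported fields over to the workers, which is why the connection and buffer are unexported and initialized in `Setup` and `StartBundle` rather than when the struct is constructed.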
> 
> On Tue, Jul 3, 2018 at 1:10 PM Henning Rohde <herohde@xxxxxxxxxx> wrote:
> 
> > Teardown has very loose guarantees on when it's called and you essentially
> > can't rely on it. Currently, for Go on non-direct runners, we hang on to
> > the bundle descriptors forever and never destroy them (and in turn never
> > call Teardown). Even if we didn't, failures/restarts could cause Teardown
> > to not be called.
> >
> > If something _must_ happen, FinishBundle is the right method.
> >
> > Thanks,
> >  Henning
> >
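The failure mode Henning describes can be reproduced without Beam. In this toy simulation (all names are hypothetical), `teardownFlusher` writes only in `Teardown`, like the original `Writer`, while `bundleFlusher` writes before `FinishBundle` returns. `run` processes a single bundle and calls `Teardown` only when asked, modelling a runner that keeps the instance alive indefinitely or loses it to a failure.

```go
package main

import "fmt"

// lifecycle is the subset of DoFn-style methods this toy runner calls.
type lifecycle interface {
	ProcessElement(elem string)
	FinishBundle()
	Teardown()
	Written() int
}

// buffer holds elements that are pending and those already "stored".
type buffer struct{ pending, stored []string }

func (b *buffer) ProcessElement(elem string) { b.pending = append(b.pending, elem) }

func (b *buffer) flush() {
	b.stored = append(b.stored, b.pending...)
	b.pending = nil
}

func (b *buffer) Written() int { return len(b.stored) }

// teardownFlusher mirrors the original Writer: it writes only in Teardown.
type teardownFlusher struct{ buffer }

func (w *teardownFlusher) FinishBundle() {}
func (w *teardownFlusher) Teardown()     { w.flush() }

// bundleFlusher writes everything before FinishBundle returns.
type bundleFlusher struct{ buffer }

func (w *bundleFlusher) FinishBundle() { w.flush() }
func (w *bundleFlusher) Teardown()     {}

// run drives one bundle; callTeardown=false models a runner that never
// destroys the instance (or loses it to a crash), so Teardown never runs.
func run(w lifecycle, elems []string, callTeardown bool) int {
	for _, e := range elems {
		w.ProcessElement(e)
	}
	w.FinishBundle()
	if callTeardown {
		w.Teardown()
	}
	return w.Written()
}

func main() {
	elems := []string{"a", "b", "c"}
	fmt.Println(run(&teardownFlusher{}, elems, true))  // 3
	fmt.Println(run(&teardownFlusher{}, elems, false)) // 0: the data is silently lost
	fmt.Println(run(&bundleFlusher{}, elems, false))   // 3
}
```

The first call corresponds to what Eduardo observed on the direct runner, the second to Dataflow, and the third shows that flushing in `FinishBundle` does not depend on `Teardown` at all.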
> > On Tue, Jul 3, 2018 at 10:37 AM eduardo.morales@xxxxxxxxx <
> > eduardo.morales@xxxxxxxxx> wrote:
> >
> >> Essentially I have the following code:
> >>
> >> type Writer struct {
> >>   Pool WriterPool
> >> }
> >>
> >> func (w *Writer) Setup() {
> >>   w.Pool = Init()
> >> }
> >>
> >> func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
> >>   w.Pool.Add(elem)
> >> }
> >>
> >> func (w *Writer) Teardown() {
> >>   w.Pool.Write()
> >>   w.Pool.Close()
> >> }
> >>
> >> beam.ParDo0(scope, &Writer{}, elemCollection)
> >>
> >> The above code runs fine with the direct runner but not with Dataflow.
> >>
> >> I added log lines to the above methods, and the ones in Teardown() never
> >> appear in the logs.
> >> If I change my code as follows:
> >>
> >> func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
> >>   w.Pool.Add(elem)
> >>   w.Pool.Write()
> >> }
> >>
> >> Then I see the data being written, but I lose the ability to pool, plus I
> >> am leaking connections.
> >>
> >> Is this a known issue, or am I doing something wrong?
> >>
> >> Thanks again for the help.
> >>
> >
>