Re: Submitting job with savepoint through StreamExecutionEnvironment


I agree with your take regarding the superficial distinction between stream
environments and the difficulties it introduces for users.

To fix the immediate issue in Beam, it was necessary to duplicate
RemoteStreamEnvironment.executeRemotely:

https://github.com/apache/beam/pull/7169/files#diff-6acb0479d563cfc121ac04e789f4bc6dR294

To address this in Flink, it would make sense to turn that piece of code
into a utility method that can be used directly and from
RemoteStreamEnvironment.execute for compatibility.
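A minimal sketch of that refactoring, under stated assumptions: `RestoreSettings`, `JobSpec`, `SubmissionResult`, `RemoteExecution` and `RemoteEnvironment` are hypothetical stand-ins, not Flink classes, and the actual cluster submission is stubbed out. It only shows the shape of the idea: the submission logic lives in one static utility that accepts restore settings, while the environment keeps its existing `execute()` and delegates with "no savepoint".

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for SavepointRestoreSettings (not the Flink class).
final class RestoreSettings {
    static final RestoreSettings NONE = new RestoreSettings(null, false);

    final String savepointPath; // null means "do not restore"
    final boolean allowNonRestoredState;

    RestoreSettings(String savepointPath, boolean allowNonRestoredState) {
        this.savepointPath = savepointPath;
        this.allowNonRestoredState = allowNonRestoredState;
    }
}

// Hypothetical stand-in for the job graph built from the stream graph.
final class JobSpec {
    final String name;
    RestoreSettings restore = RestoreSettings.NONE;

    JobSpec(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for JobExecutionResult.
final class SubmissionResult {
    final String jobName;
    final String target;
    final String restoredFrom;

    SubmissionResult(String jobName, String target, String restoredFrom) {
        this.jobName = jobName;
        this.target = target;
        this.restoredFrom = restoredFrom;
    }
}

// The extracted utility: callable directly, e.g. by the Beam Flink runner.
final class RemoteExecution {
    private RemoteExecution() {}

    static SubmissionResult executeRemotely(
            JobSpec job, String host, int port, List<String> jarFiles, RestoreSettings restore) {
        Objects.requireNonNull(restore, "pass RestoreSettings.NONE instead of null");
        job.restore = restore;
        // A real implementation would create a cluster client for host:port,
        // ship jarFiles and submit the job; this stub only reports what it would send.
        return new SubmissionResult(job.name, host + ":" + port, restore.savepointPath);
    }
}

// The environment keeps its public execute() for compatibility and delegates.
final class RemoteEnvironment {
    private final String host;
    private final int port;
    private final List<String> jarFiles;

    RemoteEnvironment(String host, int port, List<String> jarFiles) {
        this.host = host;
        this.port = port;
        this.jarFiles = jarFiles;
    }

    SubmissionResult execute(String jobName) {
        return RemoteExecution.executeRemotely(
                new JobSpec(jobName), host, port, jarFiles, RestoreSettings.NONE);
    }
}
```

With this split, an embedder could call `RemoteExecution.executeRemotely(...)` with real restore settings, while existing callers of `execute()` would see no change.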

If there isn't any other feedback, I will create a JIRA and work on
this.

Thanks,
Thomas

On Thu, Nov 29, 2018 at 10:00 AM Chesnay Schepler <chesnay@xxxxxxxxxx>
wrote:

> I'm only voicing my opinion here; these views do not in any way reflect
> long-term directions.
>
> I wouldn't remove the execute() method; it's too important for a
> convenient execution of jobs via the CLI/WebUI.
>
> But I would like to get rid of this distinction between environments, as
> their existence implies that there is no single way to /write/ a Flink
> job: jobs can differ in the environment they use, which affects whether
> you can even run them via the CLI.
>
> Have you ever tried setting up a jar that you can run both via the CLI
> on the associated cluster and from the IDE against that same cluster?
> You would need two entry points, each of which creates the environment
> and passes it to your job-defining method. This is quite different from
> a normal job, where the environment is created right before you define
> your job.
> Yet at the same time you can write a job that simply uses a
> StreamExecutionEnvironment and can either run locally in the IDE or be
> submitted via the CLI.
> This just seems highly inconsistent to me.
>
> There are some ways one could deal with this; for example, in our
> tests we basically inject a job executor into the default
> StreamExecutionEnvironment. One could also resort to an approach that
> uses system properties to determine how jobs should be executed.
> However, I haven't thought these through.
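A minimal sketch of the two ideas in the paragraph above, assuming hypothetical names throughout: `JobEnvironment`, `JobExecutor`, `LocalExecutor`, `RemoteExecutor` and the system property `example.execution.target` are illustrations, not Flink API or configuration keys. The job obtains its environment from a single factory method; a test or launcher can inject an executor, and otherwise a system property chooses between local and remote execution, so the job-defining code never names a concrete environment.

```java
// Hypothetical executor abstraction; Flink's real classes differ.
interface JobExecutor {
    String execute(String jobName);
}

final class LocalExecutor implements JobExecutor {
    @Override
    public String execute(String jobName) {
        return "local:" + jobName;
    }
}

final class RemoteExecutor implements JobExecutor {
    private final String address;

    RemoteExecutor(String address) {
        this.address = address;
    }

    @Override
    public String execute(String jobName) {
        // A real executor would submit the job to the cluster at 'address'.
        return "remote(" + address + "):" + jobName;
    }
}

final class JobEnvironment {
    // Hypothetical property, e.g. -Dexample.execution.target=remote:host:8081
    static final String TARGET_PROPERTY = "example.execution.target";
    private static final String REMOTE_PREFIX = "remote:";

    // Set by tests, the CLI or an IDE launcher; takes priority over the property.
    private static JobExecutor injected;

    private final JobExecutor executor;

    private JobEnvironment(JobExecutor executor) {
        this.executor = executor;
    }

    static void injectExecutor(JobExecutor executor) {
        injected = executor;
    }

    static void clearInjectedExecutor() {
        injected = null;
    }

    static JobEnvironment getEnvironment() {
        if (injected != null) {
            return new JobEnvironment(injected);
        }
        String target = System.getProperty(TARGET_PROPERTY, "local");
        if (target.startsWith(REMOTE_PREFIX)) {
            return new JobEnvironment(new RemoteExecutor(target.substring(REMOTE_PREFIX.length())));
        }
        return new JobEnvironment(new LocalExecutor());
    }

    String execute(String jobName) {
        return executor.execute(jobName);
    }
}

// The job is written once and never names a concrete environment.
final class WordCountJob {
    static String run() {
        JobEnvironment env = JobEnvironment.getEnvironment();
        // ... define sources, transformations and sinks on env here ...
        return env.execute("wordcount");
    }
}
```

Giving an injected executor priority over the property is one arbitrary choice here; it mirrors the test-injection case mentioned above.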
>
> Regardless, if you were to use the RestClusterClient explicitly, then
> yes, you would currently be accessing rather obscure, semi-internal code
> (which is very much subject to change).
>
> But encapsulating this into an Execution.executeRemotely(env, host, port,
> savepointRestoreSettings) method (as a replacement for execute()) would
> be feasible imo.
>
> On 29.11.2018 17:00, Thomas Weise wrote:
> > Thanks for taking a look.
> >
> > Are you saying that the longer term direction is to get rid of the
> > execute method from StreamExecutionEnvironment and instead construct
> > the cluster client outside?
> >
> > That would currently expose even more internals to the user.
> > Considering the current implementation in RemoteStreamEnvironment:
> >
> >     @Override
> >     public JobExecutionResult execute(String jobName)
> >             throws ProgramInvocationException {
> >         StreamGraph streamGraph = getStreamGraph();
> >         streamGraph.setJobName(jobName);
> >         transformations.clear();
> >         return executeRemotely(streamGraph, jarFiles);
> >     }
> >
> > We would use
> > env.getStreamGraph().getJobGraph().setSavepointRestoreSettings(..) in
> > the Beam code and then use the cluster client directly.
> >
> > If we wanted to keep this hidden from users, we could add
> > setSavepointRestoreSettings to RemoteStreamEnvironment and
> > LocalStreamEnvironment and deal with it internally.
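A minimal sketch of that alternative, with hypothetical stand-in types (`RestorePoint`, `PreparedJob` and `RemoteEnv` are not Flink classes): the environment stores the restore settings and attaches them to the job it builds inside `execute()`, following the build-then-`transformations.clear()` sequence of the quoted `execute()`, so callers never touch the job graph or a cluster client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for SavepointRestoreSettings.
final class RestorePoint {
    static final RestorePoint NONE = new RestorePoint(null);
    final String path; // null means "start without restoring"

    RestorePoint(String path) {
        this.path = path;
    }
}

// Hypothetical stand-in for the job graph handed to the cluster.
final class PreparedJob {
    final String name;
    final List<String> operators;
    final RestorePoint restore;

    PreparedJob(String name, List<String> operators, RestorePoint restore) {
        this.name = name;
        this.operators = operators;
        this.restore = restore;
    }
}

final class RemoteEnv {
    private final List<String> transformations = new ArrayList<>();
    private RestorePoint restore = RestorePoint.NONE;

    RemoteEnv addTransformation(String name) {
        transformations.add(name);
        return this;
    }

    // The proposed setter; the settings are only applied inside execute().
    void setSavepointRestoreSettings(RestorePoint restore) {
        this.restore = Objects.requireNonNull(restore);
    }

    PreparedJob execute(String jobName) {
        // Snapshot the pipeline, then clear it, as the quoted execute() does.
        PreparedJob job = new PreparedJob(jobName, new ArrayList<>(transformations), restore);
        transformations.clear();
        // A real implementation would now submit 'job' to the remote cluster.
        return job;
    }
}
```

Note that this places a submission detail on the environment, which is the mixing of job definition and submission that Chesnay's earlier reply (quoted further down) cautions against.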
> >
> > Alternatively, the remote environment could serve merely as a cluster
> > client factory.
> >
> > WDYT?
> >
> >
> > On Thu, Nov 29, 2018 at 2:35 AM Chesnay Schepler <chesnay@xxxxxxxxxx
> > <mailto:chesnay@xxxxxxxxxx>> wrote:
> >
> >     I'm not aware of any plans to expose this in the
> >     StreamExecutionEnvironment.
> >
> >     The issue would be that we would start mixing submission details
> >     with the job definition, which results in redundancy and weird
> >     semantics, e.g., which savepoint configuration takes priority if
> >     both the job and the CLI submission specify one?
> >
> >     Looking at the Beam JIRA, it would be sufficient to have this in
> >     the RemoteStreamEnvironment (which would be /less/ problematic
> >     since the issue above is baked into this class anyway); however, I
> >     would recommend migrating to a ClusterClient for these use-cases.
> >
> >     On 29.11.2018 08:18, Thomas Weise wrote:
> >>     Hi,
> >>
> >>     Currently it is not possible to submit a job with the savepoint restore
> >>     option through the execution environment. I found this while attempting
> >>     to add the support to the Flink runner in Beam
> >>     (https://issues.apache.org/jira/browse/BEAM-5396).
> >>
> >>     I also found https://issues.apache.org/jira/browse/FLINK-9644, but is
> >>     there a plan to support restoring from a savepoint
> >>     through StreamExecutionEnvironment in general?
> >>
> >>     Thanks,
> >>     Thomas
> >>
> >
>
>