Re: Submit WordCount to a Flink cluster


Alice,

I totally agree with what Lukasz said.
As an alternative for testing the job, I suggest installing Flink locally and running the Beam pipeline with a single CLI command like this: "bin/flink run -c <main_class> /path/to/jar --runner=FlinkRunner".

On 18 Jul 2018, at 17:09, Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:

Yes, you should be able to submit jobs to a Flink master from anywhere you have network connectivity to the Flink master.

It looks like your job is being submitted to the Flink master and we start waiting for it to complete, but something is preventing it from completing successfully. Have you tried looking at the Flink master UI or the Flink master logs?
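
For readers who want to check the Flink master UI from the machine that submits the job, here is a minimal shell sketch. It assumes curl is installed and that the web UI listens on port 8081, as in the question below. The helper names flink_overview_url and check_flink_ui are made up for illustration; /overview is the cluster summary endpoint of Flink's monitoring REST API.

```shell
# Build the URL of the cluster overview endpoint of the Flink web UI.
# flink_overview_url is a hypothetical helper, not part of Flink or Beam.
flink_overview_url() {
  host=$1
  port=${2:-8081}   # assumption: 8081 is the web UI port used in this thread
  printf 'http://%s:%s/overview\n' "$host" "$port"
}

# Return success if the web UI answers within 5 seconds.
check_flink_ui() {
  curl --silent --fail --max-time 5 "$(flink_overview_url "$1" "$2")" > /dev/null
}
```

It could be used as: check_flink_ui localhost 8081 && echo "UI reachable". A successful check only shows that the web UI answers over HTTP; it does not show that the port is the right one for job submission.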



On Fri, Jul 13, 2018 at 6:45 PM Alice Wong <airwaywong@xxxxxxxxx> wrote:
Hello,

I am a newbie to Beam.

Following the Beam docs, I am trying to submit the example WordCount to a Flink cluster (one jobmanager and one taskmanager running locally in two linked Docker containers with Maven installed).

The Beam docs seem a bit confusing about how to submit jobs.


mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
where <flink master> seems to be just a hostname.


$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=/path/to/pom.xml \
      --output=/path/to/counts \
      --flinkMaster=<flink master url> \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"
where I can give localhost:8081 for flinkMaster.

I have tried running this command both outside Docker and inside the jobmanager container (with the exec command). Either way, if I use "localhost" without a port for <flink master url>, it just runs locally and ignores the Flink cluster. If I use "localhost:8081", the command hangs for about 5 seconds and then shows the error messages below. It eventually disconnects and dies.

Could you give me some hints on how Beam jobs are submitted to a Flink cluster in general? Can I submit them remotely, from outside the jobmanager node?

Thanks in advance!

--------------------------------------
Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Executing pipeline using FlinkRunner.
Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Translating pipeline to Flink program.
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkExecutionEnvironments createBatchExecutionEnvironment
INFO: Creating a Batch Execution Environment.
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO:  enterCompositeTransform-
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO: |    enterCompositeTransform- ReadLines
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator visitPrimitiveTransform
INFO: |   |    visitPrimitiveTransform- ReadLines/Read
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO: |    leaveCompositeTransform- ReadLines
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO: |    enterCompositeTransform- WordCount.CountWords
...
INFO: |   |    leaveCompositeTransform- WriteCounts/WriteFiles
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO: |    leaveCompositeTransform- WriteCounts
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO:  leaveCompositeTransform-
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Starting execution of Flink program.
Jul 14, 2018 12:49:49 AM org.apache.flink.api.java.ExecutionEnvironment createProgramPlan
INFO: The job has 0 registered types and 0 default Kryo serializers
Jul 14, 2018 12:49:49 AM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern pom.xml matched 1 files with total size 10600
Jul 14, 2018 12:49:49 AM org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
INFO: Starting client actor system.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
INFO: Trying to select the network interface and address to use by connecting to the leading JobManager.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
INFO: TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener findConnectingAddress
INFO: Retrieved new target address localhost/127.0.0.1:8081.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
INFO: Trying to start actor system at c4342d15c3f4:0
Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 applyOrElse
INFO: Slf4jLogger started
Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Starting remoting
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remoting started; listening on addresses :[akka.tcp://flink@c4342d15c3f4:46627]
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
INFO: Actor system started at akka.tcp://flink@c4342d15c3f4:46627
Jul 14, 2018 12:49:51 AM org.apache.flink.client.program.ClusterClient logAndSysout
INFO: Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor handleMessage
INFO: Received SubmitJobAndWait(JobGraph(jobId: 87a12b5471d39a7837d6b0def6d748e2)) but there is no connection to a JobManager yet.
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
INFO: Received job wordcount-root-0714004948-3ee44b3d (87a12b5471d39a7837d6b0def6d748e2).
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor disconnectFromJobManager
INFO: Disconnect from JobManager null.
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
WARNING: Remote connection to [localhost/127.0.0.1:8081] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1213486164 - discarded
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
WARNING: Association with remote system [akka.tcp://flink@localhost:8081] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@localhost:8081]] Caused by: [The remote system explicitly disassociated (reason unknown).]
...
Jul 14, 2018 12:50:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remoting shut down.
Jul 14, 2018 12:50:51 AM org.apache.beam.runners.flink.FlinkRunner run
SEVERE: Pipeline execution failed
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
...
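
A note on the TooLongFrameException in the log above: the frame length it reports can be decoded by hand. The sketch below assumes the decoder added 4 bytes (the size of the length field) to the value it read from the wire, which is why 4 is subtracted first. That adjustment is an assumption about the decoder's configuration, not something the log states. The remaining 32-bit value is then printed as four ASCII characters.

```shell
# "Adjusted frame length" reported by the TooLongFrameException in the log.
adjusted=1213486164

# Assumption: the decoder added the 4-byte length field to the raw value.
raw=$((adjusted - 4))

decoded=""
for bits in 24 16 8 0; do
  # Extract one byte, most significant first.
  byte=$(( (raw >> bits) & 255 ))
  # Turn the byte into its ASCII character via an octal escape.
  decoded="$decoded$(printf "\\$(printf '%03o' "$byte")")"
done

echo "$decoded"   # prints "HTTP"
```

Under that assumption, the first four bytes the client received were the start of an HTTP response. That would suggest the Akka client connected to an HTTP endpoint (8081 is Flink's default web UI port) rather than to the JobManager RPC port, which is set by jobmanager.rpc.port and defaults to 6123. This is a plausible reading of the log, not a confirmed diagnosis for this setup.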