Re: Problem with SparkSubmit


No excuse necessary; we're glad you found the issue!

Best,
Arthur

On Tue, May 8, 2018 at 8:06 AM Anton Mushin <Anton_Mushin@xxxxxxxx> wrote:

> I found a mistake in my connection configuration.
> The problem is no longer relevant. Excuse me for the trouble.
>
> Best Regards,
> Anton
>
> -----Original Message-----
> From: Anton Mushin <Anton_Mushin@xxxxxxxx>
> Sent: Tuesday, May 08, 2018 6:47 PM
> To: dev@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
> Subject: RE: Problem with SparkSubmit
>
> Hi Fokko,
> Thanks for your help.
>
> I got a new error:
> ERROR - [Errno 2] No such file or directory: 'spark-submit'
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line
> 1493, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File
> "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/spark_submit_operator.py",
> line 145, in execute
>     self._hook.submit(self._application)
>   File
> "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py",
> line 231, in submit
>     **kwargs)
>   File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
>     restore_signals, start_new_session)
>   File "/usr/lib/python3.5/subprocess.py", line 1551, in _execute_child
>     raise child_exception_type(errno_num, err_msg)
>
> In my case spark-submit isn't on the PATH, and I'm not able to add it
> there. I can't find any information on how to configure the Spark submit
> operator for this case. Could you help me? Should I set something in
> os.environ to define the path to the spark-submit script?
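>
> Here is a sketch of what I was thinking (the /opt/spark path below is
> just a placeholder for our real installation directory):
>
> import os
>
> # Make spark-submit visible to the subprocess spawned by SparkSubmitHook
> # by extending PATH from inside the DAG file.
> os.environ['PATH'] = '/opt/spark/bin' + os.pathsep + os.environ.get('PATH', '')
>
> # Alternatively, if the hook supports the 'spark-home' connection extra
> # and our Airflow version parses URI query parameters into extras (I'm
> # not sure 1.9.0 does), the Spark home could go on the connection itself:
> os.environ['AIRFLOW_CONN_SPARK'] = 'spark://sparkhost:7077?spark-home=/opt/spark'
>
> Would something like this be the right approach?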
>
> Best Regards,
> Anton
>
>
> -----Original Message-----
> From: fokko@xxxxxxxxxxxxxxx <fokko@xxxxxxxxxxxxxxx> On Behalf Of
> Driesprong, Fokko
> Sent: Monday, May 07, 2018 9:21 PM
> To: dev@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
> Subject: Re: Problem with SparkSubmit
>
> Hi Anton,
>
> I see the issue now. You're passing the jar to the jars argument, but
> that argument is for additional jars that need to be put on the Spark
> classpath, for example jars that provide UDFs. You need to pass your jar
> to the application argument instead:
>
> _config = {
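>     # main artifact handed to spark-submit; extra classpath jars would
>     # go in a separate 'jars' key instead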
>     'application': 'spark_job.jar',
>     'executor_memory': '2g',
>     'name': 'myJob',
>     'conn_id': connection_id,
>     'java_class':'org.Job'
> }
>
> operator = SparkSubmitOperator(
>     task_id='myTask',
>     dag=dag,
>     **_config
> )
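>
> With this config the hook passes spark_job.jar as the positional
> application argument of spark-submit and turns java_class into
> --class org.Job.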
>
> Hope this helps.
>
> Cheers, Fokko
>
> 2018-05-07 6:54 GMT+02:00 Anton Mushin <Anton_Mushin@xxxxxxxx>:
>
> > Hi Fokko,
> > Thanks for your reply.
> >
> > I use version 1.9.0
> >
> > -----Original Message-----
> > From: fokko@xxxxxxxxxxxxxxx <fokko@xxxxxxxxxxxxxxx> On Behalf Of
> > Driesprong, Fokko
> > Sent: Saturday, April 28, 2018 10:54 PM
> > To: dev@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
> > Subject: Re: Problem with SparkSubmit
> >
> > Hi Anton,
> >
> > Which version of Airflow are you running?
> >
> > Cheers, Fokko
> >
> > 2018-04-27 10:24 GMT+02:00 Anton Mushin <Anton_Mushin@xxxxxxxx>:
> >
> > > Hi all,
> > > I have problem with spark operator. I get exception
> > >
> > > user@host:/# airflow test myDAG myTask 2018-04-26
> > > [2018-04-26 15:32:11,279] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
> > > [2018-04-26 15:32:11,323] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
> > > [2018-04-26 15:32:11,456] {__init__.py:45} INFO - Using executor SequentialExecutor
> > > [2018-04-26 15:32:11,535] {models.py:189} INFO - Filling up the DagBag from /usr/local/airflow/dags
> > > [2018-04-26 15:32:11,811] {base_hook.py:80} INFO - Using connection to: sparkhost
> > > Traceback (most recent call last):
> > >   File "/usr/local/bin/airflow", line 27, in <module>
> > >     args.func(args)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py",
> > > line 528, in test
> > >     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py",
> > > line 50, in wrapper
> > >     result = func(*args, **kwargs)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py",
> > > line 1584, in run
> > >     session=session)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py",
> > > line 50, in wrapper
> > >     result = func(*args, **kwargs)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py",
> > > line 1493, in _run_raw_task
> > >     result = task_copy.execute(context=context)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/
> > > operators/spark_submit_operator.py", line 145, in execute
> > >     self._hook.submit(self._application)
> > >   File
> > > "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_
> > > su
> > > bmit_hook.py",
> > > line 231, in submit
> > >     **kwargs)
> > >   File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
> > >     restore_signals, start_new_session)
> > >   File "/usr/lib/python3.5/subprocess.py", line 1490, in _execute_child
> > >     restore_signals, start_new_session, preexec_fn)
> > > TypeError: Can't convert 'list' object to str implicitly
> > >
> > > My DAG looks like this:
> > >
> > > import os
> > > from datetime import datetime, timedelta
> > >
> > > from airflow import DAG
> > > from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
> > >
> > > default_args = {
> > >     'owner': 'spark',
> > >     'depends_on_past': False,
> > >     'start_date': datetime.now(),
> > >     'retries': 1,
> > >     'retry_delay': timedelta(minutes=1)
> > > }
> > >
> > > dag = DAG('myDAG', default_args=default_args)
> > >
> > > connection_id = "SPARK"
> > > os.environ['AIRFLOW_CONN_%s' % connection_id] = 'spark://sparkhost:7077'
> > >
> > > _config = {
> > >     'jars': 'spark_job.jar',
> > >     'executor_memory': '2g',
> > >     'name': 'myJob',
> > >     'conn_id': connection_id,
> > >     'java_class': 'org.Job'
> > > }
> > >
> > > operator = SparkSubmitOperator(
> > >     task_id='myTask',
> > >     dag=dag,
> > >     **_config
> > > )
> > >
> > > What is wrong? Could somebody help me with it?
> > >
> > >
> >
>