Re: Flink table api


Hello Fabian,

I have tried to convert the table into a stream as below

tableEnv.toDataStream(result, Oplog.class);

and it is giving me the error below.

Cannot generate a valid execution plan for the given query:


LogicalFilter(condition=[<>($1, $3)])
  LogicalJoin(condition=[true], joinType=[inner])
    LogicalProject(master=[$1], timeStamp=[$5])
      LogicalFilter(condition=[=($0, _UTF-16LE'local.customerMISMaster')])
        LogicalTableScan(table=[[_DataStreamTable_0]])
    LogicalProject(child1=[$1], timeStamp2=[$5])
      LogicalFilter(condition=[=($0, _UTF-16LE'local.customerMISChild1')])
        LogicalTableScan(table=[[_DataStreamTable_0]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL
features.
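
A note on reading the plan above: LogicalJoin(condition=[true]) underneath a LogicalFilter with <> is a cross join filtered by an inequality, with no equality predicate between the two inputs. As far as I know, streaming joins in the Flink 1.5 Table API need at least one equality predicate, which would explain the exception. The plain-Java sketch below does not use Flink at all; the class EquiJoinSketch, the method joinOnKey, and the sample values are hypothetical. It only illustrates what an equality key buys: one side can be indexed by key, so each record is compared only against records that share its key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EquiJoinSketch {

    /**
     * Joins two lists of dynamic documents on equality of one field.
     * Each result holds the matching pair under "master" and "child1".
     */
    public static List<Map<String, Object>> joinOnKey(
            List<Map<String, Object>> left,
            List<Map<String, Object>> right,
            String keyField) {
        // Build phase: group right-side documents by their key value.
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> r : right) {
            Object key = r.get(keyField);
            if (key != null) {
                index.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
            }
        }
        // Probe phase: a left document only meets documents with an equal key.
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            Object key = l.get(keyField);
            if (key == null) {
                continue; // documents without the key cannot match
            }
            List<Map<String, Object>> matches =
                    index.getOrDefault(key, Collections.emptyList());
            for (Map<String, Object> r : matches) {
                Map<String, Object> joined = new HashMap<>();
                joined.put("master", l);
                joined.put("child1", r);
                out.add(joined);
            }
        }
        return out;
    }

    private static Map<String, Object> doc(Object loanApplicationId) {
        Map<String, Object> d = new HashMap<>();
        d.put("loanApplicationId", loanApplicationId);
        return d;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> masters = Arrays.asList(doc(1), doc(2));
        List<Map<String, Object>> children = Arrays.asList(doc(1), doc(3));
        // Only the documents with loanApplicationId 1 pair up.
        System.out.println(joinOnKey(masters, children, "loanApplicationId").size()); // prints 1
    }
}
```

With only an inequality such as timeStamp <> timeStamp2 there is no key to index on, so every record would have to be kept and compared against every record from the other side.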

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
amols@xxxxxxxxxxxxxxx


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sachin@xxxxxxxxxxxxxxx>
------------------------------------------------

On Mon, Jul 2, 2018 at 4:43 PM, Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx> wrote:

> Hello Fabian,
>
> Can you please tell me how to convert a Table back into a DataStream? I just
> want to print the table result.
>
> On Mon, Jul 2, 2018 at 4:20 PM, Fabian Hueske <fhueske@xxxxxxxxx> wrote:
>
>> You can also use Row, but then you cannot rely on automatic type extraction
>> and have to provide TypeInformation.
>>
>> Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx> wrote on Mon, Jul 2, 2018, 12:37:
>>
>> > Hello Fabian,
>> >
>> > According to my requirement I cannot create static POJOs for all classes,
>> > because I want to create dynamic jobs for all tables based on rule engine
>> > config. Please suggest if there is any other way to achieve this.
>> >
>> > On Mon, Jul 2, 2018 at 4:02 PM, Fabian Hueske <fhueske@xxxxxxxxx> wrote:
>> >
>> > > Hi Amol,
>> > >
>> > > These are the requirements for POJOs [1] that are fully supported by Flink.
>> > >
>> > > Best, Fabian
>> > >
>> > > [1]
>> > > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/api_concepts.html#pojos
>> > >
>> > > 2018-07-02 12:19 GMT+02:00 Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx>:
>> > >
>> > > > Hello Xingcan
>> > > >
>> > > > As mentioned in the mail thread above, I am streaming the MongoDB oplog to
>> > > > join multiple Mongo tables based on a unique key (primary key). To achieve
>> > > > this I have created one Java POJO as below, where o represents the generic
>> > > > MongoDB object type that holds my table fields, i.e. it is dynamic. Now I
>> > > > want to use a Table API join over this BasicDBObject, but it seems Flink
>> > > > does not allow generic POJOs. Please suggest how to handle this.
>> > > >
>> > > > public class Oplog {
>> > > >     private OplogTimestamp ts;
>> > > >     private BasicDBObject o;
>> > > > }
>> > > >
>> > > > On Mon, Jul 2, 2018 at 3:03 PM, Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx> wrote:
>> > > >
>> > > > > Hello Xingcan
>> > > > >
>> > > > > DataStream<Oplog> streamSource = env
>> > > > >         .addSource(kafkaConsumer)
>> > > > >         .setParallelism(4);
>> > > > >
>> > > > > StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>> > > > > // Convert the DataStream into a Table with default fields "f0", "f1"
>> > > > > Table table1 = tableEnv.fromDataStream(streamSource);
>> > > > >
>> > > > > Table customerMISMaster = table1
>> > > > >         .filter("ns === 'local.customerMISMaster'")
>> > > > >         .select("o as master");
>> > > > > Table customerMISChild1 = table1
>> > > > >         .filter("ns === 'local.customerMISChild1'")
>> > > > >         .select("o as child1");
>> > > > > Table customerMISChild2 = table1
>> > > > >         .filter("ns === 'local.customerMISChild2'")
>> > > > >         .select("o as child2");
>> > > > > Table result = customerMISMaster
>> > > > >         .join(customerMISChild1)
>> > > > >         .where("master.loanApplicationId=child1.loanApplicationId");
>> > > > >
>> > > > > It is throwing the error "Method threw
>> > > > > 'org.apache.flink.table.api.ValidationException' exception.
>> > > > > Undefined function: LOANAPPLICATIONID"
>> > > > >
>> > > > > On Mon, Jul 2, 2018 at 2:56 PM, Xingcan Cui <xingcanc@xxxxxxxxx> wrote:
>> > > > >
>> > > > >> Hi Amol,
>> > > > >>
>> > > > >> The “dynamic table” is just a logical concept on which the Flink
>> > > > >> Table API is designed.
>> > > > >> That means you don’t need to implement dynamic tables yourself.
>> > > > >>
>> > > > >> The Flink Table API provides different kinds of stream-to-stream joins
>> > > > >> in recent versions (from 1.4).
>> > > > >> The related docs can be found here:
>> > > > >> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#joins
>> > > > >>
>> > > > >> Best,
>> > > > >> Xingcan
>> > > > >>
>> > > > >>
>> > > > >> > On Jul 2, 2018, at 4:49 PM, Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx> wrote:
>> > > > >> >
>> > > > >> > Hello,
>> > > > >> >
>> > > > >> > I am streaming the MongoDB oplog using Kafka and Flink and want to join
>> > > > >> > multiple tables using the Flink Table API, but I have some concerns: is
>> > > > >> > it possible to join streamed tables in Flink, and if yes, please provide
>> > > > >> > me some example of a stream join using the Table API.
>> > > > >> >
>> > > > >> > I have gone through your dynamic table API doc. It is quite interesting,
>> > > > >> > but I haven't found any example or tutorial on how to implement a
>> > > > >> > dynamic table.
>> > > > >> >
>> > > > >> > I have tried to implement a Table API join using a POJO class, but it is
>> > > > >> > giving org.apache.flink.table.api.TableException: Cannot generate a
>> > > > >> > valid execution plan for the given query
>> > > > >>
>> > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
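
On Fabian's suggestion quoted above to use Row and provide the TypeInformation explicitly: one way to avoid a static POJO per table might be to take the field names from the rule engine config and flatten each dynamic document into a fixed-order array of values. The sketch below is plain Java with no Flink or MongoDB dependency. Map<String, Object> stands in for BasicDBObject, Object[] stands in for Flink's Row, and RowFlattener, flatten, and the field names are hypothetical. In a real job the field types would also have to be declared by hand, since they cannot be extracted automatically.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowFlattener {

    /**
     * Copies the configured fields out of a dynamic document, in config order.
     * Fields missing from the document become null; extra fields are ignored.
     */
    public static Object[] flatten(Map<String, Object> doc, List<String> fieldNames) {
        Object[] row = new Object[fieldNames.size()];
        for (int i = 0; i < row.length; i++) {
            row[i] = doc.get(fieldNames.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        // Field list as it might come from a per-table config entry (assumed).
        List<String> fields = Arrays.asList("loanApplicationId", "status");

        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("status", "OPEN");
        doc.put("loanApplicationId", 42L);
        doc.put("ignored", true);

        System.out.println(Arrays.toString(flatten(doc, fields))); // prints [42, OPEN]
    }
}
```

Because every record of a given table then has the same positions, a key such as loanApplicationId becomes an ordinary top-level column instead of a value hidden inside a generic object.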