Re: Flink table api


Hi Amol,

These are the requirements for POJOs [1] that are fully supported by Flink.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/api_concepts.html#pojos
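
To make the requirements in [1] concrete, here is a minimal sketch, not a definitive fix. It assumes the join key can be copied out of the generic document into a typed field before the stream is converted to a table. FlatOplog, fromDocument, and the choice of String for loanApplicationId are hypothetical and only for illustration. The document is taken as a plain Map<String, Object> (which BasicDBObject implements) so the sketch compiles without the MongoDB driver.

```java
import java.util.Map;

// Hypothetical flattened event type; not part of Flink or of the code in this thread.
// The class itself is public, as the POJO rules require.
public class FlatOplog {
    // Private fields are acceptable as long as each has a public getter and setter.
    private String ns;
    private long ts;
    private String loanApplicationId;

    // A public no-argument constructor is one of the POJO requirements.
    public FlatOplog() {}

    public String getNs() { return ns; }
    public void setNs(String ns) { this.ns = ns; }

    public long getTs() { return ts; }
    public void setTs(long ts) { this.ts = ts; }

    public String getLoanApplicationId() { return loanApplicationId; }
    public void setLoanApplicationId(String loanApplicationId) {
        this.loanApplicationId = loanApplicationId;
    }

    // Hypothetical helper: copies the join key out of the generic document
    // into a typed field. A missing key is stored as null.
    public static FlatOplog fromDocument(String ns, long ts, Map<String, Object> doc) {
        FlatOplog event = new FlatOplog();
        event.setNs(ns);
        event.setTs(ts);
        Object key = doc.get("loanApplicationId");
        event.setLoanApplicationId(key == null ? null : key.toString());
        return event;
    }
}
```

With a type shaped like this, a DataStream<FlatOplog> should expose ns, ts and loanApplicationId as top-level columns, so a join condition could name them directly instead of reaching into o.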

2018-07-02 12:19 GMT+02:00 Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx>:

> Hello Xingcan,
>
> As mentioned earlier in this thread, I am streaming the MongoDB oplog and
> want to join multiple Mongo tables on a unique key (the primary key). To do
> this I created the Java POJO below, where o is MongoDB's generic document
> type (BasicDBObject) holding my table fields, which are dynamic. I want to
> run a Table API join over this BasicDBObject, but it seems Flink does not
> support generic POJOs. Please advise.
>
> public class Oplog {
>     private OplogTimestamp ts;
>     private BasicDBObject o;
> }
>
>
>
> -----------------------------------------------
> *Amol Suryawanshi*
> Java Developer
> amols@xxxxxxxxxxxxxxx
>
>
> *iProgrammer Solutions Pvt. Ltd.*
>
>
>
> *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
> MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> www.iprogrammer.com <sachin@xxxxxxxxxxxxxxx>
> ------------------------------------------------
>
> On Mon, Jul 2, 2018 at 3:03 PM, Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx> wrote:
>
> > Hello Xingcan
> >
> > DataStream<Oplog> streamSource = env
> >         .addSource(kafkaConsumer)
> >         .setParallelism(4);
> >
> > StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
> > // Convert the DataStream into a Table with default fields "f0", "f1"
> > Table table1 = tableEnv.fromDataStream(streamSource);
> >
> > Table customerMISMaster = table1.filter("ns === 'local.customerMISMaster'").select("o as master");
> > Table customerMISChild1 = table1.filter("ns === 'local.customerMISChild1'").select("o as child1");
> > Table customerMISChild2 = table1.filter("ns === 'local.customerMISChild2'").select("o as child2");
> > Table result = customerMISMaster.join(customerMISChild1).where("master.loanApplicationId=child1.loanApplicationId");
> >
> > it is throwing error "Method threw 'org.apache.flink.table.api.ValidationException' exception. Undefined function: LOANAPPLICATIONID"
> >
> >
> > On Mon, Jul 2, 2018 at 2:56 PM, Xingcan Cui <xingcanc@xxxxxxxxx> wrote:
> >
> >> Hi Amol,
> >>
> >> The “dynamic table” is just a logical concept on which the design of the
> >> Flink Table API is based.
> >> That means you don’t need to implement dynamic tables yourself.
> >>
> >> The Flink Table API provides several kinds of stream-to-stream joins in
> >> recent versions (1.4 and later).
> >> The related docs can be found here:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#joins
> >>
> >> Best,
> >> Xingcan
> >>
> >>
> >> > On Jul 2, 2018, at 4:49 PM, Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx> wrote:
> >> >
> >> > Hello,
> >> >
> >> > I am streaming the MongoDB oplog using Kafka and Flink and want to join
> >> > multiple tables using the Flink Table API. Is it possible to join
> >> > streamed tables in Flink? If so, please provide an example of a stream
> >> > join using the Table API.
> >> >
> >> > I went through your dynamic table API doc. It is quite interesting, but
> >> > I haven't found any example or tutorial on how to implement a dynamic
> >> > table.
> >> >
> >> > I tried to implement a Table API join using a POJO class, but it fails
> >> > with org.apache.flink.table.api.TableException: Cannot generate a valid
> >> > execution plan for the given query
> >> >
> >>
> >>
> >
>
