Re: Rowtime for Table from DataStream without explicit fieldnames


Thanks for the answer, Dawid, and for the helper function, Timo.

It's not too bad for my use case (small number of fields); I just wanted to make sure I am not missing something here.

Cheers,

Johannes

On Thu, Oct 4, 2018 at 5:07 PM Timo Walther <twalthr@xxxxxxxxxx> wrote:
Hi Johannes,

this is not supported so far. You could write a little helper method like the following:

val s: Seq[Expression] = Types.of[WC].asInstanceOf[CaseClassTypeInfo[WC]].fieldNames.map(Symbol(_).toExpr)

val s2: Seq[Expression] = s :+ 'rowtime.rowtime

tEnv.fromDataStream(input, s2: _*)

Not a very nice solution, but it should work.
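
The snippet above could be wrapped into a reusable helper, roughly as sketched below. This is only a sketch assuming the Flink 1.6 Scala Table API; `RowtimeHelper`, `fieldsWithRowtime` and `fromDataStreamWithRowtime` are hypothetical names and not part of Flink, and the code only handles streams whose element type is a case class.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Expression

// Hypothetical helper object; not part of Flink.
object RowtimeHelper {

  // Builds one field expression per case class field and appends
  // a rowtime attribute with the given name.
  def fieldsWithRowtime(typeInfo: TypeInformation[_], rowtimeName: Symbol): Seq[Expression] =
    typeInfo match {
      case cc: CaseClassTypeInfo[_] =>
        cc.fieldNames.map(name => Symbol(name).toExpr) :+ rowtimeName.rowtime
      case other =>
        throw new IllegalArgumentException(s"Expected a case class type, got: $other")
    }

  // Converts the stream to a Table without listing the fields by hand.
  // Assumes timestamps and watermarks are already assigned on `ds`.
  def fromDataStreamWithRowtime[T](
      tEnv: StreamTableEnvironment,
      ds: DataStream[T],
      rowtimeName: Symbol): Table =
    tEnv.fromDataStream(ds, fieldsWithRowtime(ds.dataType, rowtimeName): _*)
}
```

With this, a call such as RowtimeHelper.fromDataStreamWithRowtime(tableEnv, ds, 'myRowtime) should allow a following .window(Tumble over 1.hours on 'myRowtime as 'w) without repeating the field names.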

Regards,
Timo

On 04.10.18 at 15:40, Dawid Wysakowicz wrote:
Hi Johannes,

I am afraid that this is currently not possible and you do indeed have to
pass all fields again, but Timo (cc'ed) might want to correct me if I am wrong.

Best,

Dawid


On 04/10/18 15:08, Johannes Schulte wrote:
Hi,

When converting a DataStream (with watermarks) to a table as
described here

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#event-time

I wonder how to use the rowtime in a subsequent window operation
_without_ explicitly specifying all field names, relying instead on case
class type inference.

Currently when operating on a stream of events 

case class Event(field1: String, ts: Long)

val ds: DataStream[Event] = ...

I have to call

tableEnv.fromDataStream(ds, 'field1, 'ts, 'myRowtime.rowtime)

in order to use

.window(Tumble over 1.hours on 'myRowtime as 'w)

afterwards. Is there a way to create the time attribute column without
specifying all fields again?

Thanks for your help,

Johannes