Hi Shahar,The table function takes a single row but can output multi rows. You can split the row based on the "last" event. The code looks like:val sessionResult =
"SELECT " +
" lastUDAF(line) AS lastEvents "
"FROM MyTable " +
"GROUP BY SESSION(rowtime, INTERVAL '4' HOUR)"
val result =
s"SELECT lastEvent FROM ($sessionResult), LATERAL TABLE(splitUDTF(lastEvents)) as T(lastEvent)"The lastUDAF is used to process data in a session window. As your lastEvent is base on either window end or a special "last" event, the lastUDAF outputs multi last events.After the window, we perform a splitUDTF to split the lastEvents to multi single events.Best, HequnOn Wed, Oct 17, 2018 at 12:38 AM Shahar Cizer Kobrinsky <shahar.kobrinsky@xxxxxxxxx> wrote:Im wondering how does that work, it seems that a table function still takes a single row's values as an input, am i wrong (or at least that is how the examples show)?How would the SQL look like?On Fri, Oct 12, 2018 at 9:15 PM Hequn Cheng <chenghequn@xxxxxxxxx> wrote:Hi shkob1,> while one is time(session inactivity) the other is based on a specific event marked as a "last" event.How about using a session window and an udtf to solve the problem. The session window may output multi `last` elements. However, we can use a udtf to split them into single ones. Thus, we can use SQL for the whole job.Best, Hequn.On Sat, Oct 13, 2018 at 2:28 AM shkob1 <shahar.kobrinsky@xxxxxxxxx> wrote:Hey!
I have a use case in which im grouping a stream by session id - so far
pretty standard, note that i need to do it through SQL and not by the table
In my use case i have 2 trigger conditions though - while one is time
(session inactivity) the other is based on a specific event marked as a
AFAIK SQL does not support custom triggers - so what i end up doing is doing
group by in the SQL - then converting the result to a stream along with a
boolean field that marks whether at least one of the events was the end
event - then adding my custom trigger on top of it.
It looks something like this:
Table result = tableEnv.sqlQuery("select atLeastOneTrue(lastEvent),
sessionId, count(*) FROM source Group By sessionId");
tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
.filter(tuple -> tuple.f0)
.process(...take last element from the group by result..)
This seems like a weird work around to, isn't it? my window is basically of
the SQL result rather than on the source stream. Ideally i would keyby the
sessionId before running the SQL but then a) would I need to register a
table per key? b) would i be able to use the custom trigger per window?
basically i want to group by session id and have a window for every session
that supports both time and custom trigger. Assuming i need to use SQL
(reason is the query is dynamically loaded), is there a better solution for
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/