
JDBCInputFormat and SplitDataProperties

Hi everyone,

I have the following scenario: I have a database table with 3 columns: a host (string), a timestamp, and an integer ID. Conceptually, what I'd like to do is:

group by host and timestamp -> from all the IDs in each group, derive n new tuples -> count how many times each unique tuple appears in the resulting data

Each new tuple has 3 fields: the host, a new ID, and an Integer set to 1 (so that summing it yields the count).
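To pin down the intended semantics, here is a small in-memory sketch using plain Scala collections (no Flink involved). The `Record` case class and `deriveIds` are hypothetical stand-ins. The rule that turns a group's IDs into n new IDs is not described above, so `deriveIds` simply returns the distinct IDs as a placeholder.

```scala
// One row of the table: host, timestamp (assumed to be a Long here), integer ID.
final case class Record(host: String, timestamp: Long, id: Int)

// Hypothetical placeholder for the real "IDs of a group -> n new IDs" rule.
def deriveIds(ids: Seq[Int]): Seq[Int] = ids.distinct

// 1) group by (host, timestamp)
// 2) emit one (host, newId, 1) tuple per derived ID
// 3) sum the trailing 1s per (host, newId), i.e. count each unique tuple
def countDerived(records: Seq[Record]): Map[(String, Int), Int] =
  records
    .groupBy(r => (r.host, r.timestamp))
    .toSeq
    .flatMap { case ((host, _), group) =>
      deriveIds(group.map(_.id)).map(newId => (host, newId, 1))
    }
    .groupBy(t => (t._1, t._2))
    .map { case (key, tuples) => key -> tuples.map(_._3).sum }

val sample = Seq(
  Record("a", 1L, 7), Record("a", 1L, 7), Record("a", 1L, 8),
  Record("a", 2L, 7),
  Record("b", 1L, 7)
)
val counts = countDerived(sample)
// counts == Map(("a", 7) -> 2, ("a", 8) -> 1, ("b", 7) -> 1)
```

Host "a" yields ID 7 in two different timestamp groups, so its count is 2. The duplicate 7 inside group ("a", 1) counts only once because of the placeholder `distinct`.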

What I'm currently doing is roughly:

val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
val source = environment.createInput(input)
source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)
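Hash-partitioning on ("host", "timestamp") guarantees that all records of one group land in the same parallel partition. That is what lets `mapPartition` see complete groups. A stdlib simulation of this behaviour follows. The `parallelism` value and the use of the tuple's `hashCode` are illustrative assumptions: Flink applies its own key hashing internally, so only the co-location property carries over, not the concrete partition numbers. `hashPartition` and `groupsInPartition` are hypothetical helper names.

```scala
final case class Record(host: String, timestamp: Long, id: Int)

// Illustrative partitioner: equal (host, timestamp) keys always map to the same index.
def partitionOf(r: Record, parallelism: Int): Int =
  java.lang.Math.floorMod((r.host, r.timestamp).hashCode, parallelism)

def hashPartition(records: Seq[Record], parallelism: Int): Map[Int, Seq[Record]] =
  records.groupBy(partitionOf(_, parallelism))

// A partition is not grouped by key, so a mapPartition-style function has to
// rebuild the (host, timestamp) groups itself, e.g. with an in-memory map like this.
def groupsInPartition(part: Seq[Record]): Map[(String, Long), Seq[Int]] =
  part.groupBy(r => (r.host, r.timestamp)).map { case (k, rs) => k -> rs.map(_.id) }
```

Because every key is confined to one partition, the per-partition groups never overlap. The trade-off is that the whole partition's groups are held in memory at once.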

The query given to JDBCInputFormat returns results ordered by host and timestamp, and I was wondering whether performance could be improved by declaring this ordering in the code. I've looked at and, but I still have some questions:
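The potential gain from input that is already ordered by (host, timestamp) is that groups can be formed in a single streaming pass, with no hashing or re-sorting: a group ends as soon as the key changes. A minimal stdlib sketch of that idea follows. It is not Flink code, and `groupSorted` is a hypothetical helper. It assumes the input really is sorted, or at least clustered, by the key. Unsorted input would silently split one group into several.

```scala
final case class Record(host: String, timestamp: Long, id: Int)

// Emits one ((host, timestamp), ids) pair per run of consecutive equal keys.
// Only one group's IDs are buffered at a time.
def groupSorted(records: Iterator[Record]): Iterator[((String, Long), List[Int])] =
  new Iterator[((String, Long), List[Int])] {
    private val in = records.buffered

    def hasNext: Boolean = in.hasNext

    def next(): ((String, Long), List[Int]) = {
      val first = in.next()
      val key = (first.host, first.timestamp)
      val ids = List.newBuilder[Int]
      ids += first.id
      // Keep consuming while the next record still belongs to the current key.
      while (in.hasNext && (in.head.host, in.head.timestamp) == key) ids += in.next().id
      (key, ids.result())
    }
  }
```

Compared with the hash-based grouping, this needs memory for only one group at a time. That is the kind of saving an optimizer can exploit when it is told the data is already ordered.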

- If a split is a subset of a partition, what is the meaning of SplitDataProperties#splitsPartitionedBy? The wording makes me think that a split is divided into partitions, meaning that a partition would be a subset of a split.
- At which point can I retrieve and adjust a SplitDataProperties instance, if possible at all?
- If I wanted a coarser parallelization where each slot gets all the data for the same host, would I have to manually create the sub-groups based on timestamp?
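Regarding the first question: as I read the SplitDataProperties Javadoc, `splitsPartitionedBy` does not divide a split into partitions. It asserts that all records sharing the same key are contained in a single input split, so no key value straddles two splits. The stdlib sketch below models that property as a checker. `isPartitionedBy` is a hypothetical name, and splits are modelled as plain sequences. It also shows a fact relevant to the third question: splits partitioned by host alone are automatically partitioned by (host, timestamp), because a finer key cannot straddle splits when its coarser prefix does not.

```scala
final case class Record(host: String, timestamp: Long, id: Int)

// True if no key value occurs in more than one split, i.e. every record
// sharing a key is confined to a single split.
def isPartitionedBy[K](splits: Seq[Seq[Record]], key: Record => K): Boolean = {
  val keysPerSplit = splits.map(_.map(key).toSet) // distinct keys inside each split
  val allKeys = keysPerSplit.flatten
  allKeys.size == allKeys.distinct.size           // any repeat means a key spans two splits
}

// Each split holds all rows of exactly one host.
val byHost = Seq(
  Seq(Record("a", 1L, 7), Record("a", 2L, 8)),
  Seq(Record("b", 1L, 7))
)
// Host "a" is spread over two splits, but each (host, timestamp) pair is not.
val mixed = Seq(Seq(Record("a", 1L, 7)), Seq(Record("a", 2L, 8)))

// isPartitionedBy(byHost, _.host)                      -> true
// isPartitionedBy(byHost, r => (r.host, r.timestamp))  -> true (implied by the line above)
// isPartitionedBy(mixed, _.host)                       -> false
// isPartitionedBy(mixed, r => (r.host, r.timestamp))   -> true
```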

