Re: Reducing database connection with JdbcIO


Hi Aleksandr,

I don't get your point about the connections: on your worker, who is going to create the threads? Basically, we have one connection per thread right now.

As for checkpoints: JdbcIO is basically a DoFn, so it doesn't deal with a CheckpointMark (if that's what you are talking about). Recently, we added support for retries with backoff in the write part: you can specify the kind of exception for which JdbcIO should retry the statement (especially useful in case of a deadlock).
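
To make the retry idea concrete, here is a minimal sketch in plain Java. It is not JdbcIO's actual code: the class `RetrySketch`, the method `runWithRetry`, and the choice of SQLState `40001` as the "deadlock" signal are assumptions made for illustration (drivers differ in how they report deadlocks).

```java
import java.sql.SQLException;
import java.util.function.Predicate;

class RetrySketch {
    /** A statement-like action that may fail with an SQLException. */
    interface SqlAction {
        void run() throws SQLException;
    }

    /**
     * Runs the action, retrying with exponential backoff while the predicate
     * accepts the exception. Returns the number of attempts that were made.
     */
    static int runWithRetry(SqlAction action, Predicate<SQLException> retryable,
                            int maxAttempts, long initialBackoffMillis)
            throws SQLException, InterruptedException {
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (SQLException e) {
                // Give up on non-retryable errors or once attempts are exhausted.
                if (attempt >= maxAttempts || !retryable.test(e)) {
                    throw e;
                }
                Thread.sleep(backoff);
                backoff *= 2; // double the wait before the next attempt
            }
        }
    }

    /** Hypothetical predicate: treat SQLState 40001 as a retryable deadlock. */
    static boolean isDeadlock(SQLException e) {
        return "40001".equals(e.getSQLState());
    }
}
```

Any exception the predicate rejects is rethrown immediately, so only the failure kinds you opt into are retried.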

Regards
JB

On 14/03/2018 21:55, Aleksandr wrote:
I mean that with many threads, Beam will create many connections (per thread, per query). Let's say I have 10 different tables. For each table insert, Beam will create its own connection, and on top of that one for each thread running that insert.

Let's say I have some UUID generation and a BQ insert. If there are problems with the BQ service, an exception will be thrown, but my job will be restored from the latest checkpoint, so I will not generate a UUID for the same message twice. With JdbcIO it is possible to get a UUID for the same message twice (with multiple IOs this might be a problem).

Aleksandr.

On 14 March 2018 at 10:37 PM, "Eugene Kirpichov" <kirpichov@xxxxxxxxxx <mailto:kirpichov@xxxxxxxxxx>> wrote:

    Aleksandr - it seems that you're assuming that every prepared
    statement uses a connection. This is not the case: we open a
    connection, and use that connection to create prepared statements.
    For any given thread, there's at most 1 connection open at the same
    time, and the connection has at most 1 prepared statement open.

    Create thread -> (open connection -> (open prepared statement ->
    executeBatch* -> close prepared statement)* -> close connection)*
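
The lifecycle pattern above can be checked with a small simulation. This is a sketch in plain Java with no real JDBC objects; `LifecycleSketch` and all of its members are hypothetical names. It only counts how many connections and statements one thread has open at once, assuming the nesting described in the message.

```java
class LifecycleSketch {
    int openConnections, openStatements; // currently open in this thread
    int peakConnections, peakStatements; // highest values observed

    void openConnection() {
        openConnections++;
        peakConnections = Math.max(peakConnections, openConnections);
    }

    void closeConnection() {
        openConnections--;
    }

    void openStatement() {
        openStatements++;
        peakStatements = Math.max(peakStatements, openStatements);
    }

    void closeStatement() {
        openStatements--;
    }

    /** One thread: (open connection -> (open statement -> batches -> close)* -> close)*. */
    void runThread(int connectionCycles, int statementCycles, int batches) {
        for (int c = 0; c < connectionCycles; c++) {
            openConnection();
            for (int s = 0; s < statementCycles; s++) {
                openStatement();
                for (int b = 0; b < batches; b++) {
                    // executeBatch would run here, on the one open statement
                }
                closeStatement();
            }
            closeConnection();
        }
    }

    /** Upper bound on simultaneously open connections if all threads are active at once. */
    static int connectionUpperBound(int threads, int statementCycles) {
        LifecycleSketch oneThread = new LifecycleSketch();
        oneThread.runThread(1, statementCycles, 1);
        return threads * oneThread.peakConnections;
    }
}
```

Under this model, 300 threads that each cycle through 10 prepared statements are bounded by 300 open connections per worker, not 3000, because statements reuse the thread's single connection.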

    I'm not sure what you mean by checkpoints, can you elaborate?

    On Wed, Mar 14, 2018 at 1:20 PM Aleksandr <aleksandr.vl@xxxxxxxxx
    <mailto:aleksandr.vl@xxxxxxxxx>> wrote:

        So let's say I have 10 prepared statements and hundreds of
        threads, for example 300. Will Dataflow create 3000 connections
        to SQL, and with autoscaling will another node again create
        3000 connections?

        Another problem here is that JdbcIO doesn't use any checkpoints
        (while BQ, for example, does). So every connection exception
        will be propagated upward.


        On 14 March 2018 at 10:09 PM, "Eugene Kirpichov"
        <kirpichov@xxxxxxxxxx <mailto:kirpichov@xxxxxxxxxx>> wrote:

            In a streaming job it'll be roughly once per thread per
            worker, and Dataflow Streaming runner may create hundreds of
            threads per worker because it assumes that they are not
            heavyweight and that low latency is the primary goal rather
            than high throughput (as in batch runner).

            A hacky way to limit this parallelism would be to emulate
            the "repartition", by inserting a chain of transforms: pair
            with a random key in [0,n), group by key, ungroup -
            processing of the result until the next GBK will not be
            parallelized more than n-wise in practice in the Dataflow
            streaming runner, so in the particular case of
            JdbcIO.write() with its current implementation it should
            help. It may break in the future, e.g. if JdbcIO.write()
            ever changes to include a GBK before writing. Unfortunately
            I can't recommend a long-term reliable solution for the moment.
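
The shape of that "pair with a random key, group by key, ungroup" chain can be sketched outside of Beam. The following is plain Java over in-memory lists, not pipeline code; `RepartitionSketch` and its methods are hypothetical names. It shows only that the grouping step yields at most n groups while keeping every element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

class RepartitionSketch {
    /** Pairs each element with a random key in [0, n) and groups by that key. */
    static <T> Map<Integer, List<T>> groupByRandomKey(List<T> elements, int n, Random random) {
        Map<Integer, List<T>> groups = new TreeMap<>();
        for (T element : elements) {
            int key = random.nextInt(n); // uniform in [0, n)
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    /** Drops the keys again ("ungroup"), yielding the elements group by group. */
    static <T> List<T> ungroup(Map<Integer, List<T>> groups) {
        List<T> out = new ArrayList<>();
        for (List<T> group : groups.values()) {
            out.addAll(group);
        }
        return out;
    }
}
```

In a Beam Java pipeline the corresponding steps would presumably be built from transforms such as `WithKeys`, `GroupByKey`, `Values`, and `Flatten.iterables()`; that mapping is an assumption and is not shown here. Whether downstream parallelism is really limited to n depends on the runner, as noted above.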

            On Wed, Mar 14, 2018 at 12:57 PM Aleksandr
            <aleksandr.vl@xxxxxxxxx <mailto:aleksandr.vl@xxxxxxxxx>> wrote:

                Hello,
                How many times will setup be called per node? Is it
                possible to limit ParDo instances in Google Dataflow?

                Aleksandr.



                On 14 March 2018 at 9:22 PM, "Eugene
                Kirpichov" <kirpichov@xxxxxxxxxx
                <mailto:kirpichov@xxxxxxxxxx>> wrote:

                    "Jdbcio will create for each prepared statement new
                    connection" - this is not the case: the connection
                    is created in @Setup and deleted in @Teardown.
                    https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L503
                    https://github.com/apache/beam/blob/v2.3.0/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L631

                    Something else must be going wrong.

                    On Wed, Mar 14, 2018 at 12:11 PM Aleksandr
                    <aleksandr.vl@xxxxxxxxx
                    <mailto:aleksandr.vl@xxxxxxxxx>> wrote:

                        Hello, we had a similar problem. The current
                        JdbcIO will cause a lot of connection errors.

                        Typically you have more than one prepared
                        statement. JdbcIO will create a new connection
                        for each prepared statement (and close it only
                        in teardown). So it is possible that a
                        connection will time out, or with autoscaling
                        you will get too many connections to SQL.
                        Our solution was to create a connection pool in
                        setup, then get a connection and return it to
                        the pool in processElement.
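
A bounded pool of the kind described can be sketched in a few lines. This is a generic illustration in plain Java, not the code from that job: `BoundedPoolSketch` is a hypothetical name, and the pooled resource is a type parameter so the sketch runs without a database. In a real DoFn the factory would open JDBC connections, and the pool would have to be shared between DoFn instances for the cap to hold.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

class BoundedPoolSketch<C> {
    private final BlockingQueue<C> idle;

    /** Eagerly creates `size` resources; nothing else ever creates one. */
    BoundedPoolSketch(int size, Supplier<C> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    /** Blocks until a resource is free, as processElement would before writing. */
    C borrow() throws InterruptedException {
        return idle.take();
    }

    /** Returns a borrowed resource so that other threads can use it. */
    void giveBack(C resource) {
        idle.add(resource);
    }

    /** Number of resources currently idle in the pool. */
    int available() {
        return idle.size();
    }
}
```

Because `borrow` blocks when the pool is empty, the number of connections stays at `size` no matter how many threads the runner starts; the cost is that threads wait for each other.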

                        Best Regards,
                        Aleksandr Gortujev.

                        On 14 March 2018 at 8:52 PM,
                        "Jean-Baptiste Onofré" <jb@xxxxxxxxxxxx
                        <mailto:jb@xxxxxxxxxxxx>> wrote:

                            Agreed, especially with the current JdbcIO
                            implementation, which creates the connection
                            in @Setup. Or does it mean that @Teardown is
                            never called?

                            Regards
                            JB
                            On 14 March 2018, at 11:40, Eugene Kirpichov
                            <kirpichov@xxxxxxxxxx
                            <mailto:kirpichov@xxxxxxxxxx>> wrote:

                                Hi Derek - could you explain where the
                                "3000 connections" number comes from,
                                i.e. how did you measure it? It's weird
                                that 5-6 workers would use 3000 connections.

                                On Wed, Mar 14, 2018 at 3:50 AM Derek
                                Chan <derekcsy@xxxxxxxxx
                                <mailto:derekcsy@xxxxxxxxx>> wrote:

                                    Hi,

                                    We are new to Beam and need some help.

                                    We are working on a flow that ingests
                                    events and writes the aggregated
                                    counts to a database. The input rate
                                    is rather low (~2000 messages per
                                    second), but the processing is
                                    relatively heavy, so we need to
                                    scale out to 5-6 nodes. The output
                                    (via JDBC) is aggregated, so the
                                    volume is also low. But because of
                                    the number of workers, the job keeps
                                    3000 connections open to the database
                                    and keeps hitting the database
                                    connection limits.

                                    Is there a way that we can reduce
                                    the concurrency only at the output
                                    stage? (In Spark we would have done
                                    a repartition/coalesce).

                                    And, if it matters, we are using
                                    Apache Beam 2.2 via Scio, on Google
                                    Dataflow.

                                    Thank you in advance!