git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Sometimes DELETE of a row that was INSERT shortly before does not work


Hi,

we are having an issue that sometimes a row is not deleted when it was inserted a short time before. 
The cluster has two datacenter each with 3 nodes running C* 3.11.3. The clients are using Datastax cpp driver 2.10.0 on RHEL-7 with consistency level quorum for INSERT, DELETE and SELECT.
When testing on a single node cluster on my dev box the problem does not pop up.
So I'm not sure if this is a configuration problem, a problem in my code or in Cassandra.
I attach a small sample code that I used to recreate the problem. It adds first 10 rows. After that in a loop it adds another row and deletes it again and checks that can't be loaded anymore. Sometimes it fails after less than 1000 rows and other times after more than 10000 rows.

Thanks for any feedback
Oliver

#include <cassandra.h>
#include <stdio.h>
#include <stdlib.h>

#define PKEY_1 "pk1"
#define PKEY_2 "pk2"
#define CKEY_1 "ck1"
#define CONSISTENCY CASS_CONSISTENCY_QUORUM

#define CHECK_FUT(F)                                            \
{                                                               \
    CassError err = cass_future_error_code(F);                  \
                                                                \
    if (err != CASS_OK)                                         \
    {                                                           \
        const char* err_msg;                                    \
        size_t len;                                             \
        cass_future_error_message(F, &err_msg, &len);           \
                                                                \
        fprintf(stderr, "Line %d: error %d: %.*s\n",            \
                __LINE__,                                       \
                err,                                            \
                len,                                            \
                err_msg);                                       \
        abort();                                                \
    }                                                           \
}

#define CHECK_ERR(E)                                            \
{                                                               \
    if (E != CASS_OK)                                           \
    {                                                           \
        fprintf(stderr, "Line %d: error %d\n",                  \
                __LINE__,                                       \
                E);                                             \
        abort();                                                \
    }                                                           \
}


CassCluster* cluster = 0;
CassSession* session = 0;

void initialize(const char* host)
{
    cluster = cass_cluster_new();
    session = cass_session_new();

    CassError rc = cass_cluster_set_contact_points(cluster, host);
    CHECK_ERR(rc);

    CassFuture* con_fut = cass_session_connect(session, cluster);
    const CassResult* res = cass_future_get_result(con_fut);
    if (!res)
        CHECK_FUT(con_fut);

    cass_future_free(con_fut);

    /* Create keyspace */
    CassStatement* ks_stmt = cass_statement_new(
                "CREATE KEYSPACE IF NOT EXISTS del_test WITH replication "
                "= { 'class': 'NetworkTopologyStrategy',"
                "'datacenter1': 3, 'datacenter2': 3 };",
                0);

    rc = cass_statement_set_consistency(ks_stmt, CONSISTENCY);
    CHECK_ERR(rc);
    CassFuture* ks_fut = cass_session_execute(session, ks_stmt);
    CHECK_FUT(ks_fut);
    cass_statement_free(ks_stmt);
    cass_future_free(ks_fut);

    /* Create table */
    CassStatement* tbl_stmt = cass_statement_new(
                "CREATE TABLE IF NOT EXISTS del_test.table1 ("
                "pk_1 text, pk_2 text, ck_1 text, "
                "col_0 bigint, col_1 bigint, col_2 text, "
                "col_3 bigint, col_4 bigint, col_5 text, "
                "col_6 bigint, col_7 bigint, col_8 text, "
                "PRIMARY KEY ((pk_1, pk_2), ck_1));",
                0);

    rc = cass_statement_set_consistency(tbl_stmt, CONSISTENCY);
    CHECK_ERR(rc);
    CassFuture* tbl_fut = cass_session_execute(session, tbl_stmt);
    CHECK_FUT(tbl_fut);
    cass_statement_free(tbl_stmt);
    cass_future_free(tbl_fut);
}

void del_partition()
{
    CassStatement* stmt = cass_statement_new(
                "DELETE FROM del_test.table1 WHERE pk_1 = '" PKEY_1 "' "
                "AND pk_2 = '" PKEY_2 "';",
                0);

    CassError rc = cass_statement_set_consistency(stmt, CONSISTENCY);
    CHECK_ERR(rc);
    CassFuture* fut = cass_session_execute(session, stmt);
    CHECK_FUT(fut);
    cass_statement_free(stmt);
    cass_future_free(fut);
}

void add_row(size_t idx)
{
    CassStatement* stmt = cass_statement_new(
                "INSERT INTO del_test.table1(pk_1, pk_2, ck_1, "
                "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
                12);
    char val[32];
    size_t i=0;

    cass_statement_bind_string(stmt, i++, PKEY_1);
    cass_statement_bind_string(stmt, i++, PKEY_2);
    sprintf(val, "r%zu", idx);
    cass_statement_bind_string(stmt, i++, val);

    int n;
    for (n=0; n<3; ++n)
    {
        cass_statement_bind_int64(stmt, i, idx*100+i); ++i;
        cass_statement_bind_int64(stmt, i, idx*100+i); ++i;
        sprintf(val, "val_%zu", idx*100+i);
        cass_statement_bind_string(stmt, i++, val);
    }

    CassError rc = cass_statement_set_consistency(stmt, CONSISTENCY);
    CHECK_ERR(rc);
    CassFuture* fut = cass_session_execute(session, stmt);
    CHECK_FUT(fut);
    cass_statement_free(stmt);
    cass_future_free(fut);
}

void del_row(size_t idx)
{
    CassStatement* stmt = cass_statement_new(
                "DELETE FROM del_test.table1 WHERE pk_1 = '" PKEY_1 "' "
                "AND pk_2 = '" PKEY_2 "' AND ck_1 = ?;",
                1);

    char val[32];
    sprintf(val, "r%zu", idx);
    cass_statement_bind_string(stmt, 0, val);

    CassError rc = cass_statement_set_consistency(stmt, CONSISTENCY);
    CHECK_ERR(rc);
    CassFuture* fut = cass_session_execute(session, stmt);
    CHECK_FUT(fut);
    cass_statement_free(stmt);
    cass_future_free(fut);
}

void check_del(size_t idx)
{
    CassStatement* stmt = cass_statement_new(
                "SELECT * FROM del_test.table1 WHERE pk_1 = '" PKEY_1 "' "
                "AND pk_2 = '" PKEY_2 "' AND ck_1 = ?;",
                1);

    char val[32];
    sprintf(val, "r%zu", idx);
    cass_statement_bind_string(stmt, 0, val);

    CassError rc = cass_statement_set_consistency(stmt, CONSISTENCY);
    CHECK_ERR(rc);
    CassFuture* fut = cass_session_execute(session, stmt);
    const CassResult* res = cass_future_get_result(fut);
    if (!res)
    {
        rc = cass_future_error_code(fut);
        CHECK_ERR(rc);
    }

    size_t num_rows = cass_result_row_count(res);
    if (num_rows > 0)
    {
        fprintf(stderr, "Line %d: row %zu not deleted\n",
                __LINE__,
                idx);
        abort();
    }

    cass_result_free(res);
    cass_statement_free(stmt);
    cass_future_free(fut);
}

int main(int argc, const char* argv[])
{
    const char* host = "127.0.0.1";
    if (argc > 1)
        host = argv[1];

    /* Create keyspace and tabel */
    initialize(host);

    /* Delete all rows */
    del_partition();

    int row;
    for (row=0; row<10; ++row)
    {
        add_row(row);
    }

    for (; ; ++row)
    {
        add_row(row);
        del_row(row);
        check_del(row);
    }
    return 0;
}