FlinkKinesisProducer weird behaviour


Hi,

We're using Kinesis as both the input and the output of a job, and we are getting a parsing exception while reading from the output stream. All streams contain only 1 shard.

While investigating the issue, I noticed weird behaviour: some records get a PartitionKey I did not assign, and their Data is wrapped in seemingly random illegal characters.

I wrote a very basic program to try to isolate the problem, but I still see this happening:
  • I wrote a simple SourceFunction which generates messages of the pattern <sequence#>-AAAAAAAAAAA\n
  • FlinkKinesisProducer writes the messages to the Kinesis stream with a default partitionKey of "0", so I expect ALL records to have a partitionKey of "0"
To verify the records in the Kinesis stream, I used the AWS CLI get-records command and saw the following:

.......................
        {
            "SequenceNumber": "49584735873509122272926425626745413182361252610143420418",
            "ApproximateArrivalTimestamp": 1527144766.662,
            "Data": "84mawgoBMBpsCAAaaDc5LUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUEKGmwIABpoODAtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQodBmhDDIwmRVeomHOIGlWJ",
            "PartitionKey": "a"
        },
        {
            "SequenceNumber": "49584735873509122272926425626746622108180867308037603330",
            "ApproximateArrivalTimestamp": 1527144766.86,
            "Data": "QUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQo=",
            "PartitionKey": "0"
        },
.......................

Where did PartitionKey "a" come from?
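
For a longer capture it can help to tally the partition keys rather than reading the JSON by eye. Below is a minimal Python sketch, assuming the get-records output was saved as JSON with a top-level "Records" list, as the AWS CLI prints it. The function name count_partition_keys and the trimmed sample are hypothetical stand-ins, not the real records.

```python
import json
from collections import Counter


def count_partition_keys(get_records_json: str) -> Counter:
    """Count how many records carry each PartitionKey in a get-records response."""
    response = json.loads(get_records_json)
    return Counter(record["PartitionKey"] for record in response.get("Records", []))


# Trimmed stand-in for the output shown above (only the field used here is kept).
sample = json.dumps({
    "Records": [
        {"PartitionKey": "a"},
        {"PartitionKey": "0"},
    ]
})

for key, count in sorted(count_partition_keys(sample).items()):
    print(f"PartitionKey {key!r}: {count} record(s)")
```

Any key other than "0" in the tally points at records that were not written with the default partition key as-is.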

Furthermore, if you Base64-decode the Data of these records, you see that all records written with PartitionKey "a" are wrapped in weird illegal characters.
For example:

$ echo 84mawgoBMBpsCAAaaDc5LUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUEKGmwIABpoODAtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQodBmhDDIwmRVeomHOIGlWJ | base64 --decode
��
0h79-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
h80-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
hC
  �&EW��s�U�r

While the records with PartitionKey "0" look good:

$ echo ODEtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQo= | base64 --decode
81-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
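
The terminal mangles the non-printable bytes, so comparing the leading bytes in hex may be more telling. The Python sketch below decodes only the first few Base64 characters of each Data value shown above. The constant KPL_MAGIC is an assumption brought in for comparison: it is the 4-byte magic number documented for the Kinesis Producer Library (KPL) aggregated record format. Whether KPL aggregation actually explains the behaviour here is not confirmed by anything in this message. The helper names are hypothetical.

```python
import base64

# Assumption: magic number documented for the KPL aggregated record format.
KPL_MAGIC = bytes([0xF3, 0x89, 0x9A, 0xC2])


def leading_bytes(data_b64: str, n: int = 4) -> bytes:
    """Return the first n decoded bytes of a Base64 string (length must be a multiple of 4)."""
    return base64.b64decode(data_b64)[:n]


def starts_with_kpl_magic(data_b64: str) -> bool:
    """True if the decoded data begins with the assumed KPL magic number."""
    return leading_bytes(data_b64, len(KPL_MAGIC)) == KPL_MAGIC


# First 16 Base64 characters of the Data of the record with PartitionKey "a".
key_a_prefix = "84mawgoBMBpsCAAa"
# First 8 Base64 characters of the Data decoded above for PartitionKey "0".
key_0_prefix = "ODEtQUFB"

print(leading_bytes(key_a_prefix).hex())    # f3899ac2
print(leading_bytes(key_0_prefix))          # b'81-A'
print(starts_with_kpl_magic(key_a_prefix))  # True
print(starts_with_kpl_magic(key_0_prefix))  # False
```

If the check returns True for the "a" records, the extra bytes around the payload would be consistent with an aggregation envelope rather than random corruption.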


I tried both version 1.4.2 and 1.6-SNAPSHOT and still see the issue.


Am I missing anything? Has anyone encountered such an issue?

Would appreciate any help,

Rafi

