Re: Using FlinkKinesisConsumer through a proxy


Since Flink 1.5, you should be able to set all available configurations on the ClientConfiguration through the consumer Properties (see FLINK-9188 [1]).

The way to do that would be to prefix the configuration you want to set with "aws.clientconfig" and add that to the properties, as such:

Properties kinesisConsumerProps = new Properties();
kinesisConsumerProps.setProperty("aws.clientconfig.proxyHost", ...);
kinesisConsumerProps.setProperty("aws.clientconfig.proxyPort", ...);
kinesisConsumerProps.setProperty("aws.clientconfig.proxyUsert", ...);

Could you try that out and see if it works for you?

I've also realized that this feature isn't documented very well, and have opened a ticket for that [2].


I have been trying with all variations  to no avail of java -Dhttp.nonProxyHosts=..  -Dhttps.proxyHost=http://... -Dhttps.proxyPort=911 -Dhttps.proxyUser= -Dhttps.proxyPassword=.. -Dhttp.proxyHost=http://.. -Dhttp.proxyPort=911 -Dhttp.proxyUser=... -Dhttp.proxyPassword=... -jar .. after looking at the code in com.amazonaws.ClientConfiguration

How do I use FlinkKinesisConsumer using the Properties through a proxy ? Getting a Connection issue through the proxy. 
Works outside the proxy.

Properties kinesisConsumerConfig = new Properties();
        kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);

        if (local) {
            kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
            kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
        } else {
            kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

        //only for Consumer
        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
        FlinkKinesisConsumer<Tuple2<InputMetadata, CameraWithCube>> kinesisConsumer = new FlinkKinesisConsumer<>(
                "kinesisTopicRead", new Tuple2KinesisSchema(), kinesisConsumerConfig);