git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Using FlinkKinesisConsumer through a proxy


Hi,

Since Flink 1.5, you should be able to set all available configurations on the ClientConfiguration through the consumer Properties (see FLINK-9188 [1]).

The way to do that would be to prefix the configuration you want to set with "aws.clientconfig" and add that to the properties, as such:

```
Properties kinesisConsumerProps = new Properties();
kinesisConsumerProps.setProperty("aws.clientconfig.proxyHost", ...);
kinesisConsumerProps.setProperty("aws.clientconfig.proxyPort", ...);
kinesisConsumerProps.setProperty("aws.clientconfig.proxyUsert", ...);
...
```

Could you try that out and see if it works for you?

I've also realized that this feature isn't documented very well, and have opened a ticket for that [2].

Cheers,
Gordon

On Thu, Oct 4, 2018, 7:57 PM Aljoscha Krettek <aljoscha@xxxxxxxxxx> wrote:
Hi,

I'm looping in Gordon and Thomas, they might have some idea about how to resolve this.

Best,
Aljoscha

On 3. Oct 2018, at 17:29, Vijay Balakrishnan <bvijaykr@xxxxxxxxx> wrote:

I have been trying with all variations  to no avail of java -Dhttp.nonProxyHosts=..  -Dhttps.proxyHost=http://... -Dhttps.proxyPort=911 -Dhttps.proxyUser= -Dhttps.proxyPassword=.. -Dhttp.proxyHost=http://.. -Dhttp.proxyPort=911 -Dhttp.proxyUser=... -Dhttp.proxyPassword=... -jar .. after looking at the code in com.amazonaws.ClientConfiguration

On Tue, Oct 2, 2018 at 3:49 PM Vijay Balakrishnan <bvijaykr@xxxxxxxxx> wrote:
HI,
How do I use FlinkKinesisConsumer using the Properties through a proxy ? Getting a Connection issue through the proxy. 
Works outside the proxy.

Properties kinesisConsumerConfig = new Properties();
        kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);

        if (local) {
            kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
            kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
        } else {
            kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
        }

        //only for Consumer
        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
        FlinkKinesisConsumer<Tuple2<InputMetadata, CameraWithCube>> kinesisConsumer = new FlinkKinesisConsumer<>(
                "kinesisTopicRead", new Tuple2KinesisSchema(), kinesisConsumerConfig);
TIA