KafkaLoader connected to Confluent Cloud

Hi, I’m trying to connect the Kafka loader to Confluent Cloud. The following config feels close; I don’t appear to get any errors other than a timeout on “Try to list topic metadata from Kafka broker”.

It also seems that none of the log files are being written to under /home/tigergraph/tigergraph/log. I’m on the Docker container docker.tigergraph.com/tigergraph:latest.

    {
    	"broker": "SERVER_ID.eu-west-1.aws.confluent.cloud:9092",
    	"kafka_config": {
    		"bootstrap.servers": "SERVER_ID.eu-west-1.aws.confluent.cloud:9092",
    		"group.id": "tigergraph",
    		"security.protocol":"plaintext",
    		"sasl.mechanism":"PLAIN",
    		"sasl.username":"API_KEY",
    		"sasl.password":"API_SECRET",
    		"schema.registry.url":"https://AAA.eu-central-1.aws.confluent.cloud",
    		"basic.auth.credentials.source":"USER_INFO",
    		"basic.auth.user.info":"SR_API_KEY:SR_API_SECRET"
    	}
    }

New findings: providing “sasl.jaas.config” means it no longer times out or gives the error “Failed to construct kafka consumer”.

However, it now gives the error “Java JAAS configuration is not supported”. Latest config:

    {
    	"broker": "XYD.eu-west-1.aws.confluent.cloud:9092",
    	"kafka_config": {
    		"bootstrap.servers": "XYD.eu-west-1.aws.confluent.cloud:9092",
    		"group.id": "tigergraph",
    		"sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"XXX\" password=\"UUU\";",
    		"security.protocol":"SASL_SSL",
    		"sasl.mechanism":"PLAIN",
    		"ssl.ca.location":"/etc/ssl/certs/",
    		"sasl.username":"XXX",
    		"sasl.password":"YYY"
    	}
    }

I’ve tested these parameters using kafkacat as detailed here:

Note that kafkacat requires sasl.mechanisms instead of sasl.mechanism. With that one change, the same config works with kafkacat and consumes from the topics.
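For reference, a minimal kafkacat check along these lines. The broker, credentials, and topic name are placeholders; newer kafkacat versions accept a config file via -F, while older ones take the same keys via repeated -X key=value flags.

```shell
# Write a librdkafka-style properties file for kafkacat. Note the plural
# "sasl.mechanisms" (librdkafka) vs "sasl.mechanism" (Java client).
# All broker/credential values below are placeholders.
cat > kcat.conf <<'EOF'
bootstrap.servers=XYD.eu-west-1.aws.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=XXX
sasl.password=YYY
EOF

# Consume a (placeholder) topic from the beginning and exit at EOF:
#   kafkacat -C -F kcat.conf -t my-topic -o beginning -e
```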

The obvious thing to do is to drop sasl.jaas.config, but then it doesn’t get past what I assume is some kind of validator.

For the next person who finds this: I believe my issues have been coming from the TigerGraph Docker containers (both official and xpertmind):
https://github.com/experoinc/graph-based-rbac-with-streaming-synchronization (I think this one has issues with Java libraries, causing the SASL error)
https://github.com/xpertmind/TigerGraph (my guess is now a networking error on this one, but it could also be missing packages for working with the remote Kafka stream, e.g. SSL bundles had to be installed initially, so it’s obviously really minimal)

The following config passes authentication on xpertmind:

    {
    	"broker": "XYD.eu-west-1.aws.confluent.cloud:9092",
    	"kafka_config": {
    		"bootstrap.servers": "XYD.eu-west-1.aws.confluent.cloud:9092",
    		"group.id": "tigergraph",
    		"sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username='XXX' password='UUU';",
    		"security.protocol":"SASL_SSL",
    		"sasl.mechanism":"PLAIN"
    	}
    }

It then stalls at trying to read the data out of Confluent Cloud, but I’m starting to think that may be something specific to the docker-compose. I’m currently awaiting GSQL access on TigerGraph Cloud (without a credit card), which should just allow this to work with no further complications.

@thepauleh what version of the TG Docker image are you using? I’m maintaining the xpertmind Docker image, so maybe we can find what’s wrong there. What does your docker-compose look like?

Bruno

Hi @Bruno, I’m on xpertmind/tigergraph:latest (downloaded today). I discovered I can only seem to have one Kafka loader running at a time, which is why it appeared to ‘stall’ reading the data after passing authentication.

For more reliable results I’m restarting the container between each loader installation, to prevent that confusion around the loader state.

This then leads me back to my second post with the complication around ‘sasl.jaas.config’ giving the following error if supplied:
failed with error: Java JAAS configuration is not supported, see https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka for more information

and the other error if not supplied:
Failed to construct kafka consumer

OK, I just changed something in the image and am pushing the new version online (latest / 3.3.0)
Please try with it. (docker-compose pull && docker-compose up -d)
I also published an updated docker-compose.yaml file.

Bruno

Hi,

Here are further details on the underlying error. I’m now convinced this is related to differences in TigerGraph components rather than the Docker image specifically:

log/gsql/log.ERROR
I@20211209 12:29:50.877 tigergraph|127.0.0.1:54574|00000000028 (BaseLoadingJob.java:341) Start Loading job kafkatest_job
E@20211209 12:29:50.886 tigergraph|127.0.0.1:54574|00000000028 (QueryBlockHandler.java:227) org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
	at com.tigergraph.schema.plan.job.KafkaLoadingJob.GetKafkaBrokerTopics(KafkaLoadingJob.java:78)
	at com.tigergraph.schema.plan.job.KafkaLoadingJob.splitKafkaTopicPartitions(KafkaLoadingJob.java:127)
	at com.tigergraph.schema.plan.job.KafkaLoadingJob.runConcurrentLoadingJob(KafkaLoadingJob.java:270)
	at com.tigergraph.schema.plan.job.BaseLoadingJob.runLoadingJobs(BaseLoadingJob.java:342)
	at com.tigergraph.schema.plan.job.BaseLoadingJob.runLoadingJobs(BaseLoadingJob.java:290)
	at com.tigergraph.schema.handler.QueryBlockHandler.a(QueryBlockHandler.java:453)
	at com.tigergraph.schema.handler.QueryBlockHandler.a(QueryBlockHandler.java:220)
	at com.tigergraph.schema.handler.QueryBlockHandler.a(QueryBlockHandler.java:159)
	at com.tigergraph.schema.handler.FileHandler.a(FileHandler.java:44)
	at com.tigergraph.schema.handler.BaseHandler.handle(BaseHandler.java:288)
	at jdk.httpserver/com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:77)
	at jdk.httpserver/sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:71)
	at jdk.httpserver/com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:80)
	at jdk.httpserver/sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:692)
	at jdk.httpserver/com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:77)
	at jdk.httpserver/sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:664)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
	at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
	at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
	at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124)
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
	... 21 more
E@20211209 12:29:50.887 tigergraph|127.0.0.1:54574|00000000028 (QueryBlockHandler.java:173) org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

This error shows that in order to construct an org.apache.kafka.clients.consumer.KafkaConsumer, “sasl.jaas.config” must be supplied.

The only alternative is for java.security.auth.login.config to be overridden as a Java system property for the GSQL service.
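For context, this is the shape of the JAAS file the Java client looks for when sasl.jaas.config is absent; the file path and credentials are placeholders:

```
// Hypothetical jaas.conf; the Java client looks for a "KafkaClient" entry.
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="API_KEY"
  password="API_SECRET";
};
```

The JVM would then need to be started with -Djava.security.auth.login.config=/path/to/jaas.conf for the GSQL service to pick it up.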

As mentioned before, when the config is then passed to the Kafka loader, sasl.jaas.config causes the edenhill/librdkafka library to bail out: it refuses to continue with the superfluous parameter present (even though sasl.username and sasl.password are there for auth to work).

The following fixes appear possible:

  1. Allow GSQL to configure the parameter java.security.auth.login.config - though this won’t work well for TigerGraph Cloud.
  2. Document where to place Java system properties so that when gadmin restart runs, GSQL picks up -Djava.security.auth.login.config.
  3. Intercept the KafkaConsumer constructor and handle authentication based on the sasl parameters.
  4. Switch the Kafka consumer used in GSQL to the librdkafka client used by the loader.
  5. When the Kafka loader fetches the config, delete kafka_config.“sasl.jaas.config” before initialising the loader.
  6. Convince the edenhill/librdkafka project to ignore the superfluous parameter.
  7. Use the Java consumer as the Kafka loader, and never rely on the edenhill project.
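Option 5 could be sketched as a simple config rewrite before the loader starts. The file names and the crude line filter below are purely illustrative; a real fix would parse the JSON properly inside the loader:

```shell
# Illustrative only: strip "sasl.jaas.config" from a loader config so that
# librdkafka never sees it, while sasl.username/sasl.password remain.
# File names and credential values are hypothetical placeholders.
cat > loader_config.json <<'EOF'
{
	"broker": "XYD.eu-west-1.aws.confluent.cloud:9092",
	"kafka_config": {
		"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='XXX' password='UUU';",
		"security.protocol": "SASL_SSL",
		"sasl.mechanism": "PLAIN",
		"sasl.username": "XXX",
		"sasl.password": "YYY"
	}
}
EOF

# Remove the one offending line; the remaining JSON stays valid here because
# the key is not the last entry in the object.
grep -v '"sasl.jaas.config"' loader_config.json > librdkafka_config.json
```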

@thepauleh SASL with TG and Kafka is supported from version 3.5.0.