Classloading Error with Kafka Streams

Hi all, I’m trying to use Kafka Streams, a Java library for consuming from and producing to Kafka topics.

I’ve successfully used Streams with JRuby, and I’m now prototyping a new app, for which I’m experimenting with Jython, Kotlin, and Clojure.

I’m having some class loading trouble with the Kotlin experiment.

I posted my problem to the kafka-users mailing list, but so far that has not yielded any hypotheses.

So I thought I’d cross-post here, just in case someone here has an interest in the mysteries of class loading:

$ kotlinc -cp kafka-streams-0.11.0.0-cp1.jar:kafka-clients-0.11.0.0-cp1.jar:slf4j-api-1.7.25.jar -jvm-target 1.8 
Welcome to Kotlin version 1.1.4-3 (JRE 1.8.0_141-b15)
Type :help for help, :quit for quit
import org.apache.kafka.streams.StreamsConfig
val config = hashMapOf("application.id" to "t2p.normalizer", "bootstrap.servers" to "localhost:9092")
StreamsConfig(config)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
java.lang.ExceptionInInitializerError
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.Serdes$ByteArraySerde for configuration default.key.serde: Class org.apache.kafka.common.serialization.Serdes$ByteArraySerde could not be found.
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
	at org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(ConfigDef.java:944)
	at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:137)
	at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:157)
	at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:196)
	at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:358)
	at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:371)
	at org.apache.kafka.streams.StreamsConfig.<clinit>(StreamsConfig.java:270)
	at Line_2.<init>(Unknown Source)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.jetbrains.kotlin.cli.common.repl.GenericReplEvaluator$eval$1$scriptInstance$1.invoke(GenericReplEvaluator.kt:93)
  <cut>

I’ve put the entire thing into this gist.

If anyone wants to reproduce this, those JAR files can be found here:

I took a look at ConfigDef.java, and I think the error is being thrown from line 706:

so I tried running that in the Kotlin REPL:

import org.apache.kafka.common.utils.Utils
Class.forName("org.apache.kafka.common.serialization.Serdes\$ByteArraySerde", true, Utils.getContextOrKafkaClassLoader())
java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)

(that’s the whole trace)

I thought maybe I was missing a JAR file, but this works fine:

Class.forName("org.apache.kafka.common.serialization.Serdes\$ByteArraySerde")
class org.apache.kafka.common.serialization.Serdes$ByteArraySerde

So I guess this has something to do with the ClassLoader being returned by Utils.getContextOrKafkaClassLoader()

But when I call that, the result looks sane to me:

import org.apache.kafka.common.utils.Utils
Utils.getContextOrKafkaClassLoader()
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
sun.misc.Launcher$AppClassLoader@55f96302

Not that I know much about class loaders, but AppClassLoader sounds at least somewhat reasonable.


That’s all I know at this point — any help would be greatly appreciated!

1 Like

I have encountered a similar problem, and submitted it via https://youtrack.jetbrains.net/issue/KT-24966

issue is caused by way Kafka load classes - it seems to be using wrong classloaded. Workaround with passing deserializer classes:
props.put(“key.deserializer”, Class.forName(“org.apache.kafka.common.serialization.StringDeserializer”));
props.put(“value.deserializer”, Class.forName(“org.apache.kafka.common.serialization.ByteArrayDeserializer”));