Cannot use lambdas with Apache Flink

I’m prototyping some complex event processing business rules with Apache Flink and have run into a problem: I can’t use lambdas with any of the Flink APIs that accept them. This is with the Kotlin 1.1 preview, by the way.

For example, where I would normally write something like:

val byCustomKey = stream.keyBy { it.myCustomKey }

I instead have to use an anonymous object, like so:

val keySelector = object : KeySelector<Event, String> {
  override fun getKey(event: Event): String = event.myCustomKey
}

val byCustomKey = stream.keyBy(keySelector)

So I basically have to create an anonymous object anywhere I would otherwise provide a lambda. What’s interesting here is that I can pass references to free functions, so I can do something like this:

fun selectKey(event: Event) = event.myCustomKey
val byCustomKey = stream.keyBy(::selectKey)

However, function references only work for free functions, not for instance methods bound to a given object.
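To illustrate the difference with a hypothetical helper class (KeySelectors and byCustomKey are made-up names for this example, not from my actual code):

// Hypothetical helper class, made up purely for illustration.
class KeySelectors {
    fun byCustomKey(event: Event): String = event.myCustomKey
}

val selectors = KeySelectors()

val works = stream.keyBy(::selectKey)             // reference to the free function above: works
val fails = stream.keyBy(selectors::byCustomKey)  // bound instance-method reference: does not work for me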

I assume that the Kotlin compiler isn’t emitting something that Flink’s type extraction needs. There is a note in the Flink documentation that specifically covers using Java 8 lambdas with Flink and advises using the Eclipse JDT compiler: Apache Flink 1.2 Documentation: Java 8

So am I stuck with ugly anonymous objects here?


keyBy takes a KeySelector, which is an interface with a single method. Therefore Kotlin, like Java 8, should perform a SAM conversion for lambdas. I have successfully used this recently with keyBy on a Kotlin/Flink-based project.

What error message are you getting?

It compiles fine; I only hit the issue at runtime. Here is a stack trace:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.api.java.functions.KeySelector could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
	at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1095)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:615)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:425)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:297)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:290)
	at org.apache.flink.streaming.api.datastream.KeyedStream.<init>(KeyedStream.java:99)
	at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:242)

I forgot to mention in the OP that this is Flink 1.2.0.

The problem with lambdas can be mitigated somewhat, at least for DataStream.map and flatMap, by following the call with DataStream.returns, which provides a sufficient type hint. But that doesn’t work for KeySelector lambda implementations, or for the CEP library’s PatternSelectFunction interface; a lambda in place of the latter yields the same kind of stack trace.
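For completeness, here is a rough sketch of that returns workaround for map (assuming Event has a myCustomKey: String property, as in the earlier snippets):

import org.apache.flink.api.common.typeinfo.TypeInformation

// map with a plain lambda, then an explicit type hint via returns();
// the hint supplies the output type information that Flink cannot
// extract from the SAM-converted lambda class on its own.
val keys = stream
    .map { event -> event.myCustomKey }
    .returns(TypeInformation.of(String::class.java))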

The first question is whether KeySelector is an interface or an abstract class. SAM conversion only works for interfaces (and, in Kotlin, only for interfaces defined in Java).

I know nothing about calling Flink, but there are times when you have to be a little more explicit with Kotlin about which SAM you mean. One such case is when you have multiple methods with the same name that take different SAMs. So you might have to do this:

val byCustomKey = stream.keyBy(KeySelector { it.myCustomKey })

or possibly in some rare cases:

val byCustomKey =
    stream.keyBy(KeySelector<Event, String> { it.myCustomKey })

See http://kotlinlang.org/docs/reference/java-interop.html#sam-conversions

Have you solved this problem? I have the same issue.
