Using SAM lambda in Flink causes `InvalidTypesException`

I’m following the Datastream API walkthrough while rewriting the example in Kotlin. I tried

val alerts: DataStream<Alert> = transactions
        .keyBy { it.accountId }

and

val alerts: DataStream<Alert> = transactions
        .keyBy(Transaction::getAccountId)

to set up keyed context.

But both of the approach causes the following exception.

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.api.java.functions.KeySelector could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point

After some research, I came up with this version.

This time it worked. But as you can see, the IDE thought the SAM-constructor is redundant. If I followed this advice, I would end up the beginning version which has the problem.

So I think there must be something wrong. Maybe Kotlin compiler, Kotlin IDE Plugin, or Flink.

There is a related post about this problem.

Could you share your code? I still get the exception when I use

.keyBy(KeySelector<Transaction, Long> { it.accountId })

This is an issue with the Kotlin compiler. Issues occur on version 1.5.0 and higher.

https://issues.apache.org/jira/projects/FLINK/issues/FLINK-23979?filter=allopenissues