Spring Cloud Function은 spring-cloud 하위의 프로젝트입니다.
Spring Cloud Function을 이용하면 사용자가 정의한 Supplier
Consumer
Function
등의 인터페이스를 FunctionCatalog가 읽어 Consume, Produce 등을 수행하는 표준 표현으로 변환하는 기능을 사용할 수 있습니다.
Spring Cloud Function에서는 Java 8 부터 사용할 수 있는 3가지 코어 Function(Supplier
, Function
, Consumer
) interface들을 지원합니다.
각각의 인터페이스를 이용해 Kafka Producer, Consumer, Streams를 구성한 코드를 정리했습니다.
Supplier interface를 이용한 Kafka Producer 구성하기
Supplier interface를 통해 Kafka Producer를 구성할 수 있습니다. 아래와 같은 의존성 및 properties가 정의되어야 합니다. Producer의 경우 Non Reactive 혹은 Reactive 방식으로 구성할 수 있습니다.
build.gradle.kts
implementation("org.springframework.cloud:spring-cloud-function-kotlin")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
Kafka Binder Properties, Kafka Producer Properties
application.yml
spring:
profiles: local
cloud:
function:
definition: kotlinSupplier; # 함수 이름, 여러개인 경우 kotlinSupplier,kotlinConsumer; 이런 식으로 정의
stream:
bindings:
kotlinSupplier-out-0:
destination: backoffice-auth.local.logging.access-info.json #
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
configuration:
acks: all
bindings:
kotlinSupplier-out-0:
producer: # producer 세부 설정, 별도로 설정할 부분이 없어 생략
Non Reactive Producer
Non Reactive 방식의 Producer를 정의하기 위해서 아래와 같이 설정할 수 있습니다. 아래와 같이 설정하는 경우 default poller 에 의해서 polling 방식(Non Reactive)을 통해 producing을 진행합니다.
ProducerConfig
@Configuration
class ProducerConfig {
@Bean
fun kotlinSupplier() = Supplier<String> {
"produce test"
}
}
아래와 같이 Polling 방식으로 producing이 진행되는것을 확인할 수 있습니다.
Reactive Producer
Reactive 방식의 Producer를 정의하기 위해서 아래와 같이 구성할 수 있습니다.
예제 코드에서는 기본 구현체 중 하나인 EmitterProcessor
를 이용했습니다.
ProducerConfig
@Configuration
class ProducerConfig {
@Bean
fun processor(): EmitterProcessor<String> =
EmitterProcessor.create<String>()
@Bean
fun processorSink(processor: EmitterProcessor<String>): FluxSink<String> {
return processor.sink(FluxSink.OverflowStrategy.DROP)
}
@Bean
fun kotlinSupplier(processor: EmitterProcessor<String>) = Supplier<Flux<String>> {
processor
}
}
ProduceTestController
@RestController
@RequestMapping("/api/v1")
class ProduceTestController(
private val processorSink: FluxSink<String>
) {
@GetMapping
fun produce() {
this.processorSink.next(UUID.randomUUID().toString())
}
}
위와 같이 코드를 정의하고, 해당 /api/v1
경로로 GET 요청을 보내면 아래와 같이 producing이 진행됩니다.
Consumer interface를 통해 Kafka Consumer 구성하기
Consumer interface를 통해 Kafka Consumer를 구성할 수 있습니다. 아래와 같은 의존성 및 설정이 정의되어야 합니다.
build.gradle.kts
implementation("org.springframework.cloud:spring-cloud-function-kotlin")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
Kafka Binder Properties, Kafka Consumer Properties
application.yml
spring:
profiles: local
cloud:
function:
definition: kotlinConsumer;
stream:
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
configuration:
acks: all
bindings:
kotlinConsumer-in-0:
startOffset: earliest
bindings:
kotlinConsumer-in-0:
group: kotlin-consumer # consumer group id
destination: topic # consume topic name
ConsumerConfig
@Configuration
class ConsumerConfig {
@Bean
fun kotlinConsumer() = Consumer<String> {
println(it)
}
}
Function interface를 이용해 Kafka Streams 구성하기
Function interface를 통해 Kafka Streams API를 구현할 수 있습니다. 아래와 같은 의존성 및 설정이 정의되어야 합니다.
build.gradle.kts
dependencies {
implementation("org.springframework.cloud:spring-cloud-stream")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
}
application.yml
spring:
profiles: local
cloud:
function:
definition: fullAuthAccessLogProcessor;
stream:
kafka:
streams:
binder:
brokers: localhost:9092
configuration:
acks: all
bindings:
fullAuthAccessLogProcessor-in-0:
consumer:
applicationId: access-info-log-processor
bindings:
fullAuthAccessLogProcessor-in-0:
destination: backoffice-auth.local.logging.access-info.json
fullAuthAccessLogProcessor-out-0:
destination: backoffice-auth.local.logging.full-access-info.json
StreamsConfig
@Configuration
class StreamsConfig {
@Bean
fun fullAuthAccessLogProcessor(fullAuthAccessInfoMapper: FullAuthAccessInfoMapper) =
Function<KStream<String, AuthAccessInfoSchema>, KStream<String, FullAuthAccessInfoSchema>> {
it.mapValues { value -> fullAuthAccessInfoMapper.map(value) }
}
@Bean
fun fullAuthAccessInfoSchemaSerde(objectMapper: ObjectMapper): Serde<FullAuthAccessInfoSchema> =
JsonSerde(FullAuthAccessInfoSchema::class.java, objectMapper)
}
Additional Options
Spring Cloud Function에서는 여러 함수 설정, Composition, Kotlin lambda support 등의 추가적인 기능을 제공합니다.
Multiple Functions Binding
여러 Function들이 정의되는 경우 아래와 같이 definition을 통해 여러 함수를 지정해주어야 동작합니다.
---
spring:
profiles: local
cloud:
function:
definition: kotlinSupplier|kotlinConsumer;
Declarative Function Composition
여러 Function들의 Composition 기능도 제공합니다.
|
를 통해 Composition을 표현할 수 있습니다.
spring:
profiles: local
cloud:
function:
definition: uppercase|reverse
Kotlin Lambda support
Kotlin lambda를 이용해 Functon, Supplier, Consumer를 구성하려는 경우 아래 의존성을 추가해야 합니다.
@Bean
open fun kotlinSupplier(): () -> String {
return { "Hello from Kotlin" }
}
@Bean
open fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
open fun kotlinConsumer(): (String) -> Unit {
return { println(it) }
}
build.gradle.kts
dependencies {
implementation("org.springframework.cloud:spring-cloud-function-kotlin")
}
현재 작성된 버전 기준으로는 Kafka Streams에 대해서는 lambda support가 지원되지 않습니다.