kyucumber
전체 글 보기

Spring Cloud Function을 이용한 Kafka Client 구성하기

Spring Cloud Function은 spring-cloud 하위의 프로젝트입니다. Spring Cloud Function을 이용하면 사용자가 정의한 Supplier Consumer Function등의 인터페이스를 FunctionCatalog가 읽어 Consume, Produce 등을 수행하는 표준 표현으로 변환하는 기능을 사용할 수 있습니다.

Spring Cloud Function에서는 Java 8 부터 사용할 수 있는 3가지 코어 Function(Supplier, Function, Consumer) interface들을 지원합니다. 각각의 인터페이스를 이용해 Kafka Producer, Consumer, Streams를 구성한 코드를 정리했습니다.

Supplier interface를 이용한 Kafka Producer 구성하기

Supplier interface를 통해 Kafka Producer를 구성할 수 있습니다. 아래와 같은 의존성 및 properties가 정의되어야 합니다. Producer의 경우 Non Reactive 혹은 Reactive 방식으로 구성할 수 있습니다.

build.gradle.kts

implementation("org.springframework.cloud:spring-cloud-function-kotlin") implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")

Kafka Binder Properties, Kafka Producer Properties

application.yml

spring: profiles: local cloud: function: definition: kotlinSupplier; # 함수 이름, 여러개인 경우 kotlinSupplier,kotlinConsumer; 이런 식으로 정의 stream: bindings: kotlinSupplier-out-0: destination: backoffice-auth.local.logging.access-info.json # kafka: binder: brokers: localhost defaultBrokerPort: 9092 configuration: acks: all bindings: kotlinSupplier-out-0: producer: # producer 세부 설정, 별도로 설정할 부분이 없어 생략

Non Reactive Producer

Non Reactive 방식의 Producer를 정의하기 위해서 아래와 같이 설정할 수 있습니다. 아래와 같이 설정하는 경우 default poller 에 의해서 polling 방식(Non Reactive)을 통해 producing을 진행합니다.

ProducerConfig

@Configuration class ProducerConfig { @Bean fun kotlinSupplier() = Supplier<String> { "produce test" } }

아래와 같이 Polling 방식으로 producing이 진행되는것을 확인할 수 있습니다.

nonreactivesupplier

Reactive Producer

Reactive 방식의 Producer를 정의하기 위해서 아래와 같이 구성할 수 있습니다. 예제 코드에서는 기본 구현체 중 하나인 EmitterProcessor를 이용했습니다.

ProducerConfig

@Configuration class ProducerConfig { @Bean fun processor(): EmitterProcessor<String> = EmitterProcessor.create<String>() @Bean fun processorSink(processor: EmitterProcessor<String>): FluxSink<String> { return processor.sink(FluxSink.OverflowStrategy.DROP) } @Bean fun kotlinSupplier(processor: EmitterProcessor<String>) = Supplier<Flux<String>> { processor } }

ProduceTestController

@RestController @RequestMapping("/api/v1") class ProduceTestController( private val processorSink: FluxSink<String> ) { @GetMapping fun produce() { this.processorSink.next(UUID.randomUUID().toString()) } }

위와 같이 코드를 정의하고, 해당 /api/v1 경로로 GET 요청을 보내면 아래와 같이 producing이 진행됩니다.

nonreactivesupplier

Consumer interface를 통해 Kafka Consumer 구성하기

Consumer interface를 통해 Kafka Consumer를 구성할 수 있습니다. 아래와 같은 의존성 및 설정이 정의되어야 합니다.

build.gradle.kts

implementation("org.springframework.cloud:spring-cloud-function-kotlin") implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")

Kafka Binder Properties, Kafka Consumer Properties

application.yml

spring: profiles: local cloud: function: definition: kotlinConsumer; stream: kafka: binder: brokers: localhost defaultBrokerPort: 9092 configuration: acks: all bindings: kotlinConsumer-in-0: startOffset: earliest bindings: kotlinConsumer-in-0: group: kotlin-consumer # consumer group id destination: topic # consume topic name

ConsumerConfig

@Configuration class ConsumerConfig { @Bean fun kotlinConsumer() = Consumer<String> { println(it) } }

Function interface를 이용해 Kafka Streams 구성하기

Function interface를 통해 Kafka Streams API를 구현할 수 있습니다. 아래와 같은 의존성 및 설정이 정의되어야 합니다.

build.gradle.kts

dependencies { implementation("org.springframework.cloud:spring-cloud-stream") implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams") }

Kafka Streams Properties

application.yml

spring: profiles: local cloud: function: definition: fullAuthAccessLogProcessor; stream: kafka: streams: binder: brokers: localhost:9092 configuration: acks: all bindings: fullAuthAccessLogProcessor-in-0: consumer: applicationId: access-info-log-processor bindings: fullAuthAccessLogProcessor-in-0: destination: backoffice-auth.local.logging.access-info.json fullAuthAccessLogProcessor-out-0: destination: backoffice-auth.local.logging.full-access-info.json

StreamsConfig

@Configuration class StreamsConfig { @Bean fun fullAuthAccessLogProcessor(fullAuthAccessInfoMapper: FullAuthAccessInfoMapper) = Function<KStream<String, AuthAccessInfoSchema>, KStream<String, FullAuthAccessInfoSchema>> { it.mapValues { value -> fullAuthAccessInfoMapper.map(value) } } @Bean fun fullAuthAccessInfoSchemaSerde(objectMapper: ObjectMapper): Serde<FullAuthAccessInfoSchema> = JsonSerde(FullAuthAccessInfoSchema::class.java, objectMapper) }

Additional Options

Spring Cloud Function에서는 여러 함수 설정, Composition, Kotlin lambda support 등의 추가적인 기능을 제공합니다.

Multiple Functions Binding

여러 Function들이 정의되는 경우 아래와 같이 definition을 통해 여러 함수를 지정해주어야 동작합니다.

--- spring: profiles: local cloud: function: definition: kotlinSupplier|kotlinConsumer;

Declarative Function Composition

여러 Function들의 Composition 기능도 제공합니다.

|를 통해 Composition을 표현할 수 있습니다.

spring: profiles: local cloud: function: definition: uppercase|reverse

Function Composition Example

Kotlin Lambda support

Kotlin lambda를 이용해 Functon, Supplier, Consumer를 구성하려는 경우 아래 의존성을 추가해야 합니다.

@Bean open fun kotlinSupplier(): () -> String { return { "Hello from Kotlin" } } @Bean open fun kotlinFunction(): (String) -> String { return { it.toUpperCase() } } @Bean open fun kotlinConsumer(): (String) -> Unit { return { println(it) } }

build.gradle.kts

dependencies { implementation("org.springframework.cloud:spring-cloud-function-kotlin") }

현재 작성된 버전 기준으로는 Kafka Streams에 대해서는 lambda support가 지원되지 않습니다.

Reference