메이쁘

[IT갈피] 실전! 스프링5를 활용한 리액티브 프로그래밍 - 2장 : 스프링을 이용한 리액티브 프로그래밍 - 기본 개념 (1/2) 본문

나의 갈피/IT갈피

[IT갈피] 실전! 스프링5를 활용한 리액티브 프로그래밍 - 2장 : 스프링을 이용한 리액티브 프로그래밍 - 기본 개념 (1/2)

메이쁘 2022. 11. 24. 20:52

안녕하세요.

 

요즈음 개인적으로 시간내서 IT공부를 하고 있지 않았고,

최근들어 다시 공부의 필요성을 느꼈습니다.

 

회사의 도움으로 교재를 구매하여 따로 공부하며 잊지않고 기록하기 위해 갈피를 잡아보려합니다.

비단 교재 뿐 아니라 IT와 관련된 내용이라면 여기 IT갈피 에 잡으려고요.

 

저의 IT갈피를 보러 와주신 분들께 감사인사 드리면서,

시작하겠습니다!


실전! 스프링 5를 활용한 리액티브 프로그래밍

올레 도쿠카,이호르 로진스키 지음 | 김시영 옮김

 

https://search.shopping.naver.com/book/catalog/32482369871

 

실전! 스프링 5를 활용한 리액티브 프로그래밍 : 네이버 도서

네이버 도서 상세정보를 제공합니다.

search.shopping.naver.com

 

 


2장 - 스프링을 이용한 리액티브 프로그래밍 - 기본 개념

  1. 리액티브를 위한 스프링 프레임워크의 초기 해법
    1. 관찰자(Observer) 패턴
    2. 관찰자 패턴 사용 예
    3. @EventListener를 사용한 발행 - 구독 패턴
    4. @EventListener 활용한 응용 프로그램 개발
  2. 리액티브 프레임워크 RxJava
    1. 관찰자 + 반복자 = 리액티브 스트림
    2. 스트림의 생산과 소비
    3. 비동기 시퀀스 생성하기
    4. 스트림 변환과 마블 다이어그램
    5. RxJava 사용의 전제 조건 및 이점
    6. RxJava를 이용해 애플리케이션 다시 만들기
  3. 리액티브 라이브러리의 간략한 역사
  4. 리액티브의 전망
  5. 요약

 

2장은 위와 같은 순서대로 작성되어있다.

 

장별 소개

<2장 스프링을 이용한 리액티브 프로그래밍 - 기본 개념> 은 코드 예제를 통해 리액티브 프로그래밍의 잠재력과 그 중심 개념을 더 확장해 설명합니다.
이 장에서는 코드 예제를 사용해 스프링 프레임워크에서 리액티브, 비동기적, 논블로킹 프로그래밍이 가진 잠재력을 보여주며, 비즈니스에 적용한 사례를 보여줍니다.
코드 예제를 통해 게시자 - 구독자 모델의 기초를 배우고, 리액티브 Flow 이벤트의 힘을 이해하고, 실제 시나리오에서 이러한 기술을 어떻게 적용하는지에 대해 배웁니다.

 

이에 더해, 2장에서는 아래와 같은 내용을 다룬다고 한다.

 

  • 관찰자 패턴
  • 스프링 서버에서 보낸 이벤트를 구현한 발행 - 구독(Publish - Subscribe) 구현
  • RxJava의 역사 및 기본 개념
  • 마블(Marble) 다이어그램
  • 리액티브 프로그래밍을 적용한 비즈니스 사례
  • 리액티브 라이브러리의 현재 상황

 

 

 

리액티브를 위한 스프링 프레임워크의 초기 해법

- 1장에서는 자바 Future 인터페이스를 확장하고 HTTP 요청과 같은 작업의 비동기 실행에 활용할 수 있는 스프링 4.x의 ListenableFuture 클래스를 소개했음.

- 소스의 스프링 4.x 컴포넌트만이 비동기 실행을 위한 자바 8의 새로운 CompletableFuture 클래스를 지원함.

 

 

관찰자(Observer) 패턴

관찰자 패턴 : GoF 디자인 패턴 중 하나. 이를 약간만 수정하면 리액티브 프로그래밍의 기초.

  - 이벤트를 발생시키는 역할인 주체, 이벤트를 수신하는 역할인 객체(관찰자) 라는 두 가지 핵심 요소가 존재.

  - MVC 패턴의 중요한 부분.

# 그림 2.1 첨부

 

일반적인 관찰자 패턴은 Subject 와 Observer 2개의 인터페이스로 구성됨.

(대략적인 인터페이스 구성이어서, 느낌만 알고 넘어가면 될 것 같다.)

 

public interface Subject<T> {
        void registerObserver(Observer<T> observer);    // 주체가 해당 옵저버를 구독한다.

        void unregisterObserver(Observer<T> observer);  // 주체가 해당 옵저버를 구독 취소한다.

        void notifyObservers(T event);                  // 구독한 옵저버들을 호출한다. (이벤트 전송 등)
    }

-> 주체

 

public interface Observer<T> {
    void observe(T event);  // 받은 호출을 처리한다. (구독한 주체에게 결과 전달)
}

-> 옵저버

 

중요한 것은, 옵저버는 주체의 존재를 알 수 없다.

 

 

이 때, 예제에서 CopyOnWriteArraySet 을 사용한다..! (Java)

CopyOnWriteArraySet

- Thread-Safe 하기 위해 사용하며, 업데이트 작업이 발생할 때마다 새 복사본을 생성하는 Set 구현체.

 

 

- 대기 시간이 상당히 긴 이벤트를 처리하는 관찰자가 많을 경우

  -> notifyObservers() 호출 시 옵저버에서 결과를 전달해주는데, 이러한 전달을 받기 위해 오랫동안 기다려야하는 경우에 해당. Blocking이 되면 아무것도 못하고 그저 기다려야하니까.

 

  1. 추가적인 쓰레드 할당
  2. 쓰레드 풀을 사용해 메세지 병렬로 전달
  3. 리액티브 프로그래밍으로 Event-Driven

 

** 이 때, Java에서는 스레드 하나당 대략 1MB 를 소비함.

 

 

Java 패키지 내 Observer / Observable 클래스

 

  • 자바 제네릭 이전에 도입됐기 때문에 Object 타입을 사용 -> 타입 안전성이 보장되지 않는다.
  • 멀티 스레드 환경에서 효율적이지 않음.
  • 자바 9에서 더 이상 사용되지 않는다

 

-> 즉, 책에서는 예시까지 작성해서 말했지만 실제로는 이 클래스를 사용하면 안된다.

-> 어차피, RxJava나 다른 리액티브 라이브러리를 사용하는 것이 목표이고, 관찰자 패턴의 원리를 설명하기 위해 예시를 들었음.

** 직접 관찰자 패턴을 구현할 순 있지만, 믿을 수 있는 조직에서 제공하는 구현체를 사용하는 것이 좋다. (오류 처리, 비동기 실행, 스레드 안전성, 성능 요구 사항 등 고려할 부분이 많기 때문에)

 

 

@EventListener를 사용한 발행 - 구독 패턴

발행 - 구독 패턴

@EventListener 와 ApplicationEventPublisher는 발행 - 구독 패턴.

 

 

발행 - 구독 패턴의 특징

- 구독자는 알림을 브로드캐스트하는 이벤트 채널을 알고 있지만, 일반적으로 게시자가 누구인지는 신경 쓰지 않는다.

- 각 이벤트 채널에는 동시에 몇 명의 게시자가 있을 수 있다.

 

# 그림 2.3 

즉, 관찰자 패턴과는 다르게 발행 - 구독 패턴에서는 게시자와 구독자는 서로를 알 필요가 없다.

- 이벤트 채널(메세지 브로커 또는 이벤트 버스라고도 함) : 수신 메세지함 또는 통로

 

 

@EventListener 활용한 응용 프로그램 개발

웹소켓(WebSocket)SSE(Server-Sent Events)

 

SSE : 클라이언트가 서버에서 자동으로 업데이트를 수신할 수 있음.

-> 일반적으로 SSE는 브라우저에 메세지를 업데이트하거나 연속적인 데이터 스트림을 보내는 데 사용.

 

 

 

웹소켓 과 SSE(Server-Sent-Event) 차이점 알아보고 사용해보기

최근에 어떤 이벤트가 생겼을 때 client side에 ui를 업데이트해야 되는 기능을 구현해야 됐었습니다. 처음에는 이런 경우에 사용할 수 있는 것이 socket 밖에 몰라서 socket.io를 사용해서 socket으로 만

surviveasdev.tistory.com

이 둘의 차이를 설명한 좋은 블로그 포스팅이 있으므로 설명은 이로 대체!

 

 

중간에 예제 코드가 나오는데, 공부하면 좋을 부분만 짚고 나머지는 통과하겠습니다.

 

@EnableAsync                                                                        // (1)
@SpringBootApplication
public class DemoApplication implements AsyncConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public Executor getAsyncExecutor() {                                            // (2)
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();             // (3)
        executor.setCorePoolSize(2);                                                // (4)
        executor.setMaxPoolSize(100);                                               // (5)
        executor.setQueueCapacity(5);                                               // (6)
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

여기 예제에서는 쓰레드 풀을 활용하여 비동기 처리를 합니다.

코드 중 알아두면 좋겠다는 라인에 주석으로 번호를 매겨봤습니다. (저도 알아두면 좋아서요!)

 

(1) : @EnableAsync 어노테이션에 의해 비동기 실행이 가능해짐.

(2) : Default 쓰레드 풀을 설정하고 Configuration Bean으로 등록함.

  - 사용 시에 @Async 어노테이션을 사용하고자 하는 위치의 함수 위에 붙임.

  - 만약 여러 쓰레드 풀을 만들어두고 싶다면, @Bean("name") 으로 이름을 지정한 후 사용할 때 @Async("name") 으로 지정 가능

(3) 쓰레드 풀 클래스 객체 생성

(4) corePoolSize : 기본적으로 쓰레드 풀에 담겨있는 쓰레드 개수. 동시에 실행시킬 쓰레드 개수와 같다고 보면 됨.

(5) maxPoolSize : 쓰레드 풀의 최대 사이즈

(6) queueCapacity : 쓰레드 풀 큐의 최대 사이즈. 만약, corePoolSize 개수를 넘는 Task 요청이 들어오면 queueCapacity 만큼 해당 큐에 쌓이게 된다.

 

 

그럼 만약, 위와 같이 쓰레드 풀을 구성했다면 실제 사례를 들어보자.

 

 

- corePoolSize가 2 이기 때문에, 현재 쓰레드풀에 있는 쓰레드는 2개 뿐이다.

- 2개의 Task 까지는 처리할 수 있지만, 3개 이상인 경우에는 쓰레드 풀 큐에 Task가 쌓인다.

- 만약 7개의 Task가 들어온다면? -> queueCapacity가 5이므로 2개는 corePool의 쓰레드가, 5개는 쓰레드 풀 큐에 쌓여있게 된다.

- 만약 8개 이상의 Task가 들어온다면? -> 2개는 corePool의 쓰레드, 5개는 쓰레드 풀 큐에 쌓이고, 그 다음 Task 처리를 위해 쓰레드 풀에 쓰레드를 추가 투입한다.

- 최대 maxPoolSize 까지 쓰레드를 추가 투입할 수 있다.

- 만약 108개 이상의 Task가 들어온다면?

  -> 2개는 corePool의 쓰레드

  -> 5개는 쓰레드 풀 큐

  -> 100개는 추가 생성한 쓰레드

  -> 이래도 1개가 남는다면? TaskRejectedException으로 애플리케이션이 펑!

 

 

@EventListener 및 쓰레드 풀을 활용한 방식의 단점

- 비즈니스 로직을 정의하고 구현하기 위해 스프링 프레임워크의 내부 메커니즘을 사용한다.

  -> 프레임워크의 사소한 변경으로 인해 응용 프로그램의 안정성을 위협할 수 있음. (스프링 프레임워크를 쓰는건 변하지 않지만.. 버전도 변경하기에는 어려우니)

- 스프링 컨텍스트(ApplicationContext)를 로드하지 않고 비즈니스 로직을 단위 테스트 하는 것은 어렵다.

- 수많은 메서드에 @EventListener 어노테이션만 붙여 있고, 전체 WorkFlow를 설명하는 한 줄의 명시적 스크립트도 없는 응용 프로그램이 될 수 있다.

  -> 가독성 저하

 

즉, 진정한 비동기적 리액티브 접근(프레임워크) 에서는 이러한 쓰레드 풀이 필요 없는 일이다!?

 

 

 

리액티브 프레임워크 RxJava

RxJava 라이브러리는 Reactive Extensions 의 자바 구현체. (ReactiveX라고도 함)

- Reactive Extensions : 동기식 또는 비동기식 스트림과 관계없이 명령형 언어를 이용해 데이터 스트림을 조작할 수 있는 일련의 도구.

- ReactiveX = 관찰자(Observer) 패턴 / 반복자(Iterator) 패턴 / 함수형 프로그래밍의 조합

 

 

관찰자 + 반복자 = 리액티브 스트림

리액티브 스트림의 장점

 

  • 데이터 스트림의 끝을 알리는 기능 존재 -> onComplete() / onError()
  • 컨슈머가 준비하기 전에 프로듀서가 이벤트를 생성하지 않음. -> onNext()

 

public interface RxObserver<T> {
    void onNext(T next);
    void onComplete();
    void onError(Exception e);
}

RxJava의 기본 개념인 Observer 인터페이스

- 리액티브 스트림의 모든 컴포넌트 사이에 데이터가 흐르는 방법 정의

- 어떤 라이브러리에든 가장 조상이 되는 코드는 위 3개의 함수

# 그림 2.6 첨부

 

여기서, onNext()가 여러 개면?

onNext() 하나 당 결과를 callback 받아 활용할 수 있다.

 

 

RxJava 1.2.7부터 Observable을 생성하는 방식은 더 이상 사용되지 않습니다. -> 배압(backpressue) 을 지원하지 않아서.

 

 

 

Stream Backpressure의 이해 | Doublem.org

Stream Backpressure의 이해

doublem.org

배압에 대한 설명과 이해는 이 포스팅이 좋을 것 같아서 링크남깁니다!

 

 

예외 처리를 정의하지 않는 것이 편리하기는 하지만, 기본 Subscriber 구현체는 오류 발생 시 rx.exceptions.OnErrorNotImplementedException 을 발생시킵니다.

 

 

 

2부에서 계속됩니다.

Comments