- Today
- Yesterday
- Total
메이쁘
[IT갈피] 실전! 스프링5를 활용한 리액티브 프로그래밍 - 2장 : 스프링을 이용한 리액티브 프로그래밍 - 기본 개념 (1/2) 본문
[IT갈피] 실전! 스프링5를 활용한 리액티브 프로그래밍 - 2장 : 스프링을 이용한 리액티브 프로그래밍 - 기본 개념 (1/2)
메이쁘 2022. 11. 24. 20:52안녕하세요.
요즈음 개인적으로 시간내서 IT공부를 하고 있지 않았고,
최근들어 다시 공부의 필요성을 느꼈습니다.
회사의 도움으로 교재를 구매하여 따로 공부하며 잊지않고 기록하기 위해 갈피를 잡아보려합니다.
비단 교재 뿐 아니라 IT와 관련된 내용이라면 여기 IT갈피 에 잡으려고요.
저의 IT갈피를 보러 와주신 분들께 감사인사 드리면서,
시작하겠습니다!
실전! 스프링 5를 활용한 리액티브 프로그래밍
올레 도쿠카,이호르 로진스키 지음 | 김시영 옮김
https://search.shopping.naver.com/book/catalog/32482369871
2장 - 스프링을 이용한 리액티브 프로그래밍 - 기본 개념
- 리액티브를 위한 스프링 프레임워크의 초기 해법
- 관찰자(Observer) 패턴
- 관찰자 패턴 사용 예
- @EventListener를 사용한 발행 - 구독 패턴
- @EventListener 활용한 응용 프로그램 개발
- 리액티브 프레임워크 RxJava
- 관찰자 + 반복자 = 리액티브 스트림
- 스트림의 생산과 소비
- 비동기 시퀀스 생성하기
- 스트림 변환과 마블 다이어그램
- RxJava 사용의 전제 조건 및 이점
- RxJava를 이용해 애플리케이션 다시 만들기
- 리액티브 라이브러리의 간략한 역사
- 리액티브의 전망
- 요약
2장은 위와 같은 순서대로 작성되어있다.
장별 소개
<2장 스프링을 이용한 리액티브 프로그래밍 - 기본 개념> 은 코드 예제를 통해 리액티브 프로그래밍의 잠재력과 그 중심 개념을 더 확장해 설명합니다.
이 장에서는 코드 예제를 사용해 스프링 프레임워크에서 리액티브, 비동기적, 논블로킹 프로그래밍이 가진 잠재력을 보여주며, 비즈니스에 적용한 사례를 보여줍니다.
코드 예제를 통해 게시자 - 구독자 모델의 기초를 배우고, 리액티브 Flow 이벤트의 힘을 이해하고, 실제 시나리오에서 이러한 기술을 어떻게 적용하는지에 대해 배웁니다.
이에 더해, 2장에서는 아래와 같은 내용을 다룬다고 한다.
- 관찰자 패턴
- 스프링 서버에서 보낸 이벤트를 구현한 발행 - 구독(Publish - Subscribe) 구현
- RxJava의 역사 및 기본 개념
- 마블(Marble) 다이어그램
- 리액티브 프로그래밍을 적용한 비즈니스 사례
- 리액티브 라이브러리의 현재 상황
리액티브를 위한 스프링 프레임워크의 초기 해법
- 1장에서는 자바 Future 인터페이스를 확장하고 HTTP 요청과 같은 작업의 비동기 실행에 활용할 수 있는 스프링 4.x의 ListenableFuture 클래스를 소개했음.
- 소스의 스프링 4.x 컴포넌트만이 비동기 실행을 위한 자바 8의 새로운 CompletableFuture 클래스를 지원함.
관찰자(Observer) 패턴
관찰자 패턴 : GoF 디자인 패턴 중 하나. 이를 약간만 수정하면 리액티브 프로그래밍의 기초.
- 이벤트를 발생시키는 역할인 주체, 이벤트를 수신하는 역할인 객체(관찰자) 라는 두 가지 핵심 요소가 존재.
- MVC 패턴의 중요한 부분.
# 그림 2.1 첨부
일반적인 관찰자 패턴은 Subject 와 Observer 2개의 인터페이스로 구성됨.
(대략적인 인터페이스 구성이어서, 느낌만 알고 넘어가면 될 것 같다.)
public interface Subject<T> {
void registerObserver(Observer<T> observer); // 주체가 해당 옵저버를 구독한다.
void unregisterObserver(Observer<T> observer); // 주체가 해당 옵저버를 구독 취소한다.
void notifyObservers(T event); // 구독한 옵저버들을 호출한다. (이벤트 전송 등)
}
-> 주체
public interface Observer<T> {
void observe(T event); // 받은 호출을 처리한다. (구독한 주체에게 결과 전달)
}
-> 옵저버
중요한 것은, 옵저버는 주체의 존재를 알 수 없다.
이 때, 예제에서 CopyOnWriteArraySet 을 사용한다..! (Java)
CopyOnWriteArraySet
- Thread-Safe 하기 위해 사용하며, 업데이트 작업이 발생할 때마다 새 복사본을 생성하는 Set 구현체.
- 대기 시간이 상당히 긴 이벤트를 처리하는 관찰자가 많을 경우
-> notifyObservers() 호출 시 옵저버에서 결과를 전달해주는데, 이러한 전달을 받기 위해 오랫동안 기다려야하는 경우에 해당. Blocking이 되면 아무것도 못하고 그저 기다려야하니까.
- 추가적인 쓰레드 할당
- 쓰레드 풀을 사용해 메세지 병렬로 전달
- 리액티브 프로그래밍으로 Event-Driven
** 이 때, Java에서는 스레드 하나당 대략 1MB 를 소비함.
Java 패키지 내 Observer / Observable 클래스
- 자바 제네릭 이전에 도입됐기 때문에 Object 타입을 사용 -> 타입 안전성이 보장되지 않는다.
- 멀티 스레드 환경에서 효율적이지 않음.
- 자바 9에서 더 이상 사용되지 않는다
-> 즉, 책에서는 예시까지 작성해서 말했지만 실제로는 이 클래스를 사용하면 안된다.
-> 어차피, RxJava나 다른 리액티브 라이브러리를 사용하는 것이 목표이고, 관찰자 패턴의 원리를 설명하기 위해 예시를 들었음.
** 직접 관찰자 패턴을 구현할 순 있지만, 믿을 수 있는 조직에서 제공하는 구현체를 사용하는 것이 좋다. (오류 처리, 비동기 실행, 스레드 안전성, 성능 요구 사항 등 고려할 부분이 많기 때문에)
@EventListener를 사용한 발행 - 구독 패턴
@EventListener 와 ApplicationEventPublisher는 발행 - 구독 패턴.
발행 - 구독 패턴의 특징
- 구독자는 알림을 브로드캐스트하는 이벤트 채널을 알고 있지만, 일반적으로 게시자가 누구인지는 신경 쓰지 않는다.
- 각 이벤트 채널에는 동시에 몇 명의 게시자가 있을 수 있다.
# 그림 2.3
즉, 관찰자 패턴과는 다르게 발행 - 구독 패턴에서는 게시자와 구독자는 서로를 알 필요가 없다.
- 이벤트 채널(메세지 브로커 또는 이벤트 버스라고도 함) : 수신 메세지함 또는 통로
@EventListener 활용한 응용 프로그램 개발
웹소켓(WebSocket) 과 SSE(Server-Sent Events)
SSE : 클라이언트가 서버에서 자동으로 업데이트를 수신할 수 있음.
-> 일반적으로 SSE는 브라우저에 메세지를 업데이트하거나 연속적인 데이터 스트림을 보내는 데 사용.
이 둘의 차이를 설명한 좋은 블로그 포스팅이 있으므로 설명은 이로 대체!
중간에 예제 코드가 나오는데, 공부하면 좋을 부분만 짚고 나머지는 통과하겠습니다.
@EnableAsync // (1)
@SpringBootApplication
public class DemoApplication implements AsyncConfigurer {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Override
public Executor getAsyncExecutor() { // (2)
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // (3)
executor.setCorePoolSize(2); // (4)
executor.setMaxPoolSize(100); // (5)
executor.setQueueCapacity(5); // (6)
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
여기 예제에서는 쓰레드 풀을 활용하여 비동기 처리를 합니다.
코드 중 알아두면 좋겠다는 라인에 주석으로 번호를 매겨봤습니다. (저도 알아두면 좋아서요!)
(1) : @EnableAsync 어노테이션에 의해 비동기 실행이 가능해짐.
(2) : Default 쓰레드 풀을 설정하고 Configuration Bean으로 등록함.
- 사용 시에 @Async 어노테이션을 사용하고자 하는 위치의 함수 위에 붙임.
- 만약 여러 쓰레드 풀을 만들어두고 싶다면, @Bean("name") 으로 이름을 지정한 후 사용할 때 @Async("name") 으로 지정 가능
(3) 쓰레드 풀 클래스 객체 생성
(4) corePoolSize : 기본적으로 쓰레드 풀에 담겨있는 쓰레드 개수. 동시에 실행시킬 쓰레드 개수와 같다고 보면 됨.
(5) maxPoolSize : 쓰레드 풀의 최대 사이즈
(6) queueCapacity : 쓰레드 풀 큐의 최대 사이즈. 만약, corePoolSize 개수를 넘는 Task 요청이 들어오면 queueCapacity 만큼 해당 큐에 쌓이게 된다.
그럼 만약, 위와 같이 쓰레드 풀을 구성했다면 실제 사례를 들어보자.
- corePoolSize가 2 이기 때문에, 현재 쓰레드풀에 있는 쓰레드는 2개 뿐이다.
- 2개의 Task 까지는 처리할 수 있지만, 3개 이상인 경우에는 쓰레드 풀 큐에 Task가 쌓인다.
- 만약 7개의 Task가 들어온다면? -> queueCapacity가 5이므로 2개는 corePool의 쓰레드가, 5개는 쓰레드 풀 큐에 쌓여있게 된다.
- 만약 8개 이상의 Task가 들어온다면? -> 2개는 corePool의 쓰레드, 5개는 쓰레드 풀 큐에 쌓이고, 그 다음 Task 처리를 위해 쓰레드 풀에 쓰레드를 추가 투입한다.
- 최대 maxPoolSize 까지 쓰레드를 추가 투입할 수 있다.
- 만약 108개 이상의 Task가 들어온다면?
-> 2개는 corePool의 쓰레드
-> 5개는 쓰레드 풀 큐
-> 100개는 추가 생성한 쓰레드
-> 이래도 1개가 남는다면? TaskRejectedException으로 애플리케이션이 펑!
@EventListener 및 쓰레드 풀을 활용한 방식의 단점
- 비즈니스 로직을 정의하고 구현하기 위해 스프링 프레임워크의 내부 메커니즘을 사용한다.
-> 프레임워크의 사소한 변경으로 인해 응용 프로그램의 안정성을 위협할 수 있음. (스프링 프레임워크를 쓰는건 변하지 않지만.. 버전도 변경하기에는 어려우니)
- 스프링 컨텍스트(ApplicationContext)를 로드하지 않고 비즈니스 로직을 단위 테스트 하는 것은 어렵다.
- 수많은 메서드에 @EventListener 어노테이션만 붙여 있고, 전체 WorkFlow를 설명하는 한 줄의 명시적 스크립트도 없는 응용 프로그램이 될 수 있다.
-> 가독성 저하
즉, 진정한 비동기적 리액티브 접근(프레임워크) 에서는 이러한 쓰레드 풀이 필요 없는 일이다!?
리액티브 프레임워크 RxJava
RxJava 라이브러리는 Reactive Extensions 의 자바 구현체. (ReactiveX라고도 함)
- Reactive Extensions : 동기식 또는 비동기식 스트림과 관계없이 명령형 언어를 이용해 데이터 스트림을 조작할 수 있는 일련의 도구.
- ReactiveX = 관찰자(Observer) 패턴 / 반복자(Iterator) 패턴 / 함수형 프로그래밍의 조합
관찰자 + 반복자 = 리액티브 스트림
리액티브 스트림의 장점
- 데이터 스트림의 끝을 알리는 기능 존재 -> onComplete() / onError()
- 컨슈머가 준비하기 전에 프로듀서가 이벤트를 생성하지 않음. -> onNext()
public interface RxObserver<T> {
void onNext(T next);
void onComplete();
void onError(Exception e);
}
RxJava의 기본 개념인 Observer 인터페이스
- 리액티브 스트림의 모든 컴포넌트 사이에 데이터가 흐르는 방법 정의
- 어떤 라이브러리에든 가장 조상이 되는 코드는 위 3개의 함수
# 그림 2.6 첨부
여기서, onNext()가 여러 개면?
onNext() 하나 당 결과를 callback 받아 활용할 수 있다.
RxJava 1.2.7부터 Observable을 생성하는 방식은 더 이상 사용되지 않습니다. -> 배압(backpressue) 을 지원하지 않아서.
배압에 대한 설명과 이해는 이 포스팅이 좋을 것 같아서 링크남깁니다!
예외 처리를 정의하지 않는 것이 편리하기는 하지만, 기본 Subscriber 구현체는 오류 발생 시 rx.exceptions.OnErrorNotImplementedException 을 발생시킵니다.
2부에서 계속됩니다.
'나의 갈피 > IT갈피' 카테고리의 다른 글
[IT갈피] 실전! 스프링5를 활용한 리액티브 프로그래밍 - 1장 : 왜 리액티브 스프링인가? (0) | 2022.11.20 |
---|---|
[IT갈피] 마이크로서비스 도입, 이렇게 한다 - 1장 : 더도 덜도 아닌 딱 마이크로서비스 (0) | 2022.10.13 |