스프링/WEB-FLUX
스프링부트 Web-Flux 02 - 라이브러리 써보기
H-V
2021. 10. 25. 17:51
유투버 '데어 프로그래밍'님 강의 참조
01 기초 세팅
- 프로젝트 생성
- 메이븐 추가
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
- 이 라이브러리중 가장 중요한 3개의 클래스(하이라이트)를 구동할 세팅이 필요 하다.
- 기본 세팅
public class App {
public static void main(String[] args) {
MyPub pub = new MyPub(); //신문사 생성
MySub sub = new MySub(); //구독자 생성
pub.subscribe(sub);
}
}
public class MyPub implements Publisher<Integer>{
public void subscribe(Subscriber<? super Integer> s) {
System.out.println("구독자: 신문사야 나 너희 신문 볼께");
System.out.println("신문사: 구독자 정보를 만들어서 줄테니 기다려!!");
}
}
public class MySub implements Subscriber<Integer>{
private Subscription s;
public void onSubscribe(Subscription s) {
System.out.println("구독자: 생성된 구독자 정보 받음");
this.s = s;
System.out.println("구독자: 이제 신문 1개씩 주세요");
s.request(1);
}
public void onNext(Integer t) {
System.out.println("전달 데이터 onNex(): "+t);
}
public void onError(Throwable t) {
System.out.println("구독중 에러");
}
public void onComplete() {
System.out.println("구독 완료");
}
}
여기까지 세팅의 흐름을 보면
- 신문사/구독자/이 두개를 연결해서 실행하는 어플리케이션으로 구성
- App.java 에서 신문사/구독자 생성 후 구독 신문사 구독
- MyPub.java에서 신문사 구독 요청을 받으면 's'에 구독자 정보를 담아서 넘겨 줌
- 독자 정보를 받기 위한 MySubscription 세팅
//구독 정보(구독자, 어떤 데이터를 구독할지)
public class MySubscription implements Subscription {
public void request(long n) {
}
public void cancel() {
}
}
- MySubscription에 넘어오는 데이터는 MyPub에서 정의
구독자와 데이터가 함께 넘어간다 (s/its)public class MyPub implements Publisher<Integer>{ Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10); public void subscribe(Subscriber<? super Integer> s) { System.out.println("구독자: 신문사야 나 너희 신문 볼께"); System.out.println("신문사: 구독자 정보를 만들어서 줄테니 기다려!!"); MySubscription subscription = new MySubscription(s, its); } }
- MySubscription에서 s/its 를 받아 재 세팅
//구독 정보(구독자, 어떤 데이터를 구독할지)
public class MySubscription implements Subscription {
private Subscriber s;
private Iterator<Integer> it;
public MySubscription(Subscriber s, Iterable<Integer> its) {
this.s = s;
this.it = its.iterator();
}
public void request(long n) {
}
public void cancel() {
}
}
- MyPub에서 구독 정보가 완성되면 표시되도록 코드 수정
public class MyPub implements Publisher<Integer>{ Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10); public void subscribe(Subscriber<? super Integer> s) { System.out.println("구독자: 신문사야 나 너희 신문 볼께"); System.out.println("신문사: 구독자 정보를 만들어서 줄테니 기다려!!"); MySubscription subscription = new MySubscription(s, its); System.out.println("신문사: 구독자 정보 생성 완료"); s.onSubscribe(subscription); } }
→ 1차 테스트
02 넘겨 받은 데이터 사용
- 넘어온 구독정보를 이제 구독자가 받아 보자
- 넘어온 구독정보 데이터를 잘 받았으니 이 데이터의 request를 요청 해야 한다.
- 구독정보의 request가 요청되면 MySubscription의 request 함수가 발동 된다.
- 현재 s.request(1)을 요청 했으므로 n = 1 이다
- while의 true가 성립되고 if문을 타게 된다. 이때 it은 'MyPub'에 선언된 its로 1~10 중 1을 가져 온다.
→ 실행을 해보면....
→ s.request()의 숫자를 바꾸면 s.onNext()를 통해 데이터가 다음과 같이 계속해서 전달 된다.
- s.request()가 백프레셔가 되는데 받는 사람이 처리할 데이터양을 지정 가능하게 한다.
- 현재까지는 데이터가 10개인데 하나씩만 요청을 하니 하나를 주고 나머지 9개는 응답 대기 상태이다. 이 응답 또한 소비를 하려면 onNext()에서 s.request()를 추가하면 된다.
※ 계속해서 응답하는 데이터양을 1개 말고 2개 이상으로 늘리면 onNext() 함수가 응답 요청 수에 맞게 반복적으로 불러진다. 그래서 직접 코드가 안꼬이도록 해줘야 한다.
- 여기에 세팅된 라이브러리 및 클래스를 제일 처음한 'fluxtext' 프로젝트의 MyFilter for문에 적용 시키면 웹화면에 요청 회수만큼 뿌려지게 되는 것.
- 다시 상기하면 WebFlux는 단일스레드, 비동기 + Stream을 통해 백프레셔가 적용된 데이터 만큼 간헐적 응답이 가능. 데이터 소비가 끝나면 응답이 종료 되지만 SSE를 적용하여 데이터 소비가 끝나도 Stream을 계속 유지 할 수 있음