스프링/WEB-FLUX

스프링부트 Web-Flux 02 - 라이브러리 써보기

H-V 2021. 10. 25. 17:51

유투버 '데어 프로그래밍'님 강의 참조 

 

 

 

01 기초 세팅

  • 프로젝트 생성

  • 메이븐 추가
<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams</artifactId>
  <version>1.0.3</version>
</dependency>
  • 이 라이브러리중 가장 중요한 3개의 클래스(하이라이트)를 구동할 세팅이 필요 하다. 

 

  • 기본 세팅
public class App {
	public static void main(String[] args) {
		MyPub pub = new MyPub(); //신문사 생성
		MySub sub = new MySub(); //구독자 생성
		pub.subscribe(sub);
	}
}
public class MyPub implements Publisher<Integer>{

	public void subscribe(Subscriber<? super Integer> s) {
		System.out.println("구독자: 신문사야 나 너희 신문 볼께");
		System.out.println("신문사: 구독자 정보를 만들어서 줄테니 기다려!!");
	}
}
public class MySub implements Subscriber<Integer>{
	
	private Subscription s;
	
	public void onSubscribe(Subscription s) {
		System.out.println("구독자: 생성된 구독자 정보 받음");
		this.s = s;
		System.out.println("구독자: 이제 신문 1개씩 주세요");
		s.request(1);
	}

	public void onNext(Integer t) {
		System.out.println("전달 데이터 onNex(): "+t);
	}

	public void onError(Throwable t) {
		System.out.println("구독중 에러");
	}

	public void onComplete() {
		System.out.println("구독 완료");
	}
}

 

여기까지 세팅의 흐름을 보면 

  1. 신문사/구독자/이 두개를 연결해서 실행하는 어플리케이션으로 구성
  2. App.java 에서 신문사/구독자 생성 후 구독 신문사 구독
  3. MyPub.java에서 신문사 구독 요청을 받으면 's'에 구독자 정보를 담아서 넘겨 줌

 

  • 독자 정보를 받기 위한 MySubscription 세팅
//구독 정보(구독자, 어떤 데이터를 구독할지)
public class MySubscription implements Subscription {

	public void request(long n) {
		
	}

	public void cancel() {
		
	}
}​

 

 

  • MySubscription에 넘어오는 데이터는 MyPub에서 정의
    public class MyPub implements Publisher<Integer>{
    	
    	Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    
    	public void subscribe(Subscriber<? super Integer> s) {
    		System.out.println("구독자: 신문사야 나 너희 신문 볼께");
    		System.out.println("신문사: 구독자 정보를 만들어서 줄테니 기다려!!");
    		MySubscription subscription = new MySubscription(s, its);
    	}
    }​
    구독자와 데이터가 함께 넘어간다 (s/its)
  • MySubscription에서 s/its 를 받아 재 세팅 
//구독 정보(구독자, 어떤 데이터를 구독할지)
public class MySubscription implements Subscription {

	private Subscriber s;
	private Iterator<Integer> it;
	
	public MySubscription(Subscriber s, Iterable<Integer> its) {
		this.s = s;
		this.it = its.iterator();
	}

	public void request(long n) {
		
	}

	public void cancel() {
		
	}
}

 

  • MyPub에서 구독 정보가 완성되면 표시되도록 코드 수정
    public class MyPub implements Publisher<Integer>{
    	
    	Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    
    	public void subscribe(Subscriber<? super Integer> s) {
    		System.out.println("구독자: 신문사야 나 너희 신문 볼께");
    		System.out.println("신문사: 구독자 정보를 만들어서 줄테니 기다려!!");
    		MySubscription subscription = new MySubscription(s, its);
    		System.out.println("신문사: 구독자 정보 생성 완료");
    		s.onSubscribe(subscription);
    	}
    }​

 

→ 1차 테스트

s가 Subscriber(구독자)로 s에게 'subscription(구독정보)'를 넘겨 주게 된다. 

 

 

 

 

02 넘겨 받은 데이터 사용

 

  • 넘어온 구독정보를 이제 구독자가 받아 보자
  • 넘어온 구독정보 데이터를 잘 받았으니 이 데이터의 request를 요청 해야 한다.
  • 구독정보의 request가 요청되면 MySubscription의 request 함수가 발동 된다. 

  1. 현재 s.request(1)을 요청 했으므로 n = 1 이다
  2. while의 true가 성립되고 if문을 타게 된다. 이때 it은 'MyPub'에 선언된 its로 1~10 중 1을 가져 온다. 

 

→ 실행을 해보면....

→ s.request()의 숫자를 바꾸면 s.onNext()를 통해 데이터가 다음과 같이 계속해서 전달 된다. 

이렇게 요청에 대한 응답을 계속해서 주는것을 flux라고 한다

 

 

  • s.request()가 백프레셔가 되는데 받는 사람이 처리할 데이터양을 지정 가능하게 한다. 
  • 현재까지는 데이터가 10개인데 하나씩만 요청을 하니 하나를 주고 나머지 9개는 응답 대기 상태이다. 이 응답 또한 소비를 하려면 onNext()에서 s.request()를 추가하면 된다. 

 

※ 계속해서 응답하는 데이터양을 1개 말고 2개 이상으로 늘리면 onNext() 함수가 응답 요청 수에 맞게 반복적으로 불러진다. 그래서 직접 코드가 안꼬이도록 해줘야 한다. 

 

 

 

 

  • 여기에 세팅된 라이브러리 및 클래스를 제일 처음한 'fluxtext' 프로젝트의 MyFilter for문에 적용 시키면 웹화면에 요청 회수만큼 뿌려지게 되는 것. 
  • 다시 상기하면 WebFlux는 단일스레드, 비동기 + Stream을 통해 백프레셔가 적용된 데이터 만큼 간헐적 응답이 가능. 데이터 소비가 끝나면 응답이 종료 되지만 SSE를 적용하여 데이터 소비가 끝나도 Stream을 계속 유지 할 수 있음