흔히들 글이나 그림으로 설명을 하기만 한다.
나는 그런 것보다 직접 눈으로 봐야 직성이 풀린다(기능 작성 or 테스트 코드)
자 그럼 WebClient와 함께 테스트를 해보겠다
Spring은 기본적으로 Blocking/Sync 방식으로 동작을 하고 Node.js가 Non-blocking/Async 방식으로 동작을 한다
비동기 방식은 순차처리를 지원하지않는다
순서가 중요한 처리는 동기방식으로 처리를 해야한다(또한 잠금이 필요한 경우)
비동기관련용어 - ajax, XMLHttpRequest, Callback, Promise...
Blocking / Non-blocking 은 호출된 함수가 호출한 함수에게 제어권을 바로 주느냐 안주느냐,
Sync / Async 는 호출된 함수의 종료를 호출한 함수가 처리하느냐, 호출된 함수가 처리하느냐의 차이다.
블로킹 Blocking
- A 함수가 B 함수를 호출 할 때, B 함수가 자신의 작업이 종료되기 전까지 A 함수에게 제어권을 돌려주지 않는 것
논블로킹 Non-blocking
- A 함수가 B 함수를 호출 할 때, B 함수가 제어권을 바로 A 함수에게 넘겨주면서, A 함수가 다른 일을 할 수 있도록 하는 것.
동기 Synchronous
- A 함수가 B 함수를 호출 할 때, B 함수의 결과를 A 함수가 처리하는 것.
비동기 Asynchronous
- A 함수가 B 함수를 호출 할 때, B 함수의 결과를 B 함수가 처리하는 것. (callback)
호출하는 쪽: A
import java.util.concurrent.ExecutionException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@RestController
@RequiredArgsConstructor
public class AController {
private final WebClient webClient;
private static final String TEST_API = "localhost:8090";
@GetMapping("/block-sync")
public ResponseEntity<Object> block_sync() {
Mono<String> mono = webClient
.method(HttpMethod.GET)
.uri(TEST_API + "/sync")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class)
.doOnSuccess(System.out::println);
Object result = mono.block();
System.out.println("block-sync End");
return ResponseEntity.ok(result);
}
@GetMapping("/block-async")
public ResponseEntity<Object> block_async() throws ExecutionException, InterruptedException {
Mono<String> mono = webClient
.method(HttpMethod.GET)
.uri(TEST_API + "/async")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class)
.doOnSuccess(System.out::println);
// CompletableFuture<String> future = mono.toFuture();
Object result = mono.block();
System.out.println("block-async End");
return ResponseEntity.ok(result);
// return ResponseEntity.ok(future.get());
}
@GetMapping("/nonblock-sync")
public ResponseEntity<Object> nonblock_sync() throws ExecutionException, InterruptedException {
Mono<String> mono = webClient.method(HttpMethod.GET)
.uri(TEST_API + "/sync")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class)
.doOnSuccess(System.out::println);
// CompletableFuture<String> future = mono.toFuture();
Object result = mono.subscribe();
System.out.println("nonblock-sync End");
return ResponseEntity.ok(result);
// return ResponseEntity.ok(future.get());
}
@GetMapping("/nonblock-async")
public ResponseEntity<Object> nonblock_async() {
Mono<String> mono = webClient.method(HttpMethod.GET)
.uri(TEST_API + "/async")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class)
.doOnSuccess(System.out::println);
Object result = mono.subscribe();
System.out.println("nonblock-async End");
return ResponseEntity.ok(result);
}
@GetMapping("/test")
public ResponseEntity<Object> test_data(@RequestParam Object data) {
Flux<Object> flux = webClient.method(HttpMethod.GET)
.uri(TEST_API + "/test/{data}", data)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(Object.class);
int[] under50 = flux.toStream().mapToInt(o -> Integer.parseInt(o.toString()))
.filter(value -> value <= 50).toArray();
System.out.println("test End");
return ResponseEntity.ok(under50);
}
}
호출 당하는 쪽: B
// Controller
import com.example.testproject.service.TestService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
@RequiredArgsConstructor
@RestController
public class BController {
private final TestService testService;
@GetMapping("/test")
public String test() {
System.out.println("안녕하세요");
return "hello";
}
@GetMapping("/sync")
public ResponseEntity<Object> sync() throws InterruptedException {
String result1 = testService.sync1();
String result2 = testService.sync2();
String result3 = testService.sync3();
String answer = String.join(",", result1, result2, result3);
System.out.println("sync called");
return ResponseEntity.status(HttpStatus.OK).body(answer);
}
@GetMapping("/async")
public ResponseEntity<Object> async() throws InterruptedException, ExecutionException {
String result1 = testService.async1();
String result2 = testService.async2().get();
String result3 = testService.async3();
String answer = String.join(",", result1, result2, result3);
System.out.println("async called");
return ResponseEntity.status(HttpStatus.OK).body(answer);
}
@GetMapping("/test/{data}")
public ResponseEntity<Object> wc(@PathVariable Integer data) {
ArrayList<Object> list = new ArrayList<>();
for (int i = 0; i < data; i++) {
list.add(i);
}
return ResponseEntity.ok(list);
}
}
결과값들을 받아서 ,를 구분자로 합치고 출력하고 반환하는 컨트롤러이다.
// Service
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.*;
@Service
public class TestService {
public String sync1() throws InterruptedException {
Thread.sleep(3000);
System.out.println("sync1");
return "sync1";
}
public String sync2() throws InterruptedException {
Thread.sleep(3000);
System.out.println("sync2");
return "sync2";
}
public String sync3() throws InterruptedException {
Thread.sleep(3000);
System.out.println("sync3");
return "sync3";
}
@Async
public String async1() throws InterruptedException {
Thread.sleep(3000);
System.out.println("async1");
return "async1";
}
@Async
public CompletableFuture<String> async2() throws InterruptedException, ExecutionException {
ExecutorService executor
= Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
Thread.sleep(3000);
System.out.println("async2");
return "async2";
});
return CompletableFuture.completedFuture(future.get());
}
@Async
public String async3() throws InterruptedException {
Thread.sleep(3000);
System.out.println("async3");
return "async3";
}
}
간단하게 3초 쉬고 프린트하고 문자열을 반환하는 서비스이다. 특이하게 async2 메서드는 Future타입을 반환하는 것을 눈여겨서 보자
짧막하게 WebClient에서는 Mono, Flux 타입 2가지가 있는데 Mono는 0~1개의 데이터를 전달, Flux 0~N개의 데이터를 전달한다는 의미이다.
WebClient도 나중에 자세히 포스팅을 하겠다!(이젠 RestTemplate은 보내주go...)
블록킹과 논블록킹은 제어권을 누가 갖느냐의 차이이다.
동기와 비동기는 처리는 성공/실패로 나뉘지만 결과가 언제올지 모르는 것..
보통 블록-동기 / 논블록-비동기 모델로 프로그래밍을 한다고 한다
자 이제 코드가 다 있으니 테스트를 해본다
block으로 Blocking을, subscribe로 Non-blocking이 된다
1. Blocking + Synchronous
// 요청
// 호출 하는 쪽
// 호출 당하는 쪽
// 결과
호출한쪽: 예상 그대로 block이 다 끝나고 난 뒤 밑에 프린트문이 출력됐다(Mono가 제어권을 원래스레드한테 넘겨주지 않음)
호출받은 쪽: sync방식이기때문에 순차적으로 실행이 된다
2. Blocking + Asynchronous
// 요청
// 호출 하는 쪽
// 호출 당하는 쪽
// 결과
호출한쪽: 예상 그대로 block이 다 끝나고 난 뒤 밑에 프린트문이 출력됐다(Mono가 제어권을 원래스레드한테 넘겨주지 않음)
호출받은 쪽: async방식이기때문에 순서와 상관없이 실행된다
결과물은 Future로 설정했었던 async2만 값을 받아오고 나머진 null로 처리가 되었다
3. Non-Blocking + Synchronous
// 요청
// 호출 하는 쪽
// 호출 당하는 쪽
// 결과
호출한쪽: 밑의 프린트문이 출력되고 난 뒤에 결과값이 출력됐다(Mono가 제어권을 원래스레드한테 넘겨줌)
여기서 또 눈여겨 볼 부분은 aop가 중간부분에서 실행됐다는 점이다(!!)
호출받은 쪽: sync방식이기때문에 순차적으로 실행된다
결과물은 subscribe방식으로 불러와서 dispose(분해/처리)가 되었다
[ 사실 주석한 코드처럼 toFutre로 불러오면....!! ]
4. Non-Blocking + Asynchronous
// 요청
// 호출 하는 쪽
// 호출 당하는 쪽
// 결과
호출한쪽: 밑의 프린트문이 출력되고 난 뒤에 결과값이 출력됐다(Mono가 제어권을 원래스레드한테 넘겨줌)
여기서 또 눈여겨 볼 부분은 aop가 중간부분에서 실행됐다는 점이다(!!)
호출받은 쪽: async방식이기때문에 순서와 상관없이 실행된다
결과물은 subscribe방식으로 불러와서 dispose(분해/처리)가 되었다
[ 사실 주석한 코드처럼 toFutre로 불러오면....!! ]
--------------------------------------------------------------------------------------------------------
자 이제 위의 개념들에 대해서 알거라 생각한다.
추가로 코드를 바꿔본다
결과값이 없던 non-block방식의 response만 Future로 바꿔본다
// 3번 수정
@GetMapping("/nonblock-sync")
public ResponseEntity<Object> nonblock_sync() throws ExecutionException, InterruptedException {
Mono<String> mono = webClient.method(HttpMethod.GET)
.uri(TEST_API+"/sync")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class)
.doOnSuccess(System.out::println);
CompletableFuture<String> future = mono.toFuture();
// Object result = mono.subscribe();
System.out.println("nonblock-sync End");
// return ResponseEntity.ok(result);
return ResponseEntity.ok(future.get());
}
nonblock-sync 요청상태에서
Futre로 뽑는다면 response는 어떻게 될까?
// 요청
// 호출하는 쪽
// 호출 당하는 쪽
// 결과
아까와는 다르게 전부 다 Response로 뽑힌다!
결국 sync방식은 그냥 뽑으나 Future로 뽑으나 그게그거인거같다
이번에는 4번 코드를 수정해보았다
@GetMapping("/nonblock-async")
public ResponseEntity<Object> nonblock_async() throws ExecutionException, InterruptedException {
Mono<String> mono = webClient.method(HttpMethod.GET)
.uri(TEST_API+"/async")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class)
.doOnSuccess(System.out::println);
CompletableFuture<String> future = mono.toFuture();
// Object result = mono.subscribe();
System.out.println("nonblock-async End");
// return ResponseEntity.ok(result);
return ResponseEntity.ok(future.get());
}
// 요청
// 호출하는 쪽
// 호출 당하는 쪽
// 결과
아까와는 다르게 전부 다 Response로 뽑힌다! 하지만 null이 보인다
결국 async방식은 Future로 된 객체만 response 한다는 결론이다!!
+
추가로 Flux 타입도 사용해보았다
호출하는쪽에서는 Param으로 값을 넣고, 받는쪽에서는 PathVariable로 받도록 했다 ㅋㅋㅋ(내맴인디..?)
최대값을 받아서 50이하의 숫자만 출력하는 것이다...
// 호출하는 쪽
@GetMapping("/test")
public ResponseEntity<Object> test_data(@RequestParam Object data) {
Flux<Object> flux = webClient.method(HttpMethod.GET)
.uri(TEST_API + "/test/{data}", data)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(Object.class);
int[] under50 = flux.toStream().mapToInt(o -> Integer.parseInt(o.toString())).filter(value -> value <= 50).toArray();
System.out.println("test End");
return ResponseEntity.ok(under50);
}
// 호출 당하는 쪽
@GetMapping("/test/{data}")
public ResponseEntity<Object> wc(@PathVariable Integer data) {
ArrayList<Object> list = new ArrayList<>();
for (int i = 0; i < data; i++) {
list.add(i);
}
return ResponseEntity.ok(list);
}
이제 비동기를 배웠으니 또한 서블릿의 변화를 이해할 수 있을 것이다
비동기 서블릿
서블릿 3.0
- 서블릿은 모두 Blocking 기반의 작동. Connection 100개면 Thread 100개 할당
- 서블릿의 기본을 이루고 있는 HTTP request, HTTP response는 InputStream, OutputStream으로 이루어져 있고, 이 내부의 read() 등의 메서드는 모두 Blocking 구조를 갖고 있다.
- HTTP Connection은 이미 논블록킹 IO
서블릿 3.1
- 논블록킹 서블릿 요청, 응답 처리
- Callback
조금 더 우아하게 쓰고싶으면
리스너를 이용한 스프링 비동기
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
@Slf4j
public class DemoApplication {
@Component
public static class MyService {
@Async
public ListenableFuture<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(2000);
return new AsyncResult<>("hello");
}
}
public static void main(String[] args) {
try (ConfigurableApplicationContext c = SpringApplication.run(DemoApplication.class,
args)) {
}
}
@Autowired
MyService myService;
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
ListenableFuture<String> f = myService.hello();
f.addCallback(s -> System.out.println(s), e -> System.out.println(e.getMessage()));
log.info("exit");
};
}
}
작업 완료 됐으면 리스너로 처리하고 싶을 때 쓰는 방법
ListenableFuture를 이용하고, addCallback 메서드를 넣는다.
장점 : 콜백 걸어놓은 다음 바로 빠져나가도 됨. 알림을 주기때문에!
!! Async와 Transaction(@Transactional)을 같이 사용할때는 주의할 점이있다 - Hibernate를 이용할 경우
Propagation을 설정해야된다
https://blog.gangnamunni.com/post/Spring-Async-Hibernate-DB/
참고
'Backend > Java - Spring' 카테고리의 다른 글
스프링 시큐리티 + 소셜로그인 구현(0) - 기본 정보 알기 (0) | 2022.04.14 |
---|---|
Java/SpringBoot 메일을 보내는 방법들 (0) | 2022.03.23 |
DTO instantiate (feat. Lombok, GET/POST) (4) | 2022.03.10 |
댓글