본문 바로가기
320x100
320x100

흔히들 글이나 그림으로 설명을 하기만 한다.

나는 그런 것보다 직접 눈으로 봐야 직성이 풀린다(기능 작성 or 테스트 코드)

자 그럼 WebClient와 함께 테스트를 해보겠다

Spring은 기본적으로 Blocking/Sync 방식으로 동작을 하고 Node.js가 Non-blocking/Async 방식으로 동작을 한다

 

비동기 방식은 순차처리를 지원하지않는다

순서가 중요한 처리는 동기방식으로 처리를 해야한다(또한 잠금이 필요한 경우)

비동기관련용어 - ajax, XMLHttpRequest, Callback, Promise...

 

 

Blocking / Non-blocking 은 호출된 함수가 호출한 함수에게 제어권을 바로 주느냐 안주느냐,
Sync / Async 는 호출된 함수의 종료를 호출한 함수가 처리하느냐, 호출된 함수가 처리하느냐의 차이다.

블로킹 Blocking

  • A 함수가 B 함수를 호출 할 때, B 함수가 자신의 작업이 종료되기 전까지 A 함수에게 제어권을 돌려주지 않는 것

논블로킹 Non-blocking

  • A 함수가 B 함수를 호출 할 때, B 함수가 제어권을 바로 A 함수에게 넘겨주면서, A 함수가 다른 일을 할 수 있도록 하는 것.

동기 Synchronous

  • A 함수가 B 함수를 호출 할 때, B 함수의 결과를 A 함수가 처리하는 것.

비동기 Asynchronous

  • A 함수가 B 함수를 호출 할 때, B 함수의 결과를 B 함수가 처리하는 것. (callback)

 

호출하는 쪽: A

import java.util.concurrent.ExecutionException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@RestController
@RequiredArgsConstructor
public class AController {

    private final WebClient webClient;

    private static final String TEST_API = "localhost:8090";

    @GetMapping("/block-sync")
    public ResponseEntity<Object> block_sync() {
        Mono<String> mono = webClient
            .method(HttpMethod.GET)
            .uri(TEST_API + "/sync")
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(String.class)
            .doOnSuccess(System.out::println);
        Object result = mono.block();
        System.out.println("block-sync End");
        return ResponseEntity.ok(result);
    }

    @GetMapping("/block-async")
    public ResponseEntity<Object> block_async() throws ExecutionException, InterruptedException {
        Mono<String> mono = webClient
            .method(HttpMethod.GET)
            .uri(TEST_API + "/async")
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(String.class)
            .doOnSuccess(System.out::println);
//      CompletableFuture<String> future = mono.toFuture();
        Object result = mono.block();
        System.out.println("block-async End");
        return ResponseEntity.ok(result);
//        return ResponseEntity.ok(future.get());
    }

    @GetMapping("/nonblock-sync")
    public ResponseEntity<Object> nonblock_sync() throws ExecutionException, InterruptedException {
        Mono<String> mono = webClient.method(HttpMethod.GET)
            .uri(TEST_API + "/sync")
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(String.class)
            .doOnSuccess(System.out::println);
//        CompletableFuture<String> future = mono.toFuture();
        Object result = mono.subscribe();
        System.out.println("nonblock-sync End");
        return ResponseEntity.ok(result);
//        return ResponseEntity.ok(future.get());
    }

    @GetMapping("/nonblock-async")
    public ResponseEntity<Object> nonblock_async() {
        Mono<String> mono = webClient.method(HttpMethod.GET)
            .uri(TEST_API + "/async")
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(String.class)
            .doOnSuccess(System.out::println);
        Object result = mono.subscribe();
        System.out.println("nonblock-async End");
        return ResponseEntity.ok(result);
    }

    @GetMapping("/test")
    public ResponseEntity<Object> test_data(@RequestParam Object data) {
        Flux<Object> flux = webClient.method(HttpMethod.GET)
            .uri(TEST_API + "/test/{data}", data)
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToFlux(Object.class);
        int[] under50 = flux.toStream().mapToInt(o -> Integer.parseInt(o.toString()))
            .filter(value -> value <= 50).toArray();
        System.out.println("test End");
        return ResponseEntity.ok(under50);
    }

}

 

호출 당하는 쪽: B

// Controller

import com.example.testproject.service.TestService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;

@RequiredArgsConstructor
@RestController
public class BController {

    private final TestService testService;

    @GetMapping("/test")
    public String test() {
        System.out.println("안녕하세요");
        return "hello";
    }

    @GetMapping("/sync")
    public ResponseEntity<Object> sync() throws InterruptedException {
        String result1 = testService.sync1();
        String result2 = testService.sync2();
        String result3 = testService.sync3();
        String answer = String.join(",", result1, result2, result3);
        System.out.println("sync called");
        return ResponseEntity.status(HttpStatus.OK).body(answer);
    }

    @GetMapping("/async")
    public ResponseEntity<Object> async() throws InterruptedException, ExecutionException {
        String result1 = testService.async1();
        String result2 = testService.async2().get();
        String result3 = testService.async3();
        String answer = String.join(",", result1, result2, result3);
        System.out.println("async called");
        return ResponseEntity.status(HttpStatus.OK).body(answer);
    }

    @GetMapping("/test/{data}")
    public ResponseEntity<Object> wc(@PathVariable Integer data) {
        ArrayList<Object> list = new ArrayList<>();
        for (int i = 0; i < data; i++) {
            list.add(i);
        }
        return ResponseEntity.ok(list);
    }
}

결과값들을 받아서 ,를 구분자로 합치고 출력하고 반환하는 컨트롤러이다.

 

// Service

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.*;

@Service
public class TestService {
    public String sync1() throws InterruptedException {
        Thread.sleep(3000);
        System.out.println("sync1");
        return "sync1";
    }

    public String sync2() throws InterruptedException {
        Thread.sleep(3000);
        System.out.println("sync2");
        return "sync2";
    }

    public String sync3() throws InterruptedException {
        Thread.sleep(3000);
        System.out.println("sync3");
        return "sync3";
    }

    @Async
    public String async1() throws InterruptedException {
        Thread.sleep(3000);
        System.out.println("async1");
        return "async1";
    }

    @Async
    public CompletableFuture<String> async2() throws InterruptedException, ExecutionException {
        ExecutorService executor
                = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            Thread.sleep(3000);
            System.out.println("async2");
            return "async2";
        });
        return CompletableFuture.completedFuture(future.get());
    }

    @Async
    public String async3() throws InterruptedException {
        Thread.sleep(3000);
        System.out.println("async3");
        return "async3";
    }
}

간단하게 3초 쉬고 프린트하고 문자열을 반환하는 서비스이다. 특이하게 async2 메서드는 Future타입을 반환하는 것을 눈여겨서 보자

 

짧막하게 WebClient에서는 Mono, Flux 타입 2가지가 있는데 Mono는 0~1개의 데이터를 전달, Flux 0~N개의 데이터를 전달한다는 의미이다.

WebClient도 나중에 자세히 포스팅을 하겠다!(이젠 RestTemplate은 보내주go...)

 

블록킹과 논블록킹은 제어권을 누가 갖느냐의 차이이다.

동기와 비동기는 처리는 성공/실패로 나뉘지만 결과가 언제올지 모르는 것..

 

보통 블록-동기 / 논블록-비동기 모델로 프로그래밍을 한다고 한다

 

자 이제 코드가 다 있으니 테스트를 해본다

block으로 Blocking을, subscribe로 Non-blocking이 된다

1.  Blocking + Synchronous

// 요청

// 호출 하는 쪽

// 호출 당하는 쪽

// 결과

호출한쪽: 예상 그대로 block이 다 끝나고 난 뒤 밑에 프린트문이 출력됐다(Mono가 제어권을 원래스레드한테 넘겨주지 않음)

호출받은 쪽: sync방식이기때문에 순차적으로 실행이 된다

2.  Blocking + Asynchronous

// 요청

// 호출 하는 쪽

// 호출 당하는 쪽

// 결과

호출한쪽: 예상 그대로 block이 다 끝나고 난 뒤 밑에 프린트문이 출력됐다(Mono가 제어권을 원래스레드한테 넘겨주지 않음)

호출받은 쪽: async방식이기때문에 순서와 상관없이 실행된다

결과물은 Future로 설정했었던 async2만 값을 받아오고 나머진 null로 처리가 되었다

3.  Non-Blocking + Synchronous

// 요청

// 호출 하는 쪽

// 호출 당하는 쪽

// 결과

호출한쪽: 밑의 프린트문이 출력되고 난 뒤에 결과값이 출력됐다(Mono가 제어권을 원래스레드한테 넘겨줌)

여기서 또 눈여겨 볼 부분은 aop가 중간부분에서 실행됐다는 점이다(!!)

호출받은 쪽: sync방식이기때문에 순차적으로 실행된다

결과물은 subscribe방식으로 불러와서 dispose(분해/처리)가 되었다

[ 사실 주석한 코드처럼 toFutre로 불러오면....!! ]

4.  Non-Blocking + Asynchronous

// 요청

// 호출 하는 쪽

// 호출 당하는 쪽

// 결과

호출한쪽: 밑의 프린트문이 출력되고 난 뒤에 결과값이 출력됐다(Mono가 제어권을 원래스레드한테 넘겨줌)

여기서 또 눈여겨 볼 부분은 aop가 중간부분에서 실행됐다는 점이다(!!)

호출받은 쪽: async방식이기때문에 순서와 상관없이 실행된다

결과물은 subscribe방식으로 불러와서 dispose(분해/처리)가 되었다

[ 사실 주석한 코드처럼 toFutre로 불러오면....!! ]

 

--------------------------------------------------------------------------------------------------------

 

자 이제 위의 개념들에 대해서 알거라 생각한다.

추가로 코드를 바꿔본다

결과값이 없던 non-block방식의 response만 Future로 바꿔본다

// 3번 수정

@GetMapping("/nonblock-sync")
    public ResponseEntity<Object> nonblock_sync() throws ExecutionException, InterruptedException {
        Mono<String> mono = webClient.method(HttpMethod.GET)
                .uri(TEST_API+"/sync")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(String.class)
                .doOnSuccess(System.out::println);
        CompletableFuture<String> future = mono.toFuture();
//        Object result = mono.subscribe();
        System.out.println("nonblock-sync End");
//        return ResponseEntity.ok(result);
        return ResponseEntity.ok(future.get());
    }

nonblock-sync 요청상태에서

Futre로 뽑는다면 response는 어떻게 될까?

// 요청

// 호출하는 쪽

// 호출 당하는 쪽

// 결과

아까와는 다르게 전부 다 Response로 뽑힌다!

결국 sync방식은 그냥 뽑으나 Future로 뽑으나 그게그거인거같다

 

이번에는 4번 코드를 수정해보았다

@GetMapping("/nonblock-async")
    public ResponseEntity<Object> nonblock_async() throws ExecutionException, InterruptedException {
        Mono<String> mono = webClient.method(HttpMethod.GET)
                .uri(TEST_API+"/async")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(String.class)
                .doOnSuccess(System.out::println);
        CompletableFuture<String> future = mono.toFuture();
//        Object result = mono.subscribe();
        System.out.println("nonblock-async End");
//        return ResponseEntity.ok(result);
        return ResponseEntity.ok(future.get());
    }

 

// 요청

// 호출하는 쪽

// 호출 당하는 쪽

// 결과

아까와는 다르게 전부 다 Response로 뽑힌다! 하지만 null이 보인다

결국 async방식은 Future로 된 객체만 response 한다는 결론이다!!

 

추가로 Flux 타입도 사용해보았다

호출하는쪽에서는 Param으로 값을 넣고, 받는쪽에서는 PathVariable로 받도록 했다 ㅋㅋㅋ(내맴인디..?)

최대값을 받아서 50이하의 숫자만 출력하는 것이다...

// 호출하는 쪽

@GetMapping("/test")
public ResponseEntity<Object> test_data(@RequestParam Object data) {
    Flux<Object> flux = webClient.method(HttpMethod.GET)
            .uri(TEST_API + "/test/{data}", data)
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToFlux(Object.class);
    int[] under50 = flux.toStream().mapToInt(o -> Integer.parseInt(o.toString())).filter(value -> value <= 50).toArray();
    System.out.println("test End");
    return ResponseEntity.ok(under50);
}

// 호출 당하는 쪽

@GetMapping("/test/{data}")
public ResponseEntity<Object> wc(@PathVariable Integer data) {
    ArrayList<Object> list = new ArrayList<>();
    for (int i = 0; i < data; i++) {
        list.add(i);
    }
    return ResponseEntity.ok(list);
}

 

이제 비동기를 배웠으니 또한 서블릿의 변화를 이해할 수 있을 것이다

비동기 서블릿

서블릿 3.0

  • 서블릿은 모두 Blocking 기반의 작동. Connection 100개면 Thread 100개 할당
  • 서블릿의 기본을 이루고 있는 HTTP request, HTTP response는 InputStream, OutputStream으로 이루어져 있고, 이 내부의 read() 등의 메서드는 모두 Blocking 구조를 갖고 있다.
  • HTTP Connection은 이미 논블록킹 IO

서블릿 3.1

  • 논블록킹 서블릿 요청, 응답 처리
  • Callback

 

조금 더 우아하게 쓰고싶으면 

리스너를 이용한 스프링 비동기

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Slf4j
public class DemoApplication {

    @Component
    public static class MyService {

        @Async
        public ListenableFuture<String> hello() throws InterruptedException {
            log.info("hello()");
            Thread.sleep(2000);
            return new AsyncResult<>("hello");
        }
    }

    public static void main(String[] args) {
        try (ConfigurableApplicationContext c = SpringApplication.run(DemoApplication.class,
            args)) {
        }
    }

    @Autowired
    MyService myService;

    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            ListenableFuture<String> f = myService.hello();
            f.addCallback(s -> System.out.println(s), e -> System.out.println(e.getMessage()));
            log.info("exit");
        };
    }
}

 

작업 완료 됐으면 리스너로 처리하고 싶을 때 쓰는 방법
ListenableFuture를 이용하고, addCallback 메서드를 넣는다.

장점 : 콜백 걸어놓은 다음 바로 빠져나가도 됨. 알림을 주기때문에!

 

!! Async와 Transaction(@Transactional)을 같이 사용할때는 주의할 점이있다 - Hibernate를 이용할 경우

Propagation을 설정해야된다

https://blog.gangnamunni.com/post/Spring-Async-Hibernate-DB/

 

여러개의 DB 작업을 동시에 수행하고 결과를 합쳐서 보내줘야 할 때

Spring 에서 @Async 를 이용한 동시다발적인 Hibernate DB 작업 by 강남언니 블로그

blog.gangnamunni.com

 

참고

https://jh-7.tistory.com/25

https://codechacha.com/ko/java-future/

https://heekim0719.tistory.com/385?category=782470 

댓글