본문 바로가기
Lecture

대용량 트래픽을 대비한 Spring Webflux와 Reactive Redis를 이용한 접속자 대기열 시스템

by Renechoi 2023. 10. 22.
  1. 목차
  2. 프로젝트 소개
    • 소개
    • 사용 기술 스택
  3. API
  4. 프로젝트 구조
    • 패키지 구조
    • 데이터 흐름
    • 주요 클래스 설명
  5. 소스 코드

1. 프로젝트 소개

소개

Spring Webflux와 Reactive Redis를 실습하기 위한 MVP 프로젝트이다. 어플리케이션은 접속자 대기열 시스템과 타깃 페이지 시스템으로 나뉜다.

  • 코드 및 자료 출처는 패스트캠퍼스 시그니처 백엔드 패키지 course 7이다.

사용 기술 스택

  • Spring Webflux: 5.3.14
  • Reactive Redis: 2.6.1
  • Spring Boot: 3.0.9
  • Java: JDK 17
  • Gradle: 7.3
  • Thymeleaf: 3.0.14
  • JUnit: 5.8.2
  • Docker: 20.10.10

2. API

Localhost:9010

  • /api/v1/queue

    • POST /?queue={queue_name}&user_id={user_id}: 사용자를 대기열에 등록
      • queue_name: 대기열 이름, 기본값은 'default'
      • user_id: 사용자 ID
    • POST /allow?queue={queue_name}&count={count}: 특정 수의 사용자를 대기열에서 제거하고 접근을 허용
      • queue_name: 대기열 이름, 기본값은 'default'
      • count: 접근을 허용할 사용자 수
    • GET /allowed?queue={queue_name}&user_id={user_id}&token={token}: 특정 사용자가 접근이 허용되었는지 확인
      • queue_name: 대기열 이름, 기본값은 'default'
      • user_id: 사용자 ID
      • token: 인증 토큰
    • GET /rank?queue={queue_name}&user_id={user_id}: 특정 사용자의 대기열 순위를 확인
      • queue_name: 대기열 이름, 기본값은 'default'
      • user_id: 사용자 ID
    • GET /touch?queue={queue_name}&user_id={user_id}: 사용자에게 토큰을 발급하고 쿠키에 저장
      • queue_name: 대기열 이름, 기본값은 'default'
      • user_id: 사용자 ID
  • /waiting-room

    • GET /waiting-room?queue={queue_name}&user_id={user_id}&redirect_url={redirect_url}: 대기 웹페이지를 렌더링
      • queue_name: 대기열 이름, 기본값은 'default'
      • user_id: 사용자 ID
      • redirect_url: 대기가 완료된 후 리다이렉션될 URL, 기본값은 'http://localhost:9000'

Localhost:9000

  • /
    • GET /?queue={queue_name}&user_id={user_id}: 타겟 페이지. 여기로 리다이렉션될 경우, 대기가 완료된 것
      • queue_name: 대기열 이름, 기본값은 'default'
      • user_id: 사용자 ID

3. 프로젝트 구조

패키지 구조

접속자 대기열 시스템

대용량 트래픽을 처리하는 접속자 대기열 시스템으로 UserQueueController, WaitingRoomController, 그리고 UserQueueService 클래스를 중심으로 구현되어 있다.

  • UserQueueController: API를 통해 대기열에 유저를 등록하거나 특정 수의 유저를 허용한다.
  • WaitingRoomController: 대기 중인 유저를 대기실 페이지로 리다이렉트한다.
  • UserQueueService: 대기열 로직을 처리합니다. 이 클래스에서는 Redis를 활용해 대기열을 관리한다.
├── main
│   ├── java
│   │   ├── com
│   │   │   └── yourpackage
│   │   │       ├── controller
│   │   │       │   ├── UserQueueController.java
│   │   │       │   └── WaitingRoomController.java
│   │   │       ├── service
│   │   │       │   └── UserQueueService.java
│   │   │       └── WebsiteApplication.java
│   ├── resources
│   │   ├── templates
│   │   │   └── waiting-room.html
│   │   └── application.properties
└── test

타깃 페이지

간단하게 WebsiteApplication 클래스 하나로 구성되어 있다. 대기가 끝난 사용자를 타겟 웹페이지로 리다이렉트한다. 이 클래스는 토큰을 기반으로 사용자가 허용되었는지 확인 후, 허용된 사용자는 타깃 페이지로, 그렇지 않은 사용자는 대기실로 리다이렉트한다.

각 페이지는 Thymeleaf 템플릿을 사용하여 렌더링되며, 대기 상태나 허용 상태에 따라 다른 내용을 보여준다.

AllowedUserResponse 레코드는 타겟 페이지에 접속이 허용된 사용자를 확인하기 위해 사용된다.

데이터 흐름

  1. 사용자가 타겟 웹페이지에 접근을 시도한다.
  2. 서버는 사용자의 토큰과 기타 필요한 검증절차를 거친다.
  3. 검증 이후, 사용자가 대기열에 들어가야 하는지, 아니면 바로 원하는 페이지로 리다이렉트될 수 있는지를 판단한다.
    • 대기열 진입: 사용자는 대기 상태 페이지에 머무른다.
    • 리다이렉트: 사용자는 원하는 페이지로 바로 이동한다.

주요 클래스 설명

대기열 시스템

UserQueueController

이 클래스는 대기열 관련 API를 제공한다. 사용자 등록, 사용자 허용, 대기 순위 조회 등의 엔드포인트를 가진다.

  • registerUser: 대기열에 사용자를 등록.
  • allowUser: 지정한 수의 사용자를 대기열에서 허용 상태로 변경.
  • isAllowedUser: 사용자가 허용 상태인지 확인.
  • getRankUser: 사용자의 대기 순위를 조회.
  • touch: 대기 상태를 유지하기 위한 토큰을 생성.

WaitingRoomController

웹 페이지와 연동하여 대기 상태를 보여주는 컨트롤러.

  • waitingRoomPage: 대기 상태를 보여주는 뷰를 렌더링한다.

UserQueueService

대기열의 핵심 로직을 처리하는 서비스 클래스.

  • registerWaitQueue: 대기열에 사용자를 등록하고 대기 순위를 반환한다.
  • allowUser: 지정한 수의 사용자를 대기열에서 허용 상태로 변경한다.
  • isAllowedByToken: 토큰을 사용하여 사용자가 허용 상태인지 확인한다.
  • getRank: 사용자의 대기 순위를 조회한다.
  • generateToken: 대기 상태를 유지하기 위한 토큰을 생성한다.
  • scheduleAllowUser: 주기적으로 대기 상태의 사용자를 허용 상태로 변경하는 스케쥴링 메소드.

타깃 페이지

WebsiteApplication

어플리케이션의 진입점이자 타겟 웹 페이지와 대기열 로직을 연결하는 컨트롤러이다.

  • index: 타겟 웹페이지를 렌더링하거나 대기 상태가 아니라면 대기 페이지로 리다이렉트한다.

4. 소스코드


대기열 접속자 시스템

@RestController
@RequestMapping("/api/v1/queue")
@RequiredArgsConstructor
public class UserQueueController {
    private final UserQueueService userQueueService;

    @PostMapping("")
    public Mono<RegisterUserResponse> registerUser(@RequestParam(name = "queue", defaultValue = "default") String queue,
                                                   @RequestParam(name = "user_id") Long userId) {
        return userQueueService.registerWaitQueue(queue, userId)
                .map(RegisterUserResponse::new);
    }

    @PostMapping("/allow")
    public Mono<AllowUserResponse> allowUser(@RequestParam(name = "queue", defaultValue = "default") String queue,
                                             @RequestParam(name = "count") Long count) {
        return userQueueService.allowUser(queue, count)
                .map(allowed -> new AllowUserResponse(count, allowed));
    }

    @GetMapping("/allowed")
    public Mono<AllowedUserResponse> isAllowedUser(@RequestParam(name = "queue", defaultValue = "default") String queue,
                                                   @RequestParam(name = "user_id") Long userId,
                                                   @RequestParam(name = "token") String token) {
        return userQueueService.isAllowedByToken(queue, userId, token)
                .map(AllowedUserResponse::new);
    }

    @GetMapping("/rank")
    public Mono<RankNumberResponse> getRankUser(@RequestParam(name = "queue", defaultValue = "default") String queue,
                                                @RequestParam(name = "user_id") Long userId) {
        return userQueueService.getRank(queue, userId)
                .map(RankNumberResponse::new);
    }


    @GetMapping("/touch")
    Mono<?> touch(@RequestParam(name = "queue", defaultValue = "default") String queue,
                  @RequestParam(name = "user_id") Long userId,
                  ServerWebExchange exchange) {
        return Mono.defer(() -> userQueueService.generateToken(queue, userId))
                .map(token -> {
                    exchange.getResponse().addCookie(
                            ResponseCookie
                                    .from("user-queue-%s-token".formatted(queue), token)
                                    .maxAge(Duration.ofSeconds(300))
                                    .path("/")
                                    .build()
                    );
                    return token;
                });
    }
}



@Controller
@RequiredArgsConstructor
public class WaitingRoomController {
    private final UserQueueService userQueueService;


    @GetMapping("/waiting-room")
    Mono<Rendering> waitingRoomPage(@RequestParam(name = "queue", defaultValue = "default") String queue,
                                    @RequestParam(name = "user_id") Long userId,
                                    @RequestParam(name = "redirect_url", defaultValue = "http://localhost:9000") String redirectUrl,
                                    ServerWebExchange exchange) {
        var key = "user-queue-%s-token".formatted(queue);
        var cookieValue = exchange.getRequest().getCookies().getFirst(key);
        var token = (cookieValue == null) ? "" : cookieValue.getValue();


        return userQueueService.isAllowedByToken(queue, userId, token)
                .filter(allowed -> allowed)
                .flatMap(allowed -> Mono.just(Rendering.redirectTo(redirectUrl).build()))
                .switchIfEmpty(
                        userQueueService.registerWaitQueue(queue, userId)
                                .onErrorResume(ex -> userQueueService.getRank(queue, userId))
                                .map(rank -> Rendering.view("waiting-room")
                                        .modelAttribute("number", rank)
                                        .modelAttribute("userId", userId)
                                        .modelAttribute("queue", queue)
                                        .build()
                                )
                );
    }
}




@Slf4j
@Service
@RequiredArgsConstructor
public class UserQueueService {
    private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;
    private final String USER_QUEUE_WAIT_KEY = "users:queue:%s:wait";
    private final String USER_QUEUE_WAIT_KEY_FOR_SCAN = "users:queue:*:wait";
    private final String USER_QUEUE_PROCEED_KEY = "users:queue:%s:proceed";

    @Value("${scheduler.enabled}")
    private Boolean scheduling = false;

    public Mono<Long> registerWaitQueue(final String queue, final Long userId) {

        var unixTimestamp = Instant.now().getEpochSecond();
        return reactiveRedisTemplate.opsForZSet().add(USER_QUEUE_WAIT_KEY.formatted(queue), userId.toString(), unixTimestamp)
                .filter(i -> i)
                .switchIfEmpty(Mono.error(ErrorCode.QUEUE_ALREADY_REGISTERED_USER.build()))
                .flatMap(i -> reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY.formatted(queue), userId.toString()))
                .map(i -> i >= 0 ? i + 1 : i);
    }


    public Mono<Long> allowUser(final String queue, final Long count) {
        return reactiveRedisTemplate.opsForZSet().popMin(USER_QUEUE_WAIT_KEY.formatted(queue), count)
                .flatMap(member -> reactiveRedisTemplate.opsForZSet().add(USER_QUEUE_PROCEED_KEY.formatted(queue), member.getValue(), Instant.now().getEpochSecond()))
                .count();
    }

    public Mono<Boolean> isAllowed(final String queue, final Long userId) {
        return reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_PROCEED_KEY.formatted(queue), userId.toString())
                .defaultIfEmpty(-1L)
                .map(rank -> rank >= 0);
    }

    public Mono<Boolean> isAllowedByToken(final String queue, final Long userId, final String token) {
        return this.generateToken(queue, userId)
                .filter(gen -> gen.equalsIgnoreCase(token))
                .map(i -> true)
                .defaultIfEmpty(false);
    }

    public Mono<Long> getRank(final String queue, final Long userId) {
        return reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY.formatted(queue), userId.toString())
                .defaultIfEmpty(-1L)
                .map(rank -> rank >= 0 ? rank + 1 : rank);
    }

    public Mono<String> generateToken(final String queue, final Long userId) {
        MessageDigest digest = null;
        try {
            digest = MessageDigest.getInstance("SHA-256");
            var input = "user-queue-%s-%d".formatted(queue, userId);
            byte[] encodedHash = digest.digest(input.getBytes(StandardCharsets.UTF_8));

            StringBuilder hexString = new StringBuilder();
            for (byte aByte: encodedHash) {
                hexString.append(String.format("%02x", aByte));
            }
            return Mono.just(hexString.toString());
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }


    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    public void scheduleAllowUser() {
        if (!scheduling) {
            log.info("passed scheduling...");
            return;
        }
        log.info("called scheduling...");

        var maxAllowUserCount = 100L;
        reactiveRedisTemplate.scan(ScanOptions.scanOptions()
                        .match(USER_QUEUE_WAIT_KEY_FOR_SCAN)
                        .count(100)
                        .build())
                .map(key -> key.split(":")[2])
                .flatMap(queue -> allowUser(queue, maxAllowUserCount).map(allowed -> Tuples.of(queue, allowed)))
                .doOnNext(tuple -> log.info("Tried %d and allowed %d members of %s queue".formatted(maxAllowUserCount, tuple.getT2(), tuple.getT1())))
                .subscribe();
    }
}


<html>
<!DOCTYPE html>
<html lang="ko" xmlns:th="http://www.thymeleaf.org">
<head>
  <meta charset="utf-8">
  <title>접속자대기열시스템</title>

</head>
<body>
<div class="message">
  <h1>접속량이 많습니다.</h1>
  <span>현재 대기 순번 </span><span id="number">[[${number}]]</span><span> 입니다.</span>
  <br/>
  <p>서버의 접속량이 많아 시간이 걸릴 수 있습니다.</p>
  <p>잠시만 기다려주세요.</p>
  <p id="updated"></p>
  <br/>
</div>
<script>
  function fetchWaitingRank() {
    const queue = '[[${queue}]]';
    const userId = '[[${userId}]]';
    const queryParam = new URLSearchParams({queue: queue, user_id: userId});
    fetch('/api/v1/queue/rank?' + queryParam)
      .then(response => response.json())
      .then(data => {
        if(data.rank < 0) {
          // token을 받아오도록 touch api 호출
          fetch('/api/v1/queue/touch?' + queryParam)
            .then(response => {
              document.querySelector('#number').innerHTML = 0;
              document.querySelector('#updated').innerHTML = new Date();

              const newUrl = window.location.origin + window.location.pathname + window.location.search;
              window.location.href = newUrl;
            })
            .catch(error => console.error(error));
          return;
        }
        document.querySelector('#number').innerHTML = data.rank;
        document.querySelector('#updated').innerHTML = new Date();
      })
      .catch(error => console.error(error));
  }

  setInterval(fetchWaitingRank, 3000);
</script>
</body>
</html>

타깃 페이지

@SpringBootApplication
@Controller
public class WebsiteApplication {
    RestTemplate restTemplate = new RestTemplate();

    public static void main(String[] args) {
        SpringApplication.run(WebsiteApplication.class, args);
    }

    /**
     * 타겟 웹페이지 api -> 허용이 안되어 있다면 대기열로 갔다가 허용이 되면 여기로 다시 redirect 됨
     * @param queue
     * @param userId
     * @param request
     * @return
     */
    @GetMapping("/")
    public String index(@RequestParam(name = "queue", defaultValue = "default") String queue,
                        @RequestParam(name = "user_id") Long userId,
                        HttpServletRequest request) {
        var cookies = request.getCookies();
        var cookieName = "user-queue-%s-token".formatted(queue);

        String token = "";
        if (cookies != null) {
            var cookie = Arrays.stream(cookies).filter(i -> i.getName().equalsIgnoreCase(cookieName)).findFirst();
            token = cookie.orElse(new Cookie(cookieName, "")).getValue();
        }

        // 토큰 기반으로 cookie 체크 -> 쿠키는 포트와 상관없이 도메인 기반으로 공유되기 때문에 쿠키가 활용될 수 있는 것
        var uri = UriComponentsBuilder
                .fromUriString("http://127.0.0.1:9010")
                .path("/api/v1/queue/allowed")
                .queryParam("queue", queue)
                .queryParam("user_id", userId)
                .queryParam("token", token)
                .encode()
                .build()
                .toUri();

        ResponseEntity<AllowedUserResponse> response = restTemplate.getForEntity(uri, AllowedUserResponse.class);
        if (response.getBody() == null || !response.getBody().allowed()) {
            // 대기 웹페이지로 리다이렉트
            return "redirect:http://127.0.0.1:9010/waiting-room?user_id=%d&redirect_url=%s".formatted(
                    userId, "http://127.0.0.1:9000?user_id=%d".formatted(userId));
        }
        // 허용 상태라면 해당 페이지를 진입
        return "index";
    }

    public record AllowedUserResponse(Boolean allowed) {
    }
}


<!DOCTYPE html>
<html lang="ko" xmlns:th="http://www.thymeleaf.org">
<head>
  <title>대용량 트래픽처리를 위한 백엔드심화</title>
  <meta charset="utf-8">

</head>
<body>
<div class="main-content">
  <h1>대용량 트래픽처리를 위한 백엔드심화</h1>
</div>
</body>
</html>

reference :

반응형