본문 바로가기

오늘의 공부 & 기록

api 동시 요청 수 제한하는 방법

스프링부트에 내장되어 있는 서블릿 컨테이너인 톰캣이 다중요청을 처리해줌

 

톰캣은 다중 요청을 처리하기 위해 부팅 시 스레드 컬렉션인 스레드풀을 생성함

유저 요청이 들어오면 스레드 풀에서 하니씩 스레드를 할당

해당 스레드에서 부트에서 작성한 디스패처서블릿을 거쳐 유저 요청을 처리함

작업을 수행하고 나면 스레드는 스레드 풀로 반환 됨

 

스프링부트에서는 서블릿 컨테이너인 내장 톰캣에서 웹 애플리케이션을 실행하면서 여러 api 요청을 처리할 때

기본적으로 api 요청이 들어올 때마다 새로운 스레드를 생성하여 다중 스레드를 사용하여 병렬적으로 요청을 처리함

내부적으로 스레드 풀을 사용하여 효율적으로 스레드를 관리함

각각의 스레드는 독립적으로 해당 요청을 처리하므로 여러 요청이 동시에 처리될 수 있음

-> api 요청이 여러개 들어오고 들어올 때마다 새로운 스레드를 생성하여 다중 스레드로 각자 자기 요청 로직 처리함

 

스레드 풀은 요청의 수를 제어하여 과도한 스레드 생성을 방지하고 시스템 리소스를 효율적으로 사용하도록 도와줌

 

스프링은 기본적으로 서블릿 컨테이너의 설정에 따라 스레드 풀을 조절하며 사용자가 따로 설정하지 않아도 큰 문제 없이 동작하게 해주지만 application.properties 파일에 설정을 직접 작성하여 관리할 수 있음

 

# Tomcat 스레드 풀 설정

server.tomcat.max-threads=100

server.tomcat.min-spare-threads=10

 

 

위 처럼 작성해 놓으면 내장톰캣이 설정을 인식하여 사용함

 

 

스레드 풀 100으로 설정 후 100까지 다 찬 경우(=스레드 풀의 최대 스레드 개수 도달)

새로운 요청은 대기 상태가 됨(스레드 풀이 가득 차면 추가적인 요청은 처리되지 않고 대기하게됨)

이때 동작은 서블릿 컨테이너에 따라 다를 수 있는데 일반적으로 Blocking(블로킹), Connection Refused(연결거부), Queueing(대기열) 중 하나의 옵션을 선택하게됨

 

Blocking(블로킹)

새로운 요청이 들어오면 해당 요청은 스레드 풀에서 사용 가능한 스레드가 발생할 때 까지 대기

Connection Refused(연결 거부)

새로운 요청이 들어오면 서버가 해당 요청을 거부하고 클라이언트에게 연결 거부 응답 반환(클라이언트는 이 응답을 받고 다시 시도하거나 다른 처리 방법을 선택해야 함)

Queueing(대기열)

새로운 요청은 스레드 풀에서 사용 가능한 스레드가 생길 때 까지 대기열에 추가됨

대기열은 일정 수의 요청을 보관하고 스레드 풀이 요청을 처리할 때마다 대기열에서 새로운 요청을 꺼내서 처리함

 

이 방법은 지금 내가 구현해야 하는 부분보다 더 큰 범위로 볼 수 있음 나는 api 마다 요청의 수, 순서를 제어해야하는데 이 방법은 api 전체적인 동시성을 관리할 수 있는 방법임

 

특정 엔드포인트에 대한 대기열을 따로 관리하고 싶다면 해당 엔드포인트에서 비동기적인 방식으로 작업을 수행하고 비동기 작업을 처리할 스레드 풀을 별도로 설정하는 방법을 찾아야함

 

방법1

서비스에 @Async, TaskExecutor, Async에 관한 @Configuration, 컨트롤러에 CompletableFuture나 ListenableFuture를 사용하여 비동기적인 메서드를 정의하고 해당 메서드가 실행될 스레드 풀을 구성함

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.concurrent.CompletableFuture;

@RestController
@RequestMapping("/api")
public class MyController {

    @Autowired
    private MyService myService;

    @GetMapping("/processAsync")
    public DeferredResult<ResponseEntity<String>> processAsync() {
        DeferredResult<ResponseEntity<String>> deferredResult = new DeferredResult<>();

        // 비동기 작업 호출
        CompletableFuture<String> asyncResult = myService.processAsync();

        // 작업이 완료되면 결과 처리
        asyncResult.whenComplete((result, throwable) -> {
            if (throwable != null) {
                deferredResult.setErrorResult(ResponseEntity.status(500).body("Error"));
            } else {
                deferredResult.setResult(ResponseEntity.ok(result));
            }
        });

        return deferredResult;
    }
}

 

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    @Async("myAsyncExecutor")
    public void processAsync() {
        // 비동기 작업 수행
    }
}

 

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
public class AsyncConfig {

    @Bean("myAsyncExecutor")
    public TaskExecutor myAsyncExecutor() {
        return new SimpleAsyncTaskExecutor();
    }
}

 

 

다른 방법

 

스레드 풀 및 Semaphore를 활용

ThreadPoolTaskExecutor를 사용하여 스레드 풀을 설정하고 Semaphore를 사용하여 요청을 받을 때 마다 허용된 개수 이내에서만 처리하도록 할 수 있음

@Service
public class MyService {

    private final ExecutorService executorService;
    private final Semaphore semaphore;

    public MyService() {
        int availableThreads = 3; // 허용된 동시 처리 요청 개수
        this.executorService = Executors.newFixedThreadPool(availableThreads);
        this.semaphore = new Semaphore(availableThreads);
    }

    public void processRequest() {
        try {
            semaphore.acquire(); // Semaphore 허용 개수 이내에서만 획득
            CompletableFuture.runAsync(() -> {
                // 실제 요청 처리 로직
            }, executorService).whenComplete((result, throwable) -> {
                semaphore.release(); // 처리가 완료되면 Semaphore 해제
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // 예외 처리 로직
        }
    }
}

 

 

또 다른 방법

스프링 AOP와 (사용자 정의) 어노테이션을 활용

@RequestLimit 어노테이션 만들기

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RequestLimit {
    int value() default 3; // 허용된 동시 처리 요청 개수
}
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;

import java.util.concurrent.Semaphore;

@Aspect
@Component
public class RequestLimitAspect {

    private final Semaphore semaphore;

    public RequestLimitAspect() {
        int availableThreads = 3; // 허용된 동시 처리 요청 개수
        this.semaphore = new Semaphore(availableThreads);
    }

    @Pointcut("@annotation(com.example.RequestLimit)")
    public void requestLimitAnnotation() {
    }

    @Around("requestLimitAnnotation()")
    public Object limitRequests(ProceedingJoinPoint joinPoint) throws Throwable {
        try {
            semaphore.acquire(); // Semaphore 허용 개수 이내에서만 획득
            return joinPoint.proceed();
        } finally {
            semaphore.release(); // 처리가 완료되면 Semaphore 해제
        }
    }
}
@Service
public class MyService {

    @RequestLimit
    public void processRequest() {
        // 실제 요청 처리 로직
    }
}

 

 

 

메시지 큐를 활용하여 엔드포인트 요청의 개수를 제한/대기하는 방법

 

토큰 버킷 알고리즘(Token Bucket Algorithm)

일정 시간 간격으로 일정 개수의 토큰을 생성하고 각 요청이 처리되기 전에 토큰을 소비하도록 하는 방식

메시지 큐에는 토큰 생성자가 토큰을 큐에 삽입하고 엔드포인트에서 요청을 처리하기 전에 토큰을 큐에서 가져와야함

Java에서는 ScheduledExecutorService와 BlockingQueue 등을 사용하여 구현할 수 있음

 

비동기 큐 및 대기 시간 관리

비동기 큐를 사용하여 엔드포인트 요청을 처리하고 각 요청이 큐에 들어오면 대기 시간을 측정함

대기 시간이 일정 값 이상인 경우 요청을 처리하고 그렇지 않으면 대기열에서 요청을 대기시킴

Java에서는 CompletableFutrue, DelayQueue 등을 사용하여 비동기적으로 대기 및 처리 할 수 있음 

 

큐에 길이 제한과 거부 정책

메시지 큐에는 큐의 길이를 제한하고 요청이 들어올 때 큐에 자리가 있는 경우 처리하고 자리가 없는 경우 거부하는 정책을 적용함

이는 메시지 큐 설정에 따라 다를 수 있으며 일부 큐는 길이 제한을 초과하는 경우 새로운 요청을 거부하거나 특정 정책에 따라 처리함

 

큐에 일정 시간 동안 대기하도록 하는 방법

메시지 큐에는 일정 시간 동안 대기하도록 하는 메커니즘이 있을 수 있음

이는 메시지를 큐에 넣고 특정 시간이 경과하면 큐에서 메시지를 가져와서 처리

예를들어 RabbitMQ에는 TTL을 사용하여 메시지가 큐에 얼마동안 남아 있을 지 설정할 수 있음 

 

메시지큐 

 

메시지 큐 시스템은 대표적으로 RabbitMQ, Apache Kafka, Amazone SQS, Redis 등이 있음

 

 

카프카 메시지 큐를 활용하여 엔드포인트 요청의 개수를 제한/대기하는 방법

카프카는 메시지 브로커로서 대규모 분산 시스템에서 이벤트 스트리밍을 위한 플랫폼으로 사용됨

 

  1. 메시지 프로듀서:
    • 엔드포인트 요청이 들어올 때마다 카프카 토픽에 메시지를 전송합니다.
    • 각 메시지는 요청을 나타내며, 메시지에는 요청한 사용자, 시간, 또는 기타 관련 정보가 포함될 수 있습니다.
  2. 메시지 컨슈머:
    • 카프카 컨슈머가 특정 토픽에서 메시지를 수신합니다.
    • 메시지를 받으면 해당 요청에 대한 처리를 시작하거나, 대기 큐에 추가합니다.
  3. 대기 큐:
    • 각 메시지에 대한 처리 시간을 추적합니다.
    • 만약 동시에 처리할 수 있는 요청의 개수를 제한하려면, 처리 중인 요청의 개수를 제한하거나, 처리에 소요되는 시간을 조절해야 합니다.
  4. 제한된 처리 및 대기:
    • 대기 큐에 요청이 추가될 때마다, 현재 처리 중인 요청이 지정된 개수 이하인지 확인합니다.
    • 지정된 개수 이하일 경우 즉시 처리를 시작하고, 그 이상일 경우 대기열에 추가하여 나중에 처리될 수 있도록 합니다.
    • 요청 처리가 완료되면 해당 요청에 대한 정보를 대기 큐에서 제거합니다.
  5. 처리 속도 제어:
    • 각 요청에 대한 처리 속도를 제어하려면, 대기 큐에 요청이 추가된 시간을 기록하고, 일정 시간 동안 대기 후에 처리를 시작합니다.
    • 또는, 특정 사용자 또는 엔드포인트에 대한 처리 속도를 제어하는 방법을 도입할 수 있습니다.

이는 매우 간단한 아이디어이며, 실제 구현은 프로덕션 환경의 요구사항과 복잡성에 따라 달라질 것입니다. 처리 속도 제어, 동시 처리 요청 수 제한, 대기 큐 처리 등에 대한 정교한 제어를 위해서는 좀 더 심층적인 구현과 설정이 필요할 것입니다.

 

프로듀서

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class RequestProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendRequest(String userId, String message) {
        kafkaTemplate.send("request-topic", userId, message);
    }
}
 

컨슈머

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class RequestConsumer {

    @KafkaListener(topics = "request-topic", groupId = "my-group")
    public void processRequest(ConsumerRecord<String, String> record) {
        String userId = record.key();
        String message = record.value();

        // 처리 시간이 길어질 수 있는 로직 수행
        // ...

        System.out.println("Processed request for user " + userId);
    }
}

 

대기 큐 및 처리 속도 제어

import org.springframework.stereotype.Component;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Component
public class RequestQueue {

    private static final int MAX_CONCURRENT_REQUESTS = 5;
    private BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>(MAX_CONCURRENT_REQUESTS);

    public void addToQueue(String message) {
        try {
            requestQueue.put(message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // 예외 처리
        }
    }

    public String takeFromQueue() throws InterruptedException {
        return requestQueue.take();
    }
}

 

엔드포인트 서비스

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class EndpointService {

    @Autowired
    private RequestProducer requestProducer;

    @Autowired
    private RequestQueue requestQueue;

    public void handleRequest(String userId, String message) {
        // 메시지 프로듀서를 통해 Kafka 토픽에 메시지 전송
        requestProducer.sendRequest(userId, message);

        // 대기 큐에 요청 추가
        requestQueue.addToQueue(message);
    }
}

Kafka를 사용하여 엔드포인트 요청을 메시지로 전송하고, 해당 메시지를 소비하는 컨슈머를 통해 처리 시간이 긴 작업을 수행합니다. 동시에 대기 큐를 사용하여 동시에 처리되는 요청의 수를 제한하고, 대기 큐에 추가된 요청은 나중에 처리될 수 있도록 합니다.

'오늘의 공부 & 기록' 카테고리의 다른 글

[Kubernetes] 쿠버네티스와 마이크로 서비스(Microservice)  (0) 2024.04.14
[MQTT] 에러 해결 과정 정리  (0) 2024.04.03
HTTP 상태코드  (0) 2023.09.30
HTTP API설계 예시  (0) 2023.09.30
HTTP 메서드 활용  (0) 2023.09.21