아빠는 개발자

[java] ThreadPoolTaskExecutor 본문

Java

[java] ThreadPoolTaskExecutor

father6019 2023. 12. 23. 16:06
728x90
반응형

ThreadPoolTaskExecutor

스프링에서 멀티쓰레딩을 편하게 구현 하도록 도와주는 Class

 

스레드를 몇개를 까야 가장 성능이 좋을까..

서버 혹은 로컬 머신에 스팩에 따라 다르겠지만..

 

내 pc 기준으로 하자면 

8core / 16GB / i9

 

아래와 같은 결과를 얻었다. 

 

total time is : 717705

taskExecutor.setCorePoolSize(1); //기본 쓰레드 사이즈

taskExecutor.setMaxPoolSize(5); //최대 쓰레드 사이즈

taskExecutor.setQueueCapacity(10); //Max쓰레드가 동작하는 경우 대기하는 queue 사이즈

 

total time is : 739373

taskExecutor.setCorePoolSize(2); //기본 쓰레드 사이즈

taskExecutor.setMaxPoolSize(5); //최대 쓰레드 사이즈

taskExecutor.setQueueCapacity(10); //Max쓰레드가 동작하는 경우 대기하는 queue 사이즈

 

total time is : 459048

taskExecutor.setCorePoolSize(2); //기본 쓰레드 사이즈

taskExecutor.setMaxPoolSize(10); //최대 쓰레드 사이즈

taskExecutor.setQueueCapacity(10); //Max쓰레드가 동작하는 경우 대기하는 queue 사이즈

 

total time is : 441863

taskExecutor.setCorePoolSize(5); //기본 쓰레드 사이즈

taskExecutor.setMaxPoolSize(10); //최대 쓰레드 사이즈

taskExecutor.setQueueCapacity(10); //Max쓰레드가 동작하는 경우 대기하는 queue 사이즈

Thread Pool 설정

스프링에서 멀티쓰레딩을 편하게 구현 하도록 도와주는 Class

설정파일 

package com.doo.selenium.config;


import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
@Configuration
@EnableAsync(proxyTargetClass = true)
@RequiredArgsConstructor
public class AsyncConfig implements AsyncConfigurer {

    @Bean(name = "executor")
    public Executor threadPoolItemTaskExecutor() {
        return getExecutor();
    }

    private Executor getExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(20); //기본 쓰레드 사이즈
        taskExecutor.setMaxPoolSize(20); //최대 쓰레드 사이즈
        taskExecutor.setQueueCapacity(100); //Max쓰레드가 동작하는 경우 대기하는 queue 사이즈
        taskExecutor.setThreadNamePrefix("executor_");
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true); //Queue 에 남아있는 작업이완료 될 때까지 기다림
        taskExecutor.setAwaitTerminationSeconds(60); //shutdown 최대 60초 대기
        taskExecutor.initialize();

        return taskExecutor;
    }
}

corePoolSize

  • 생성해서 사용할 스레드 풀에 속한 기본 스레드 갯수
  • default : 1

queueCapacity

  • 이벤트 대기 큐 크기
  • default 값은 Integer.MAX_VALUE (약 21억)

maxPoolSize

  • 최대 스레드 갯수
  • default는 Integer.MAX_VALUE (약 21억)

rejectedExecutionHandler

태스크 처리량이 스레드 갯수가 max로 채워지고 queue 대기수를 넘어서는 경우에 RejectExecutionException 이 발생합니다.
다음은 해당 Exception을 다양한 방식으로 처리할수있도록 제공하는 Handler 클래스의 종류입니다.

  • AbortPolicy
    • default
    • RejectExecutionHandler 예외 발생 시킴
  • DiscardOldestPolicy
    • 오래된 작업을 skip
    • 모든 태스크가 반드시 처리될 필요가 없을때 사용
  • DiscardPolicy
    • 처리하려는 작업을 skip
    • 모든 태스크가 반드시 처리될 필요가 없을때 사용
  • CallerRunsPolicy
    • shutdown 상태가 아니라면 요청한 Caller Thread에서 직접 처리함
    • 태스크 유실을 최소화하기 위해선 이 구현체를 사용해야함

 

Process

core 사이즈만큼의 스레드가 일을 처리

기본 스레드가 처리할 수 있는 작업량을 넘어서는 경우 setQueueCapacity 에서 설정한 크기의 LinkedBlockingQueue를 생성하여 대기함 (queueCapacity를 따로 설정하지 않는 경우 Default 크기 만큼 생성).

queue도 꽉찬다면 setMaxPoolSize 에서 설정한 만큼 스레드를 추가 생성합니다.

위의 설정대로라면 기본 20개 스레드에서 처리하다가 작업량을 넘어서는 경우 크기가 100인 queue에서 대기하고 큐 크기보다도 초과한다면 최대 20개까지 스레드를 생성

참고

  • 설정 순서는 기존 스레드 갯수 값 까지 스레드 할당(corePoolSize) → 큐 크기 만큼 태스크 큐에서 대기(queueCapacity) → 최대 스레드 갯수 만큼 스레드 추가(maxPoolSize) 
  • 스레드 초과로 발생하는 Exception을 처리하는 Handler Calss

 

우선 서비스에서 아래와같이 작성하고 

private List<CompletableFuture<Integer>> completableFutures = new ArrayList<>();

private final AsyncTaskService asyncTaskService;

 

for (int i = 0; i < 100; i++) {
    System.out.println("Task Number : " + i );
    CompletableFuture<Integer> completableFuture = asyncTaskService.task((i % MAX_NUMBER), url);
    completableFutures.add(completableFuture);
}

 

for (CompletableFuture<Integer> future : completableFutures) {
    try {
        System.out.println("EEENNNDDDD:::: end ");
        future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

100회 실행했을때의 결과를 받아 보았다. 

 

selenium 으로 새창이 뜨다보니 귀찮아서 마지막에 확인하려고 Timer Annotation 생성 

 

aop 의존성 부여하고 

implementation 'org.springframework.boot:spring-boot-starter-aop'

 

Timer Annotation 생성

package com.doo.selenium.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Timer {
}

 

Around 로 annotation 이 붙은 메소드의 실행시간을 측정

package com.doo.selenium.aspect;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;


@Aspect
@Component
public class TimerAop {
    @Pointcut("@annotation(com.doo.selenium.annotation.Timer)")
    private void enableTimer() {}

    @Around("enableTimer()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        var stopWatch = new StopWatch();
        stopWatch.start();
        Object result = joinPoint.proceed();
        stopWatch.stop();
        System.out.println("total time is : " + stopWatch.getTotalTimeMillis());
        return result;
    }
}

 

 

728x90
반응형