아빠는 개발자

[java] 멀티스레드 최적화 (java application batch) - 테스트 편 본문

Java

[java] 멀티스레드 최적화 (java application batch) - 테스트 편

father6019 2024. 9. 22. 01:07
728x90
반응형

스레드 최적화 테스트를 해보자

일단 내 mac 은 8core 16GB

 

data 는 38,774 건

 

데이터 전체 카운트를 구한 뒤 페이징으로 나눠서 병렬 처리할 건데, 쿼리에 order by 가 들어가므로 테스트에서만 사용하고 실전에서는 추천하지 않는다.

 

runner 에서 실행하는 메소드

package com.doo.aqqle.service;

import com.doo.aqqle.annotation.Timer;
import com.doo.aqqle.repository.StockDataRepository;
import com.doo.aqqle.repository.StockRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Slf4j
@Service("YahooDataService")
public class YahooDataService extends ExtractService implements AqqleExtract {

    private final YahooDataTaskService yahooDataTaskService;
    private List<CompletableFuture<Integer>> completableFutures = new ArrayList<>();

    public YahooDataService(StockRepository stockRepository, StockDataRepository stockDataRepository, YahooDataTaskService yahooDataTaskService) {
        super(stockRepository, stockDataRepository);
        this.yahooDataTaskService = yahooDataTaskService;
    }

    @Override
    @Timer
    public void execute() {

        String type = "yahoo";
        String extractPath = "/data/" + type + "/static/";

        File file = new File(extractPath);

        if (file.exists() && file.isDirectory()) {
            try {
                FileUtils.cleanDirectory(file);
            } catch (IOException e) {
                e.getStackTrace();
            }

        }

        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
        String directory = extractPath + LocalDateTime.now().format(dateTimeFormatter).toString();

        System.out.println(directory);
        IndexFileService.createDirectory(directory);

        long count = stockDataRepository.count();
        int chunk = 500;
        double endPage = Math.ceil(count / chunk);

        for (int i = 0; i < endPage + 1; i++) {
            CompletableFuture<Integer> completableFuture = yahooDataTaskService.task(i, chunk, directory);
            completableFutures.add(completableFuture);
        }

        for (CompletableFuture<Integer> future : completableFutures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }


    }
}

 

데이터를 조회해서 파일로 기록하는 부분 

package com.doo.aqqle.service;

import com.doo.aqqle.dto.StockDataDTO;
import com.doo.aqqle.dto.StockDataListDTO;
import com.doo.aqqle.repository.StockData;
import com.doo.aqqle.repository.StockDataRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Slf4j
@Service("YahooDataAsyncService")
@RequiredArgsConstructor
public class YahooDataTaskService {

    private final StockDataRepository stockDataRepository;
    @Async("executor")
    public CompletableFuture<Integer> task(int i, int chunk, String directory) {
        PageRequest pageRequest = PageRequest.of(i, chunk);
        Page<StockData> stockDatas = stockDataRepository.findAllByOrderByIdAsc(pageRequest);
        List<StockDataDTO> stockDataDTOList = new ArrayList<>();

        try {
            stockDatas.stream().forEach(
                    x -> {
                        stockDataDTOList.add(
                                StockDataDTO.builder()
                                        .company(x.getCompany())
                                        .companyCode(x.getCompanyCode())
                                        .tradingDate(x.getTradingDate())
                                        .open(x.getOpen())
                                        .high(x.getHigh())
                                        .low(x.getLow())
                                        .close(x.getClose())
                                        .adjClose(x.getAdjClose())
                                        .volume(x.getVolume())
                                        .build()
                        );
                    }
            );

            ObjectMapper objectMapper = new ObjectMapper();

            StockDataListDTO stockDataListDTO = new StockDataListDTO();
            stockDataListDTO.setStockDataDTOList(stockDataDTOList);

            String goodsJson = objectMapper.writeValueAsString(stockDataListDTO);
            IndexFileService.createFile(directory + "/indextime_" + i + ".txt", goodsJson);

        } catch (IOException e) {
            e.printStackTrace();
        }
        log.info("TASK");
        return CompletableFuture.supplyAsync(stockDatas::getSize);
    }
}

 

8core 16GB에서 가장 성능이 좋은 구성을 찾아보자

장비 스펙은 위와 같지만, 실제로 가용할 수 있는 리소스는 그보다 적을 듯..

회차  스레드 구성  조회건수 실행시간
1
Thread count : 2
Max thread count:2
Queue capacity:15
500건  2599 ms
2502 ms
2 200건  4820 ms
3 Thread count : 2
Max thread count:3
Queue capacity:15
200건  3842 ms
4 Thread count : 2
Max thread count:3
Queue capacity:10


200건  4064 ms

5 500건 2113 ms
6 Thread count : 2
Max thread count:3
Queue capacity:10
1000건 1468 ms
7 Thread count : 2
Max thread count:3
Queue capacity:10
2000건 1078 ms
8 Thread count : 2
Max thread count:3
Queue capacity:10
10000건 1094 ms

 

조회건수를 고정하고 테스트해야겠다. 조회건수의 영향을 너무 심하게 받는다. 조회건수 200에서 최적화된 스레드 구성을 찾아보자!

 

회차 스레드구성 조회건수 실행시간
1 Thread count : 2
Max thread count:3
Queue capacity:10
200 3846 ms
2 Thread count : 2
Max thread count:4
Queue capacity:10
200 3342 ms
3 Thread count : 2
Max thread count:5
Queue capacity:10
200 3062 ms
4 Thread count : 2
Max thread count:7
Queue capacity:10
200 3151 ms
5 Thread count : 2
Max thread count:6
Queue capacity:10
200 3076 ms
6 Thread count : 2
Max thread count:6
Queue capacity:6
200 2825 ms

 

음.. 

일단 다시 처음으로 돌아가 조회건수를 500으로 했을 때 

Thread count : 2
Max thread count:6
Queue capacity:6

대충 이정도가 가장 빠른 처리를 했다.

 

실행 시간: 1513 ms

 

리소스 사용량이 궁금한데..

 

 

 

 

728x90
반응형