
[es] Shard configuration changes and data nodes

father6019 2024. 8. 18. 13:04

How primary shard and replica settings affect performance in an Elasticsearch cluster with six data nodes

The main differences are in read and write performance, and in the system's availability and recovery capability.

 

In the given scenario, let's compare two configurations: 3 primary shards with 1 replica, and 3 primary shards with 3 replicas (an index-creation sketch follows the list).

1. Primary Shard: 3, Replica: 1

  • Configuration:
    • Total shards = primary shards (3) + replica shards (3) = 6 shards.
    • Each node can host one of these shards, so every node ends up holding exactly one shard.
    • With 1 replica for each of the 3 primary shards, the shards are spread evenly across the 6 data nodes.
  • Performance:
    • Write performance: writes are handled only by the primary shards, and replica shards copy the data after the primary write completes. With a single replica, writes are relatively fast.
    • Read performance: read requests can be served by both primary and replica shards.
    • Availability: even if one node goes down, a copy of every shard still exists on another node, so availability is preserved.

2. Primary Shard: 3, Replica: 3

  • Configuration:
    • Total shards = primary shards (3) + replica shards (9) = 12 shards.
    • With 6 data nodes, each node holds 2 shards.
    • Each primary shard has 3 replica shards, maximizing redundancy and availability.
  • Performance:
    • Write performance: every write to a primary shard must be replicated to 3 replica shards; the more replicas there are, the longer replication takes, so write performance can degrade.
    • Read performance: with many replica shards, read performance improves significantly; reads can be served by the primary and all 3 of its replicas.
    • Availability: very high. Even if one node, or several nodes, go down, copies of the data remain on other nodes, so service continues without data loss.
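
The two layouts above correspond directly to index settings chosen at creation time. Below is a minimal sketch, assuming elasticsearch-py 8.x, a locally reachable cluster, and hypothetical index names; it illustrates the settings, not the exact commands used in this test.

from elasticsearch import Elasticsearch

# Hypothetical connection details and index names, for illustration only
client = Elasticsearch("https://localhost:9200", verify_certs=False)

# Layout 1: 3 primaries x 1 replica = 6 shards, one per data node
client.indices.create(
    index="shard-test-r1",
    settings={"number_of_shards": 3, "number_of_replicas": 1},
)

# Layout 2: 3 primaries x 3 replicas = 12 shards, two per data node
client.indices.create(
    index="shard-test-r3",
    settings={"number_of_shards": 3, "number_of_replicas": 3},
)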

Performance difference summary

  1. Write performance:
    • The Primary Shard: 3, Replica: 1 configuration is faster: fewer replicas mean less replication work and minimal write latency.
    • The Primary Shard: 3, Replica: 3 configuration is slower: data must be replicated to more replicas, which increases replication time.
  2. Read performance:
    • The Primary Shard: 3, Replica: 1 configuration offers moderate read performance.
    • The Primary Shard: 3, Replica: 3 configuration offers better read performance: with more replicas, read requests can be distributed more widely.
  3. Availability and durability:
    • The Primary Shard: 3, Replica: 1 configuration provides basic availability: service continues even if one node fails.
    • The Primary Shard: 3, Replica: 3 configuration provides the highest availability: service continues without data loss even if several nodes fail.

Conclusion

  • The Primary Shard: 3, Replica: 1 configuration offers fast writes and moderate read performance. Availability is adequate, though in the worst case it is somewhat more constrained.
  • The Primary Shard: 3, Replica: 3 configuration offers high read performance and the best availability, but write performance may degrade somewhat.

Which configuration is more suitable depends on how the system is used. If you need to maximize read performance and prioritize availability, use more replicas; if write performance matters most, keeping the replica count low is the better choice. Note that the replica count can be changed on a live index, as sketched below.
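
Unlike the primary shard count, which is fixed at index creation (short of a reindex, shrink, or split), number_of_replicas can be adjusted in place. A minimal sketch, again assuming elasticsearch-py 8.x and reusing the client and illustrative index name from the sketch above:

# Raise the replica count of an existing index from 1 to 3 without reindexing
client.indices.put_settings(
    index="shard-test-r1",
    settings={"index": {"number_of_replicas": 3}},
)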

 

  • AS-IS: 6 servers, 1 node per server (primary 3, replica 1): one shard per data node, the same layout as production.
  • TO-BE: 6 servers, 1 node per server (primary 3, replica 3): two shards per data node. (A quick way to verify the per-node layout is sketched below.)
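
To confirm that the AS-IS and TO-BE indices really place one versus two shards on each data node, the _cat/shards API can be checked before a run. A small sketch, assuming elasticsearch-py 8.x and using the alias names from the test script below; field names in the JSON response may vary slightly by version:

# List which node holds each primary (p) and replica (r) shard of the aliased indices
for row in client.cat.shards(index="hyper-item,ds-item", format="json"):
    print(row["index"], row["shard"], row["prirep"], row["node"])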

 

 

  • Data
    • As-is: prd data copy index (PRD data as of July 10)
    • To-be: prd data copy index (PRD data as of July 10)
  • QA API calls
  • 100 requests/second
  • Extraction criteria: 10,000 requests per 5-minute interval from 2024-03-01 09:00:00 to 2024-03-01 09:30:00, 70,000 requests in total (extracted without deduplication)
  • 0.3-second sleep between batches
  • ES cache cleared once before the first run
  • One pass over the 70,000 keywords, then the aliases are switched and the test repeats; the whole cycle is run twice (x2)

    Asis Run time :: 2024-08-12 19:04:58 ~ 2024-08-12 22:00:48
    Tobe Run time :: 2024-08-12 22:01:49 ~ 2024-08-13 00:41:46
    Pass 1 complete
    Asis Run time2 :: 2024-08-13 00:41:47 ~ 2024-08-13 03:18:32
    Tobe Run time2 :: 2024-08-13 03:19:32 ~ 2024-08-13 05:51:31
    Pass 2 complete


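The full benchmark script is below. It reads keyword files from a directory, URL-encodes each keyword, fires the search API in batches of 100 concurrent requests with aiohttp, records the per-batch elapsed time, samples each data node's query-cache and request-cache sizes via the nodes-stats API, and switches the index aliases between the AS-IS and TO-BE indices between passes.
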
import asyncio
import aiohttp
import time
from urllib import parse
import urllib3
import matplotlib.pyplot as plt
import numpy as np
from time import sleep
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
import ssl
import os
# Check API cache usage and response times
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

async def fetch(session, url):
    try:
        async with session.get(url, ssl=False, timeout=aiohttp.ClientTimeout(total=600)) as response:
            return await response.text()
    except aiohttp.ClientConnectorError as e:
        print(f"Connection Error: {e}")
    except asyncio.TimeoutError:
        print("Request timed out")

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(fetch(session, url))
        return await asyncio.gather(*tasks, return_exceptions=True)

async def main():
#     url = "https://totalsearch-api-qa.homeplus.kr"  # API URL to call
#     url = "http://localhost:8090"  # localhost
    urls = []

    time_a = []
    arr_node1 = []
    arr_node2 = []
    arr_node3 = []
    arr_node4 = []
    arr_node5 = []
    arr_node6 = []
    arr_req1 = []
    arr_req2 = []
    arr_req3 = []
    arr_req4 = []
    arr_req5 = []
    arr_req6 = []

    with open(CSV_FILE) as data_file:
        count_i = 0
        for line in data_file:
            keyword = parse.quote(line.strip())
            urls.append(HOST + "/home/1.0/total/search?sort=RANK&inputKeyword=" + keyword + "&searchKeyword=" + keyword + "&page=1&perPage=1")
            if (len(urls) % CHUNK == 0):
                start_time = time.time()
                results = await fetch_all(urls)
                end_time = time.time()
                time_a.append((time.time() - start_time ) * 1000)

                node1, node2, node3, node4, node5, node6,req1, req2, req3, req4, req5, req6 = query_cache_monitoring()
                arr_node1.append(node1)
                arr_node2.append(node2)
                arr_node3.append(node3)
                arr_node4.append(node4)
                arr_node5.append(node5)
                arr_node6.append(node6)
                arr_req1.append(req1)
                arr_req2.append(req2)
                arr_req3.append(req3)
                arr_req4.append(req4)
                arr_req5.append(req5)
                arr_req6.append(req6)
                dt_object = datetime.fromtimestamp(end_time)
                print("Shot!!! ::: " + str(count_i) + " ::: "+str(dt_object.isoformat()))
                count_i +=CHUNK
                sleep(0.3)
                urls = []

    t = range(0, len(time_a))
    plt.rcParams['font.family'] = 'AppleGothic'

    fs = 1
    y = time_a

    # Plot the raw time series
    axs = plt.figure(figsize=(12,6) , layout='constrained').subplot_mosaic([
        ['time', 'time', 'time'],
        ['node1', 'node2', 'node3'],
        ['node4', 'node5', 'node6'],
    ])


    axs['time'].plot(t, y, lw=lw)
    axs['time'].set_xlabel(str(len(time_a)) + '회')
    axs['time'].set_ylabel('Time(ms)')

    axs['node1'].plot(t, arr_node1, 'g', lw=lw)
    axs['node1'].plot(t, arr_req1, 'r', lw=lw)
    # axs['node1'].psd(arr_node1, NFFT=len(t), pad_to=len(t), Fs=fs)
    axs['node1'].set_ylabel('Cache')

    axs['node2'].plot(t, arr_node2, 'g', lw=lw)
    axs['node2'].plot(t, arr_req2, 'r', lw=lw)
    # axs['node2'].psd(arr_node2, NFFT=len(t), pad_to=len(t), Fs=fs)
    axs['node2'].set_ylabel('')

    # Plot the PSD with different amounts of overlap between blocks
    axs['node3'].plot(t, arr_node3, 'g', lw=lw)
    axs['node3'].plot(t, arr_req3, 'r', lw=lw)
    # axs['node3'].psd(arr_node3, NFFT=len(t) // 2, pad_to=len(t), noverlap=0, Fs=fs)
    axs['node3'].set_ylabel('')

    # Plot the PSD with different amounts of overlap between blocks
    axs['node4'].plot(t, arr_node4, 'g', lw=lw)
    axs['node4'].plot(t, arr_req4, 'r', lw=lw)
    axs['node4'].set_ylabel('')

    # Plot the PSD with different amounts of overlap between blocks
    axs['node5'].plot(t, arr_node5, 'g', lw=lw)
    axs['node5'].plot(t, arr_req5, 'r', lw=lw)
    axs['node5'].set_ylabel('')

    # Plot the PSD with different amounts of overlap between blocks
    axs['node6'].plot(t, arr_node6, 'g', lw=lw)
    axs['node6'].plot(t, arr_req6, 'r', lw=lw)
    axs['node6'].set_ylabel('')

    axs['node3'].set_title('node3')

    for title, ax in axs.items():
        if title == 'time':
            continue

        ax.set_title(title)
        ax.sharex(axs['node1'])
        ax.sharey(axs['node1'])

#     print(f"Time taken: {end_time - start_time} seconds")
#     print(f"Number of responses: {len(results)}")
def query_cache_monitoring():
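    # Node IDs below are specific to this cluster (taken from the nodes-stats response);
    # cache sizes are returned in bytes and converted to MB via the global div (1,000,000)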
    data = client.nodes.stats()
    node1 = data["nodes"]["vGT_Ao0pQoa5fXxCiD9vPQ"]["indices"]
    node2 = data["nodes"]["2b7CiYd8RFCtgA5P3LurIQ"]["indices"]
    node3 = data["nodes"]["T_0Pwn-1STOpEQCThXNmKw"]["indices"]
    node4 = data["nodes"]["TFAxWZkSTKSvgUTZbSFjyw"]["indices"]
    node5 = data["nodes"]["nCuC5PIUTEOqOu5kMzgo0w"]["indices"]
    node6 = data["nodes"]["pWrpoOBsSqO5Nar4sZQnCQ"]["indices"]

    return node1["query_cache"]["memory_size_in_bytes"] / div, node2["query_cache"]["memory_size_in_bytes"] / div, \
           node3["query_cache"]["memory_size_in_bytes"] / div, node4["query_cache"]["memory_size_in_bytes"] / div, node5["query_cache"]["memory_size_in_bytes"] / div, node6["query_cache"]["memory_size_in_bytes"] / div, node1["request_cache"]["memory_size_in_bytes"] / div, node2["request_cache"]["memory_size_in_bytes"] / div, \
           node3["request_cache"]["memory_size_in_bytes"] / div, node4["request_cache"]["memory_size_in_bytes"] / div, node5["request_cache"]["memory_size_in_bytes"] / div, node6["request_cache"]["memory_size_in_bytes"] / div

# Add an alias to an index
def add_alias(es, index_name, alias_name):
    es.indices.put_alias(index=index_name, name=alias_name)
    print(f"Alias '{alias_name}' added to index '{index_name}'")

# Remove an alias from an index
def remove_alias(es, index_name, alias_name):
    es.indices.delete_alias(index=index_name, name=alias_name)
    print(f"Alias '{alias_name}' removed from index '{index_name}'")

def update_alias(client, old_index_name, new_index_name, alias_name):
    remove_alias(client, old_index_name, alias_name)
    add_alias(client, new_index_name, alias_name)
    print(f"Alias '{alias_name}' updated from index '{old_index_name}' to '{new_index_name}'")

if __name__ == "__main__":

    sleep(600)
    now = datetime.now()
    date_view = now.strftime("%Y%m%d")
    f_v = open("shard_test_"+str(date_view)+".txt",'w')

    HYPER_OLD_INDEX_NAME = "prd-hyper-item-20240710"
    DS_OLD_INDEX_NAME = "prd-ds-item-20240710"
    EXP_OLD_INDEX_NAME = "local-prd-exp-item"

#     HYPER_ALIAS_NAME = "prd-hyper-item"
#     DS_ALIAS_NAME = "prd-ds-item"
#     EXP_ALIAS_NAME = "prd-exp-item"

    HYPER_ALIAS_NAME = "hyper-item"
    DS_ALIAS_NAME = "ds-item"
    EXP_ALIAS_NAME = "exp-item"

    client = Elasticsearch("https://doo:doo!@total-qa.ddong.kr:443/", ca_certs=False,
                           verify_certs=False)
    client.indices.clear_cache()
    div = 1000000
    lw = 0.7
    y = []
    plt.rcParams['font.family'] = 'AppleGothic'

    now = datetime.now()
    start_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    CHUNK = 100
#     HOST = "http://localhost:8090"  # localhost
    HOST = "https://totalapi-qa.ddong.kr"

#     directory_path = 'start'
    directory_path = 'event3'

    for i in range(0,2):
        client.indices.clear_cache()
        file_names = os.listdir(directory_path)
        for file_name in file_names:
            print(file_name)
            CSV_FILE = directory_path+"/"+file_name
            asyncio.run(main())
        sleep(600)

    now = datetime.now()
    end_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Asis Run time :: " + start_time_view + " ~ " + end_time_view)
    f_v.write("Asis Run time :: " + start_time_view + " ~ " + end_time_view+ "\n")

    HYPER_NEW_INDEX_NAME = "shard3-hyper-item-20240802"
    DS_NEW_INDEX_NAME = "shard3-ds-item-20240802"
    EXP_NEW_INDEX_NAME = "shard3-exp-item-20240807"

    update_alias(client, HYPER_OLD_INDEX_NAME, HYPER_NEW_INDEX_NAME, HYPER_ALIAS_NAME)
    update_alias(client, DS_OLD_INDEX_NAME, DS_NEW_INDEX_NAME, DS_ALIAS_NAME)
#     update_alias(client, EXP_OLD_INDEX_NAME, EXP_NEW_INDEX_NAME, EXP_ALIAS_NAME)

    sleep(60)

    now = datetime.now()
    start_time_view = now.strftime("%Y-%m-%d %H:%M:%S")

    for i in range(0,2):
        client.indices.clear_cache()
        file_names = os.listdir(directory_path)
        for file_name in file_names:
            print(file_name)
            CSV_FILE = directory_path+"/"+file_name
            asyncio.run(main())
        sleep(600)

    now = datetime.now()
    end_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Tobe Run time :: " + start_time_view + " ~ " + end_time_view)
    f_v.write("Tobe Run time :: " + start_time_view + " ~ " + end_time_view+ "\n")
    f_v.write("1차 완료 \n")

# Revert aliases
    update_alias(client, HYPER_NEW_INDEX_NAME, HYPER_OLD_INDEX_NAME, HYPER_ALIAS_NAME)
    update_alias(client, DS_NEW_INDEX_NAME, DS_OLD_INDEX_NAME, DS_ALIAS_NAME)

    now = datetime.now()
    start_time_view = now.strftime("%Y-%m-%d %H:%M:%S")

    for i in range(0,2):
        client.indices.clear_cache()
        file_names = os.listdir(directory_path)
        for file_name in file_names:
            print(file_name)
            CSV_FILE = directory_path+"/"+file_name
            asyncio.run(main())
        sleep(600)

    now = datetime.now()
    end_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Asis Run time2 :: " + start_time_view + " ~ " + end_time_view)
    f_v.write("Asis Run time2 :: " + start_time_view + " ~ " + end_time_view+ "\n")

    update_alias(client, HYPER_OLD_INDEX_NAME, HYPER_NEW_INDEX_NAME, HYPER_ALIAS_NAME)
    update_alias(client, DS_OLD_INDEX_NAME, DS_NEW_INDEX_NAME, DS_ALIAS_NAME)
#     update_alias(client, EXP_OLD_INDEX_NAME, EXP_NEW_INDEX_NAME, EXP_ALIAS_NAME)

    sleep(60)

    now = datetime.now()
    start_time_view = now.strftime("%Y-%m-%d %H:%M:%S")

    for i in range(0,2):
        client.indices.clear_cache()
        file_names = os.listdir(directory_path)
        for file_name in file_names:
            print(file_name)
            CSV_FILE = directory_path+"/"+file_name
            asyncio.run(main())
        sleep(600)

    now = datetime.now()
    end_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Tobe Run time2 :: " + start_time_view + " ~ " + end_time_view)
    f_v.write("Tobe Run time2 :: " + start_time_view + " ~ " + end_time_view+ "\n")

    update_alias(client, HYPER_NEW_INDEX_NAME, HYPER_OLD_INDEX_NAME, HYPER_ALIAS_NAME)
    update_alias(client, DS_NEW_INDEX_NAME, DS_OLD_INDEX_NAME, DS_ALIAS_NAME)

    now = datetime.now()
    start_time_view = now.strftime("%Y-%m-%d %H:%M:%S")

    for i in range(0,2):
        client.indices.clear_cache()
        file_names = os.listdir(directory_path)
        for file_name in file_names:
            print(file_name)
            CSV_FILE = directory_path+"/"+file_name
            asyncio.run(main())
        sleep(600)

    now = datetime.now()
    end_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Asis Run time3 :: " + start_time_view + " ~ " + end_time_view)
    f_v.write("Asis Run time3 :: " + start_time_view + " ~ " + end_time_view+ "\n")

    update_alias(client, HYPER_OLD_INDEX_NAME, HYPER_NEW_INDEX_NAME, HYPER_ALIAS_NAME)
    update_alias(client, DS_OLD_INDEX_NAME, DS_NEW_INDEX_NAME, DS_ALIAS_NAME)
#     update_alias(client, EXP_OLD_INDEX_NAME, EXP_NEW_INDEX_NAME, EXP_ALIAS_NAME)
    sleep(60)

    now = datetime.now()
    start_time_view = now.strftime("%Y-%m-%d %H:%M:%S")

    for i in range(0,2):
        client.indices.clear_cache()
        file_names = os.listdir(directory_path)
        for file_name in file_names:
            print(file_name)
            CSV_FILE = directory_path+"/"+file_name
            asyncio.run(main())
        sleep(600)

    now = datetime.now()
    end_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Tobe Run time3 :: " + start_time_view + " ~ " + end_time_view)
    f_v.write("Tobe Run time3 :: " + start_time_view + " ~ " + end_time_view+ "\n")


############################
    f_v.close()

#matplotlib chart show
#     plt.show()

 

 

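A second variant of the same benchmark follows. The fetch, monitoring, and alias helpers are identical; the differences are in the driver block: a single pass per configuration (range(0, 1)) instead of two, a different target host and keyword directory, and a slightly different alias-switch sequence.
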
import asyncio
import aiohttp
import time
from urllib import parse
import urllib3
import matplotlib.pyplot as plt
import numpy as np
from time import sleep
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
import ssl
import os
# Check API cache usage and response times
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

async def fetch(session, url):
    try:
        async with session.get(url, ssl=False, timeout=aiohttp.ClientTimeout(total=600)) as response:
            return await response.text()
    except aiohttp.ClientConnectorError as e:
        print(f"Connection Error: {e}")
    except asyncio.TimeoutError:
        print("Request timed out")

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(fetch(session, url))
        return await asyncio.gather(*tasks, return_exceptions=True)

async def main():
#     url = "https://totalsearch-api-qa.homeplus.kr"  # API URL to call
#     url = "http://localhost:8090"  # localhost
    urls = []

    time_a = []
    arr_node1 = []
    arr_node2 = []
    arr_node3 = []
    arr_node4 = []
    arr_node5 = []
    arr_node6 = []
    arr_req1 = []
    arr_req2 = []
    arr_req3 = []
    arr_req4 = []
    arr_req5 = []
    arr_req6 = []

    with open(CSV_FILE) as data_file:
        count_i = 0
        for line in data_file:
            keyword = parse.quote(line.strip())
            urls.append(HOST + "/home/1.0/total/search?sort=RANK&inputKeyword=" + keyword + "&searchKeyword=" + keyword + "&page=1&perPage=1")
            if (len(urls) % CHUNK == 0):
                start_time = time.time()
                results = await fetch_all(urls)
                end_time = time.time()
                time_a.append((time.time() - start_time ) * 1000)

                node1, node2, node3, node4, node5, node6,req1, req2, req3, req4, req5, req6 = query_cache_monitoring()
                arr_node1.append(node1)
                arr_node2.append(node2)
                arr_node3.append(node3)
                arr_node4.append(node4)
                arr_node5.append(node5)
                arr_node6.append(node6)
                arr_req1.append(req1)
                arr_req2.append(req2)
                arr_req3.append(req3)
                arr_req4.append(req4)
                arr_req5.append(req5)
                arr_req6.append(req6)
                dt_object = datetime.fromtimestamp(end_time)
                print("Shot!!! ::: " + str(count_i) + " ::: "+str(dt_object.isoformat()))
                count_i +=CHUNK
                sleep(0.3)
                urls = []

    t = range(0, len(time_a))
    plt.rcParams['font.family'] = 'AppleGothic'

    fs = 1
    y = time_a

    # Plot the raw time series
    axs = plt.figure(figsize=(12,6) , layout='constrained').subplot_mosaic([
        ['time', 'time', 'time'],
        ['node1', 'node2', 'node3'],
        ['node4', 'node5', 'node6'],
    ])


    axs['time'].plot(t, y, lw=lw)
    axs['time'].set_xlabel(str(len(time_a)) + '회')
    axs['time'].set_ylabel('Time(ms)')

    axs['node1'].plot(t, arr_node1, 'g', lw=lw)
    axs['node1'].plot(t, arr_req1, 'r', lw=lw)
    # axs['node1'].psd(arr_node1, NFFT=len(t), pad_to=len(t), Fs=fs)
    axs['node1'].set_ylabel('Cache')

    axs['node2'].plot(t, arr_node2, 'g', lw=lw)
    axs['node2'].plot(t, arr_req2, 'r', lw=lw)
    # axs['node2'].psd(arr_node2, NFFT=len(t), pad_to=len(t), Fs=fs)
    axs['node2'].set_ylabel('')

    # Plot the PSD with different amounts of overlap between blocks
    axs['node3'].plot(t, arr_node3, 'g', lw=lw)
    axs['node3'].plot(t, arr_req3, 'r', lw=lw)
    # axs['node3'].psd(arr_node3, NFFT=len(t) // 2, pad_to=len(t), noverlap=0, Fs=fs)
    axs['node3'].set_ylabel('')

    # Plot the PSD with different amounts of overlap between blocks
    axs['node4'].plot(t, arr_node4, 'g', lw=lw)
    axs['node4'].plot(t, arr_req4, 'r', lw=lw)
    axs['node4'].set_ylabel('')

    # Plot the PSD with different amounts of overlap between blocks
    axs['node5'].plot(t, arr_node5, 'g', lw=lw)
    axs['node5'].plot(t, arr_req5, 'r', lw=lw)
    axs['node5'].set_ylabel('')

    # Plot the PSD with different amounts of overlap between blocks
    axs['node6'].plot(t, arr_node6, 'g', lw=lw)
    axs['node6'].plot(t, arr_req6, 'r', lw=lw)
    axs['node6'].set_ylabel('')

    axs['node3'].set_title('node3')

    for title, ax in axs.items():
        if title == 'time':
            continue

        ax.set_title(title)
        ax.sharex(axs['node1'])
        ax.sharey(axs['node1'])

#     print(f"Time taken: {end_time - start_time} seconds")
#     print(f"Number of responses: {len(results)}")
def query_cache_monitoring():
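    # Node IDs below are specific to this cluster (taken from the nodes-stats response);
    # cache sizes are returned in bytes and converted to MB via the global div (1,000,000)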
    data = client.nodes.stats()
    node1 = data["nodes"]["vGT_Ao0pQoa5fXxCiD9vPQ"]["indices"]
    node2 = data["nodes"]["2b7CiYd8RFCtgA5P3LurIQ"]["indices"]
    node3 = data["nodes"]["T_0Pwn-1STOpEQCThXNmKw"]["indices"]
    node4 = data["nodes"]["TFAxWZkSTKSvgUTZbSFjyw"]["indices"]
    node5 = data["nodes"]["nCuC5PIUTEOqOu5kMzgo0w"]["indices"]
    node6 = data["nodes"]["pWrpoOBsSqO5Nar4sZQnCQ"]["indices"]

    return node1["query_cache"]["memory_size_in_bytes"] / div, node2["query_cache"]["memory_size_in_bytes"] / div, \
           node3["query_cache"]["memory_size_in_bytes"] / div, node4["query_cache"]["memory_size_in_bytes"] / div, node5["query_cache"]["memory_size_in_bytes"] / div, node6["query_cache"]["memory_size_in_bytes"] / div, node1["request_cache"]["memory_size_in_bytes"] / div, node2["request_cache"]["memory_size_in_bytes"] / div, \
           node3["request_cache"]["memory_size_in_bytes"] / div, node4["request_cache"]["memory_size_in_bytes"] / div, node5["request_cache"]["memory_size_in_bytes"] / div, node6["request_cache"]["memory_size_in_bytes"] / div

# Add an alias to an index
def add_alias(es, index_name, alias_name):
    es.indices.put_alias(index=index_name, name=alias_name)
    print(f"Alias '{alias_name}' added to index '{index_name}'")

# Remove an alias from an index
def remove_alias(es, index_name, alias_name):
    es.indices.delete_alias(index=index_name, name=alias_name)
    print(f"Alias '{alias_name}' removed from index '{index_name}'")

def update_alias(client, old_index_name, new_index_name, alias_name):
    remove_alias(client, old_index_name, alias_name)
    add_alias(client, new_index_name, alias_name)
    print(f"Alias '{alias_name}' updated from index '{old_index_name}' to '{new_index_name}'")

if __name__ == "__main__":

    sleep(600)

    now = datetime.now()
    date_view = now.strftime("%Y%m%d")
    f_v = open("shard_test_"+str(date_view)+".txt",'w')

    HYPER_OLD_INDEX_NAME = "prd-hyper-item-20240710"
    DS_OLD_INDEX_NAME = "prd-ds-item-20240710"
    EXP_OLD_INDEX_NAME = "local-prd-exp-item"

    HYPER_NEW_INDEX_NAME = "shard3-hyper-item-20240802"
    DS_NEW_INDEX_NAME = "shard3-ds-item-20240802"
    EXP_NEW_INDEX_NAME = "shard3-exp-item-20240807"

#     HYPER_ALIAS_NAME = "prd-hyper-item"
#     DS_ALIAS_NAME = "prd-ds-item"
#     EXP_ALIAS_NAME = "prd-exp-item"

    HYPER_ALIAS_NAME = "hyper-item"
    DS_ALIAS_NAME = "ds-item"
    EXP_ALIAS_NAME = "exp-item"

    client = Elasticsearch("https://elastic:elastic1!@totalsearch-es-qa.homeplus.kr:443/", ca_certs=False,
                           verify_certs=False)
    client.indices.clear_cache()
    div = 1000000
    lw = 0.7
    y = []
    plt.rcParams['font.family'] = 'AppleGothic'

    CHUNK = 100
#     HOST = "http://localhost:8090"  # localhost
    HOST = "https://totalsearch-co.kr"
#     directory_path = 'start'
    directory_path = 'event5'

    now = datetime.now()
    start_time_view = now.strftime("%Y-%m-%d %H:%M:%S")


    for i in range(0,1):
        client.indices.clear_cache()
        file_names = os.listdir(directory_path)
        for file_name in file_names:
            print(file_name)
            CSV_FILE = directory_path+"/"+file_name
            asyncio.run(main())
        sleep(600)

    now = datetime.now()
    end_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Asis Run time :: " + start_time_view + " ~ " + end_time_view)
    f_v.write("Asis Run time :: " + start_time_view + " ~ " + end_time_view+ "\n")

# Switch aliases to the TO-BE (shard3) indices
    update_alias(client, HYPER_OLD_INDEX_NAME, HYPER_NEW_INDEX_NAME, HYPER_ALIAS_NAME)
    update_alias(client, DS_OLD_INDEX_NAME, DS_NEW_INDEX_NAME, DS_ALIAS_NAME)
#     update_alias(client, EXP_OLD_INDEX_NAME, EXP_NEW_INDEX_NAME, EXP_ALIAS_NAME)

    sleep(60)

    now = datetime.now()
    start_time_view = now.strftime("%Y-%m-%d %H:%M:%S")

    for i in range(0,1):
        client.indices.clear_cache()
        file_names = os.listdir(directory_path)
        for file_name in file_names:
            print(file_name)
            CSV_FILE = directory_path+"/"+file_name
            asyncio.run(main())

    now = datetime.now()
    end_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Tobe Run time :: " + start_time_view + " ~ " + end_time_view)
    f_v.write("Tobe Run time :: " + start_time_view + " ~ " + end_time_view+ "\n")
    print("==================================================================================")
    sleep(600)

# Revert aliases to the AS-IS indices
    update_alias(client, HYPER_NEW_INDEX_NAME, HYPER_OLD_INDEX_NAME, HYPER_ALIAS_NAME)
    update_alias(client, DS_NEW_INDEX_NAME, DS_OLD_INDEX_NAME, DS_ALIAS_NAME)
    sleep(60)

    for i in range(0,1):
        client.indices.clear_cache()
        file_names = os.listdir(directory_path)
        for file_name in file_names:
            print(file_name)
            CSV_FILE = directory_path+"/"+file_name
            asyncio.run(main())

    now = datetime.now()
    end_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Asis2 Run time :: " + start_time_view + " ~ " + end_time_view)
    f_v.write("Asis2 Run time :: " + start_time_view + " ~ " + end_time_view+ "\n")
    sleep(600)

    update_alias(client, HYPER_OLD_INDEX_NAME, HYPER_NEW_INDEX_NAME, HYPER_ALIAS_NAME)
    update_alias(client, DS_OLD_INDEX_NAME, DS_NEW_INDEX_NAME, DS_ALIAS_NAME)
    sleep(60)

    now = datetime.now()
    start_time_view = now.strftime("%Y-%m-%d %H:%M:%S")

    for i in range(0,1):
        client.indices.clear_cache()
        file_names = os.listdir(directory_path)
        for file_name in file_names:
            print(file_name)
            CSV_FILE = directory_path+"/"+file_name
            asyncio.run(main())
#         sleep(600)

    now = datetime.now()
    end_time_view = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Tobe2 Run time :: " + start_time_view + " ~ " + end_time_view)
    f_v.write("Tobe2 Run time :: " + start_time_view + " ~ " + end_time_view+ "\n")

############################
    f_v.close()

# Revert aliases to the AS-IS indices
    update_alias(client, HYPER_NEW_INDEX_NAME, HYPER_OLD_INDEX_NAME, HYPER_ALIAS_NAME)
    update_alias(client, DS_NEW_INDEX_NAME, DS_OLD_INDEX_NAME, DS_ALIAS_NAME)


#matplotlib chart show
#     plt.show()


