본문 바로가기

취미

AWS DynamoDB, Lambda를 이용한 auto scaling #2

지난 번 포스팅에 이어서 이번에는 프로비저닝유닛을 감소하는 코드를 짜보도록 하겠다.


참 치사하다고 생각되는게 -0- 

DynamoDB에서 유닛의 증가에는 제한이 없지만 

유닛을 감소하는 것에 대해서는 하루에 4번이라는 제한이 있다.

그래서 요청에 제일 많은 시간 이 후에 실행되게 하는게 제일 좋을 것 같다.

예제에서는 24시간을 4로 나눠서 6시간마다 실행되게 해보았다.

유닛을 감소하는 방식은 존재하는 테이블과 테이블에 있는 GlobalSecondaryIndex를 전부 가져와서 프로비저닝된 유닛과 사용중인 유닛을 비교해서 유닛을 조정한다.


blueprint는 비어있는 함수를 선택하고 넥스트넥스트 

새로운 Lambda함수를 추가한다.


먼저 boto3, math, datetime 라이브러리를 임포트 하고 알람이 새로 설정될때 설정할 알람 임계값, 그다음 provision_percent에 Unit을 감소할경우 현재 사용중인 Unit count에 30% 큰 수로 유닛을 설정하도록 변수를 잡았다.

최소 읽기 유닛 5, 최소 쓰기유닛 1로 설정했다.



lambda_handler에서 cloudwatch객체와 dynamodb객체를 만들어서 전체 테이블 리스트를 불러와 check_unit함수에 넣는다.

 


 그리고 프로비저닝된 유닛과 사용중인 유닛을 가져와 비교한다.

cloudwatch에서 프로비저닝유닛을 가져오면 가끔씩 제대로 값을 못가져올때가 있었다; 그래서 table객체에서 프로비저닝유닛을 가져오고 cloudwatch에서 사용중인 값을 가져오고 값이 없을땐 0을 넣는다.

 

그리고 값이 변경이 필요할 경우에 update_unit에 값을 넣고 테이블 유닛값을 변경한다.

 

이후에는 마찬가지로 알람의 임계값을 조절하면 끝.

 

이후 cloudwatch에서 새로운 create rule을 눌러 특정 시간에 또는 특정시간마다 스케줄을 만들고 lambda함수를 연결하면 된다.

유닛감소는 하루 4번 제한이 있기 때문에 최적의 시간대에 정하는게 중요할 것 같다.

 

아래는 전체 코드

 

DynamoDB Scale Down

import boto3
import math
from datetime import datetime, timedelta

alarm_percent = 80
provision_percent = 30
minimum_units_read = 5
minimum_units_write = 1


def check_units(watch, table, second_index=None):
    data = {}
    data['ProvisionedReadCapacityUnits'] = table.provisioned_throughput['ReadCapacityUnits']
    data['ProvisionedWriteCapacityUnits'] = table.provisioned_throughput['WriteCapacityUnits']
    
    update_unit = {}
    Dimensions = [{"Name": "TableName", "Value": table.name}]
    if second_index:
        Dimensions.append({"Name": "GlobalSecondaryIndexName", "Value": second_index})
    
    for metric in ['ConsumedReadCapacityUnits', 'ConsumedWriteCapacityUnits']:
        data[metric] = watch.get_metric_statistics(Namespace="AWS/DynamoDB", MetricName=metric,
                                                   Dimensions=Dimensions,
                                                   StartTime=datetime.utcnow()-timedelta(minutes=5), EndTime=datetime.utcnow(),
                                                   Period=300, Statistics=["Average"], Unit="Count")
        if not data[metric]['Datapoints']:
            data[metric] = 0
        else:
            data[metric] = int(data[metric]['Datapoints'][0]['Average'])

    if data['ProvisionedReadCapacityUnits'] > data['ConsumedReadCapacityUnits']:
        if data['ConsumedReadCapacityUnits'] == 0:
            if data['ProvisionedReadCapacityUnits'] != minimum_units_read:
                update_unit['read'] = minimum_units_read
        elif data['ConsumedReadCapacityUnits']/float(data['ProvisionedReadCapacityUnits']) * 100 > 30:
            update_unit['read'] = math.ceil(data['ConsumedReadCapacityUnits'] * (100.0+provision_percent) / 100)

    if data['ProvisionedWriteCapacityUnits'] > data['ConsumedWriteCapacityUnits']:
        if data['ConsumedWriteCapacityUnits'] == 0:
            if data['ProvisionedWriteCapacityUnits'] != minimum_units_write:
                update_unit['write'] = minimum_units_write
        elif data['ConsumedReadCapacityUnits']/float(data['ProvisionedReadCapacityUnits']) * 100 > 30:
            update_unit['write'] = math.ceil(data['ConsumedReadCapacityUnits'] * 130 / 100.0)

    # modify throughput
    if update_unit:
        try:
            kwargs = dict()
            if not second_index:
                kwargs['ProvisionedThroughput'] = \
                        {"ReadCapacityUnits": \
                         int(update_unit['read']) if 'read' in update_unit.keys() \
                         else data['ProvisionedReadCapacityUnits'],
                         "WriteCapacityUnits":\
                         int(update_unit['write']) if 'write' in update_unit.keys() \
                         else data['ProvisionedWriteCapacityUnits']}
            else:
                kwargs['GlobalSecondaryIndexUpdates'] = \
                        [{"Update": {"IndexName": second_index,
                                     "ProvisionedThroughput": \
                                     {"ReadCapacityUnits": update_unit['read'] if 'read' in update_unit.keys() \
                                      else data['ProvisionedReadCapacityUnits'],
                                      "WriteCapacityUnits": update_unit['write'] if update_unit.has_key("write") \
                                      else data['ProvisionedWriteCapacityUnits']}}}]

            table.update(**kwargs)
        except Exception as e:
            print e
            pass
        else:
            print 'update throughput'
            for i, v in update_unit.items():
                if i == 'read':
                    modify_alarm(watch, Dimensions, 'ConsumedReadCapacityUnits', v)
                else:
                    modify_alarm(watch, Dimensions, 'ConsumedWriteCapacityUnits', v)


def modify_alarm(watch, Dimensions, metric, threshold):
    # modify CloudWatch Alarm
    alarm = watch.describe_alarms_for_metric(MetricName=metric, Namespace="AWS/DynamoDB",
                                             Dimensions=Dimensions)
    if not alarm['ResponseMetadata']['HTTPStatusCode'] in [200, 201]:
        return False

    alarm = alarm['MetricAlarms'][0]
    threshold = threshold * (alarm_percent/100.0) * alarm['Period']
    print threshold
    try:
        watch.put_metric_alarm(AlarmName=alarm['AlarmName'],
                               ActionsEnabled=alarm['ActionsEnabled'],
                               EvaluationPeriods=alarm['EvaluationPeriods'],
                               OKActions=alarm['OKActions'], AlarmActions=alarm['AlarmActions'],
                               InsufficientDataActions=alarm['InsufficientDataActions'],
                               MetricName=alarm['MetricName'], Namespace=alarm['Namespace'],
                               Statistic=alarm['Statistic'], Dimensions=alarm['Dimensions'],
                               Period=alarm['Period'], ComparisonOperator=alarm['ComparisonOperator'],
                               Threshold=threshold)
    except Exception as e:
        print "Except in " + e
        return False
    print 'update alarm'


def lambda_handler(event, context):
    watch = boto3.client('cloudwatch', aws_access_key_id="",
                         aws_secret_access_key="",
                         region_name="ap-northeast-2")
    dynamodb = boto3.resource('dynamodb', aws_access_key_id="",
                              aws_secret_access_key="",
                              region_name="ap-northeast-2")

    for table in list(dynamodb.tables.all()):
        check_units(watch, table)
        for second in table.global_secondary_indexes if table.global_secondary_indexes else []:
            check_units(watch, table, second_index=second['IndexName'])

 

 

DynamoDB Scale Up

import json
import boto3
import math

increase_percent = 30
alarm_threshold = 80

def update_table(Table, Throughput, Second_index=None):
    kwargs = dict()
    if not Second_index:
        kwargs['ProvisionedThroughput'] = \
                {"ReadCapacityUnits":int(Throughput["ReadCapacityUnits"]),
                "WriteCapacityUnits":int(Throughput["WriteCapacityUnits"])}
    else:
        kwargs['GlobalSecondaryIndexUpdates'] = \
                [{"Update": {"IndexName": Second_index,
                             "ProvisionedThroughput": \
                             {"ReadCapacityUnits": int(Throughput['ReadCapacityUnits']),
                             "WriteCapacityUnits": int(Throughput['WriteCapacityUnits'])}}}]
    Table.update(**kwargs)


def modify_alarm(watch, Dimensions, metric, threshold):
    # modify CloudWatch Alarm
    for dimension in Dimensions:
        dimension['Name'] = dimension.pop('name')
        dimension['Value'] = dimension.pop('value')
    alarm = watch.describe_alarms_for_metric(MetricName=metric, Namespace="AWS/DynamoDB",
                                             Dimensions=Dimensions)
    
    if not alarm['ResponseMetadata']['HTTPStatusCode'] in [200, 201]:
        return False

    alarm = alarm['MetricAlarms'][0]
    threshold = threshold * (alarm_threshold/100.0) * alarm['Period']
    try:
        watch.put_metric_alarm(AlarmName=alarm['AlarmName'],
                               ActionsEnabled=alarm['ActionsEnabled'],
                               EvaluationPeriods=alarm['EvaluationPeriods'],
                               OKActions=alarm['OKActions'], AlarmActions=alarm['AlarmActions'],
                               InsufficientDataActions=alarm['InsufficientDataActions'],
                               MetricName=alarm['MetricName'], Namespace=alarm['Namespace'],
                               Statistic=alarm['Statistic'], Dimensions=alarm['Dimensions'],
                               Period=alarm['Period'], ComparisonOperator=alarm['ComparisonOperator'],
                               Threshold=threshold)
    except Exception as e:
        print "Alarm except : " + str(e)
        return False


def lambda_handler(event, context):
    watch = boto3.client('cloudwatch', aws_access_key_id="",
                         aws_secret_access_key="",
                         region_name="ap-northeast-2")
    dynamodb = boto3.resource('dynamodb', aws_access_key_id="",
                              aws_secret_access_key="",
                              region_name="ap-northeast-2")
    
    message = json.loads(event['Records'][0]['Sns']['Message'])
    
    for dimension in message['Trigger']['Dimensions']:
        if dimension['name'] == 'TableName':
            table = dynamodb.Table(dimension['value'])
            break
    

    metricname = message['Trigger']['MetricName'].replace('Consumed', '')
    try:
        secondindex_name = None
        for dimension in message['Trigger']['Dimensions']:
            if dimension['name'] == 'GlobalSecondaryIndexName':
                secondindex_name = dimension['value']
                break
        if not secondindex_name:
            throughput = table.provisioned_throughput
            throughput[metricname] = math.ceil(throughput[metricname] * (100.0+increase_percent) / 100 )
        else:
            for index_table in table.global_secondary_indexes:
                if index_table['IndexName'] == secondindex_name:
                    throughput = index_table['ProvisionedThroughput']
                    throughput[metricname] = math.ceil(throughput[metricname] * (100.0+increase_percent) / 100 )
                    break
        update_table(table, throughput, secondindex_name)
    except Exception as e:
        print "Update Except : " + str(e)
        return False

    
    modify_alarm(watch, message['Trigger']['Dimensions'], message['Trigger']['MetricName'], throughput[metricname])
        
 

 

 

'취미' 카테고리의 다른 글

계양산 등산  (0) 2019.08.07
JWT(JSON Web Token)  (0) 2017.02.20
AWS DynamoDB, Lambda를 이용한 auto scaling #1  (0) 2017.02.19
아이폰 MAC address 랜덤화  (3) 2016.04.09
802.11 Management Frame Format  (0) 2016.02.15