批量修改亚马逊AWS相关service访问控制和安全组白名单IP设置Python脚本

目前工作中需要维护包括亚马逊AWS、阿里云Alibaba Cloud和微软云Microsoft Azure在内的云平台服务设施。关于白名单组IP地址切换(旧的IP换成新的IP)的工作是分云平台进行的,刚开始独立写每个云平台的脚本,后来因为常用就整合在了一起。参考GitHub上的完整代码:https://github.com/Bilery-Zoo/Cloud_Platform_Maintenance/tree/master/CloudPlatform_WhitelistIP_Switcher。里面集成了AWS和阿里云的实现。本来计划也加进微软Azure的部分,但由于工作安排被搁浅了。其中阿里云单独的部分,可以参考较早的博文:https://blog.csdn.net/sweeper_freedoman/article/details/100153531。建议还是参考GitHub上的集成版本。

本篇博文主要记录一下单独AWS的部分。脚本工作原理和阿里云的脚本实现一致,都是分两步来:

1.向配置了旧的公网IP的白名单里面添加新的公网IP
2.从添加了新的公网IP的白名单里面删除旧的公网IP

文件结构和阿里云的脚本设计也是一致的。这样在后期集成时非常方便。

├── functions.py        base functions of the AWS handlers module
├── log.py              logging module
├── new_ip_append.py    call file for appending the new IP
└── old_ip_delete.py    call file for deleting the old IP

functions.py

#!/usr/bin/env python3.6
# -*- coding: utf-8 -*-
"""
create_author : 蛙鳜鸡鹳狸猿
create_time   : 2019-09-04
program       : *_* base functions of AWS handlers module *_*
"""

import json
import re
import typing

import boto3
import botocore
import botocore.exceptions

import log

logger = log.LOG().logger()

# Actions accepted by the WAF / S3 modify handlers.
_MODIFY_MODES = ("INSERT", "DELETE")


def _check_modify_args(modify_mode, new_ip):
    """Validate the `modify_mode` / `new_ip` pair shared by the modify handlers."""
    if modify_mode not in _MODIFY_MODES:
        raise ValueError("modify_mode must be one of {modes}, got {mode!r}".format(
            modes=_MODIFY_MODES, mode=modify_mode))
    if modify_mode == "INSERT" and not new_ip:
        raise ValueError("new_ip is required when modify_mode is INSERT")


class AWS(object):
    """Locate and switch whitelist IPs in EC2 security groups, WAF IP sets and S3 bucket policies."""

    def __init__(self, region_name="ap-northeast-1", ):
        """
        Init construct.
        :param region_name: AWS region name.
        """
        # Fill in the key pair to use explicit credentials. Blank values are
        # not forwarded to boto3 (see `get_service_client`).
        self.AWS_Access_Key = {
            "aws_access_key_id": "",
            "aws_secret_access_key": "",
        }
        self.region_name = region_name

    def __repr__(self):
        """Describe the configured region; the text is also written to the log."""
        repr_str = "AWS client built under:\n\t{'Region': %s}" % self.region_name
        logger.info(repr_str)
        return repr_str

    @staticmethod
    def get_ip_pattern(ip):
        """
        Get strictly `re` matching compile pattern of IP.
        The address is escaped literally and guarded on both sides so that it
        cannot match inside a longer address (`10.0.0.1` must not match
        `10.0.0.10` or `110.0.0.1`).
        :param ip: IP to get pattern.
        :return: `re` compile pattern.
        """
        return re.compile(r"(?<![\d.])" + re.escape(ip) + r"(?!\.?\d)")

    @log.log(logger=logger, if_exit=True)
    def get_service_client(self, service, ):
        """
        Get AWS service client. See also:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html.
        :param service: AWS service name.
        :return: AWS service client object.
        """
        # Only pass credentials that were actually filled in; with blank
        # placeholders boto3 resolves credentials on its own.
        credentials = {key: value for key, value in self.AWS_Access_Key.items() if value}
        return boto3.client(service, region_name=self.region_name, **credentials)

    @log.log(logger=logger, if_exit=True)
    def get_ec2_whitelistip_info(self, ip, ) -> typing.Generator:
        """
        Get WhitelistIP info of EC2 / VPC Security Group.
        Every yielded permission is narrowed to the ranges that contain `ip`,
        so the result can be handed to authorize / revoke calls without
        touching any other rule of the group.
        :param ip: IP to get info.
        :return: generator of dicts {"GroupId": ..., "IpPermissions": [...]}.
        """
        client = self.get_service_client("ec2")
        pattern = self.get_ip_pattern(ip)
        response = client.describe_security_groups(
            Filters=[
                {
                    "Name": "ip-permission.cidr",
                    "Values": [
                        "{ip}/32".format(ip=ip),
                    ]
                },
            ],
        )
        for detail in response["SecurityGroups"]:
            permissions = []
            for permission in detail["IpPermissions"]:
                matched = [cidr_ip for cidr_ip in permission.get("IpRanges", [])
                           if pattern.search(cidr_ip["CidrIp"])]
                if not matched:
                    continue
                # Keep protocol / ports, but drop every source that is not the
                # IP being looked up.
                permissions.append(dict(permission, IpRanges=matched, Ipv6Ranges=[],
                                        PrefixListIds=[], UserIdGroupPairs=[]))
            if not permissions:
                continue
            whitelistip_info = {"GroupId": detail["GroupId"], "IpPermissions": permissions, }
            logger.info("get_ec2_ip_info:\n\t{whitelistip_info}".format(whitelistip_info=whitelistip_info))
            yield whitelistip_info

    @log.log(logger=logger, if_exit=True)
    def get_waf_whitelistip_info(self, ip, ) -> typing.Generator:
        """
        Get WhitelistIP info of WAF IP addresses.
        At most one descriptor (the first match) is yielded per IP set.
        :param ip: IP to get info.
        :return: generator of dicts {"IPSetId": ..., "IPSetDescriptor": ..., "duplicate_check": [...]}.
        """
        client = self.get_service_client("waf")
        pattern = self.get_ip_pattern(ip)
        response = client.list_ip_sets()
        for detail in response["IPSets"]:
            ip_sets = client.get_ip_set(IPSetId=detail["IPSetId"])
            descriptors = ip_sets["IPSet"]["IPSetDescriptors"]
            for ip_set in descriptors:
                if pattern.search(ip_set["Value"]):
                    whitelistip_info = {"IPSetId": ip_sets["IPSet"]["IPSetId"], "IPSetDescriptor": ip_set}
                    logger.info(
                        "get_waf_whitelistip_info:\n\t{whitelistip_info}".format(whitelistip_info=whitelistip_info))
                    # All descriptors of the set, used by the caller to detect
                    # an already inserted new IP.
                    whitelistip_info["duplicate_check"] = descriptors
                    yield whitelistip_info
                    break

    @log.log(logger=logger, if_exit=True)
    def get_s3_whitelistip_info(self, ip, ) -> typing.Generator:
        """
        Get WhitelistIP info of S3 Bucket Policy.
        :param ip: IP to get info.
        :return: generator of dicts {"Bucket": ..., "Policy": <policy JSON text>}.
        """
        client = self.get_service_client("s3")
        pattern = self.get_ip_pattern(ip)
        response = client.list_buckets()
        for detail in response["Buckets"]:
            try:
                bucket_policy = client.get_bucket_policy(Bucket=detail["Name"])["Policy"]
            except botocore.exceptions.ClientError as error:
                # Best effort: a bucket whose policy cannot be read is skipped.
                logger.debug("get_s3_whitelistip_info skipped {bucket}:\n\t{error}".format(
                    bucket=detail["Name"], error=error))
                continue
            if pattern.search(str(bucket_policy)):
                whitelistip_info = {"Bucket": detail["Name"], "Policy": bucket_policy}
                logger.info(
                    "get_s3_whitelistip_info:\n\t{whitelistip_info}".format(whitelistip_info=whitelistip_info))
                yield whitelistip_info

    @log.log(logger=logger, if_exit=True)
    def add_ec2_whitelistip_info(self, ip, new_ip, ):
        """
        Add WhitelistIP info into AWS Security Group. See also:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.authorize_security_group_ingress.
        Each rule that currently allows `ip` is duplicated for `new_ip`.
        :param ip: IP to get info.
        :param new_ip: IP to add info.
        :return: None.
        """
        client = self.get_service_client("ec2", )
        pattern = self.get_ip_pattern(ip)
        for whitelistip_info in self.get_ec2_whitelistip_info(ip):
            # Rewrite only the CIDR of each matched range; other fields of the
            # range and of the permission are carried over unchanged.
            whitelistip_info["IpPermissions"] = [
                dict(permission, IpRanges=[
                    dict(cidr_ip, CidrIp=pattern.sub(new_ip, cidr_ip["CidrIp"]))
                    for cidr_ip in permission["IpRanges"]
                ])
                for permission in whitelistip_info["IpPermissions"]
            ]
            try:
                client.authorize_security_group_ingress(**whitelistip_info)
            except botocore.exceptions.ClientError as error:
                # Best effort (e.g. the rule already exists): report and go on
                # with the next security group.
                logger.warning("add_ec2_whitelistip_info skipped {group}:\n\t{error}".format(
                    group=whitelistip_info["GroupId"], error=error))
                continue
            else:
                logger.warning("add_ec2_whitelistip_info:\n\t{info}".format(info=str(whitelistip_info)))

    @log.log(logger=logger, if_exit=True)
    def remove_ec2_whitelistip_info(self, ip, ):
        """
        Remove WhitelistIP info from AWS Security Group. See also:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.revoke_security_group_ingress.
        :param ip: IP to get info and remove.
        :return: None.
        """
        client = self.get_service_client("ec2", )
        for whitelistip_info in self.get_ec2_whitelistip_info(ip):
            logger.warning("remove_ec2_whitelistip_info:\n\t{info}".format(info=str(whitelistip_info)))
            client.revoke_security_group_ingress(**whitelistip_info)

    @log.log(logger=logger, if_exit=True)
    def modify_waf_whitelistip_info(self, ip, modify_mode, new_ip=None, ):
        """
        Modify WhitelistIP info of AWS IP addresses. See also:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/waf.html#WAF.Client.update_ip_set.
        :param ip: IP to modify(remove).
        :param modify_mode: modify mode(valid values: INSERT, DELETE).
        :param new_ip: IP to add info(required for INSERT).
        :return: None.
        """
        _check_modify_args(modify_mode, new_ip)
        client = self.get_service_client("waf", )
        pattern = self.get_ip_pattern(ip)
        new_pattern = self.get_ip_pattern(new_ip) if modify_mode == "INSERT" else None
        for whitelistip_info in self.get_waf_whitelistip_info(ip):
            descriptor = whitelistip_info["IPSetDescriptor"]
            if new_pattern is not None:
                # Skip IP sets that already contain the new IP.
                if any(new_pattern.search(existing["Value"])
                       for existing in whitelistip_info["duplicate_check"]):
                    continue
                descriptor = dict(descriptor, Value=pattern.sub(new_ip, descriptor["Value"]))
            request = {
                "IPSetId": whitelistip_info["IPSetId"],
                "Updates": [{"Action": modify_mode, "IPSetDescriptor": descriptor, }],
                "ChangeToken": client.get_change_token()["ChangeToken"],
            }
            logger.warning("modify_waf_whitelistip_info:\n\t{info}".format(info=str(request)))
            client.update_ip_set(**request)

    @log.log(logger=logger, if_exit=True)
    def modify_s3_whitelistip_info(self, ip, modify_mode, new_ip=None, ):
        """
        Modify WhitelistIP info of S3 Bucket Policy. See also:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_bucket_policy.
        Only `Condition.IpAddress["aws:SourceIp"]` entries are rewritten.
        :param ip: IP to modify(remove).
        :param modify_mode: modify mode(valid values: INSERT, DELETE).
        :param new_ip: IP to add info(required for INSERT).
        :return: None.
        """
        _check_modify_args(modify_mode, new_ip)
        client = self.get_service_client("s3", )
        pattern = self.get_ip_pattern(ip)
        new_pattern = self.get_ip_pattern(new_ip) if modify_mode == "INSERT" else None
        for whitelistip_info in self.get_s3_whitelistip_info(ip):
            # Skip buckets whose policy already mentions the new IP.
            if new_pattern is not None and new_pattern.search(str(whitelistip_info)):
                continue
            # The policy is JSON text: parse it as JSON instead of evaluating it.
            policy_info = json.loads(whitelistip_info["Policy"])
            statements = policy_info["Statement"]
            if isinstance(statements, dict):
                statements = [statements]
            for policy_statement in statements:
                condition = policy_statement.get("Condition", {}).get("IpAddress", {})
                if "aws:SourceIp" not in condition:
                    continue
                source_ips = condition["aws:SourceIp"]
                if isinstance(source_ips, str):
                    # A single value may be stored as a bare string.
                    source_ips = [source_ips]
                # Build a new list rather than mutating the one being scanned.
                if modify_mode == "INSERT":
                    updated = source_ips + [pattern.sub(new_ip, source_ip)
                                            for source_ip in source_ips if pattern.search(source_ip)]
                else:
                    updated = [source_ip for source_ip in source_ips if not pattern.search(source_ip)]
                if updated != source_ips:
                    condition["aws:SourceIp"] = updated
            whitelistip_info["Policy"] = json.dumps(policy_info)
            logger.warning("modify_s3_whitelistip_info:\n\t{info}".format(info=str(whitelistip_info)))
            whitelistip_info["ConfirmRemoveSelfBucketAccess"] = False
            client.put_bucket_policy(**whitelistip_info)

log.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
create_author : 蛙鳜鸡鹳狸猿
create_time   : 2019-09-04
program       : *_* log logging module *_*
"""

import os
import sys
import logging
import functools


class LOG(object):
    """Log logging definition: collects `logging.basicConfig()` settings and hands out loggers."""

    def __init__(self, level=logging.INFO, stream=sys.stdout, filemode='a',
                 filename=os.path.dirname(os.path.abspath(__file__)) + '/' + "log",
                 datefmt="%Y-%m-%d %H:%M:%S",
                 format="%(asctime)s\t%(levelname)s\t< Module: %(module)s, Function: %(funcName)s >\t%(message)s",
                 **kwargs):
        """
        LOG init.
        :param level: arg pass to standard library logging.basicConfig().
        :param stream: arg pass to standard library logging.basicConfig() (used only without `filename`).
        :param filemode: arg pass to standard library logging.basicConfig().
        :param filename: arg pass to standard library logging.basicConfig() (wins over `stream`).
        :param datefmt: arg pass to standard library logging.basicConfig().
        :param format: arg pass to standard library logging.basicConfig().
        :param kwargs: arg pass to standard library logging.basicConfig().
        """
        self.level = level
        self.stream = stream
        self.filename = filename
        self.filemode = filemode
        self.datefmt = datefmt
        self.format = format
        self.kwargs = kwargs

    def logger(self, name=__name__):
        """
        Logger object generates.
        :param name: Logger name(parameters pass to standard library logging.getLogger()).
        :return: Logger object.
        """
        args = {
            "level": self.level,
            "datefmt": self.datefmt,
            "format": self.format,
        }
        # basicConfig() rejects receiving both the `stream` and `filename`
        # keys, whatever their values, so forward exactly one destination.
        if self.filename:
            args["filename"] = self.filename
            args["filemode"] = self.filemode
        elif self.stream is not None:
            args["stream"] = self.stream
        logging.basicConfig(**args, **self.kwargs)
        return logging.getLogger(name)


def raise_log(raised_except, msg=''):
    """
    Raise exception by hand to escape error exit caused by built-in [raise].
    The exception is raised and caught right away, and is reported through
    `logging.exception()` instead of propagating.
    :param raised_except: Exception class (falls back to `Exception` when None).
    :param msg: Exception feedback message.
    :return: None.
    """
    if raised_except is None:
        raised_except = Exception
    try:
        raise raised_except(msg)
    except raised_except as E:
        logging.exception(E)


def log(logger=None, exc_msg='', if_exit=False, exit_msg='', result_check=False, check_except=None, check_msg=''):
    """
    Log logging decorator function.
    :param logger: Logger object(see also logging.getLogger()).
    :param exc_msg: extra message to return when exceptions catch.
    :param if_exit: Boolean.Whether to exit or not when catching exception.
    :param exit_msg: extra message to return when exceptions catch and exit.
    :param result_check: Boolean.Whether or not to check Python's False status result of [func]'s return.
    :param check_except: Exception object to raise when [result_check].
    :param check_msg: Exception feedback message to return when [result_check].
    :return: decorated function [func]'s return.
    """
    if not logger:
        logger = LOG().logger()

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = None
            try:
                result = func(*args, **kwargs)
            except BaseException:
                logger.exception(exc_msg)
                if if_exit:
                    sys.exit(exit_msg)
            finally:
                if result_check and not result:
                    raise_log(check_except, check_msg)
            # Returning here, not inside `finally`, lets the SystemExit raised
            # above propagate instead of being replaced by a return value.
            return result

        return wrapper

    return decorator

new_ip_append.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
create_author : 蛙鳜鸡鹳狸猿
create_time   : 2019-09-04
program       : *_* new WhitelistIP appending call file *_*
"""

import functions

# *****************************************************************************
# Unit test button                                     (I am Bilery Zoo...)
#   Default `True` for script testing.
#   Set to `False` when production using.
# *****************************************************************************
UT_FLAG: bool = True

# *****************************************************************************
# Region distinguish button
#   Default `True` for region distinguishing.
#   Set to `False` when non-distinguishing using(Japan/Tokyo region only).
# *****************************************************************************
REGION_DISTINGUISH_FLAG: bool = True

# Tokyo is always handled first; the other regions join when distinguishing.
region_list: list = ["ap-northeast-1"]
if REGION_DISTINGUISH_FLAG:
    region_list.extend([
        "us-west-1", "us-east-1", "us-east-2", "us-west-2",
        "ap-south-1", "ap-northeast-2", "ap-southeast-1", "ap-southeast-2",
        "ca-central-1",
        "eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3", "eu-north-1",
        "sa-east-1",
    ])

# *****************************************************************************
# WhitelistIP setting area
#   Setting `new_whitelistip`(to append) and `old_whitelistip`(to remove).
#   Handlers below catch these two Args to do the switching jobs.
# *****************************************************************************
new_whitelistip = "127.0.0.1"
if UT_FLAG:
    old_whitelistip = "127.0.0.0"
else:
    old_whitelistip = "0.0.0.0"

# In every region, add the new IP wherever the old IP is whitelisted.
for region in region_list:
    AWS_Switch = functions.AWS(region)
    AWS_Switch.add_ec2_whitelistip_info(old_whitelistip, new_whitelistip)
    AWS_Switch.modify_waf_whitelistip_info(old_whitelistip, "INSERT", new_ip=new_whitelistip)
    AWS_Switch.modify_s3_whitelistip_info(old_whitelistip, "INSERT", new_ip=new_whitelistip)

old_ip_delete.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
create_author : 蛙鳜鸡鹳狸猿
create_time   : 2019-09-04
program       : *_* old WhitelistIP deleting call file *_*
"""

import functions

# *****************************************************************************
# Unit test button                                     (I am Bilery Zoo...)
#   Default `True` for script testing.
#   Set to `False` when production using.
# *****************************************************************************
UT_FLAG: bool = True

# *****************************************************************************
# Region distinguish button
#   Default `True` for region distinguishing.
#   Set to `False` when non-distinguishing using(Japan/Tokyo region only).
# *****************************************************************************
REGION_DISTINGUISH_FLAG: bool = True

# Tokyo is always handled first; the other regions join when distinguishing.
region_list: list = ["ap-northeast-1"]
if REGION_DISTINGUISH_FLAG:
    region_list.extend([
        "us-west-1", "us-east-1", "us-east-2", "us-west-2",
        "ap-south-1", "ap-northeast-2", "ap-southeast-1", "ap-southeast-2",
        "ca-central-1",
        "eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3", "eu-north-1",
        "sa-east-1",
    ])

# *****************************************************************************
# WhitelistIP setting area
#   Setting `old_whitelistip`(to remove).
#   Handlers below catch this Arg to do the switching jobs.
# *****************************************************************************
if UT_FLAG:
    old_whitelistip = "127.0.0.0"
else:
    old_whitelistip = "0.0.0.0"

# In every region, drop the old IP from each whitelist that still holds it.
for region in region_list:
    AWS_Switch = functions.AWS(region)
    AWS_Switch.remove_ec2_whitelistip_info(old_whitelistip)
    AWS_Switch.modify_waf_whitelistip_info(old_whitelistip, "DELETE")
    AWS_Switch.modify_s3_whitelistip_info(old_whitelistip, "DELETE")

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部