diff env/lib/python3.7/site-packages/boto/ec2/securitygroup.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/boto/ec2/securitygroup.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,392 +0,0 @@
-# Copyright (c) 2006-2011 Mitch Garnaat http://garnaat.org/
-# Copyright (c) 2011, Eucalyptus Systems, Inc.
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-
-"""
-Represents an EC2 Security Group
-"""
-from boto.ec2.ec2object import TaggedEC2Object
-from boto.exception import BotoClientError
-
-
-class SecurityGroup(TaggedEC2Object):
-
-    def __init__(self, connection=None, owner_id=None,
-                 name=None, description=None, id=None):
-        super(SecurityGroup, self).__init__(connection)
-        self.id = id
-        self.owner_id = owner_id
-        self.name = name
-        self.description = description
-        self.vpc_id = None
-        self.rules = IPPermissionsList()
-        self.rules_egress = IPPermissionsList()
-
-    def __repr__(self):
-        return 'SecurityGroup:%s' % self.name
-
-    def startElement(self, name, attrs, connection):
-        retval = super(SecurityGroup, self).startElement(name, attrs, connection)
-        if retval is not None:
-            return retval
-        if name == 'ipPermissions':
-            return self.rules
-        elif name == 'ipPermissionsEgress':
-            return self.rules_egress
-        else:
-            return None
-
-    def endElement(self, name, value, connection):
-        if name == 'ownerId':
-            self.owner_id = value
-        elif name == 'groupId':
-            self.id = value
-        elif name == 'groupName':
-            self.name = value
-        elif name == 'vpcId':
-            self.vpc_id = value
-        elif name == 'groupDescription':
-            self.description = value
-        elif name == 'ipRanges':
-            pass
-        elif name == 'return':
-            if value == 'false':
-                self.status = False
-            elif value == 'true':
-                self.status = True
-            else:
-                raise Exception(
-                    'Unexpected value of status %s for group %s' % (
-                        value,
-                        self.name
-                    )
-                )
-        else:
-            setattr(self, name, value)
-
-    def delete(self, dry_run=False):
-        if self.vpc_id:
-            return self.connection.delete_security_group(
-                group_id=self.id,
-                dry_run=dry_run
-            )
-        else:
-            return self.connection.delete_security_group(
-                self.name,
-                dry_run=dry_run
-            )
-
-    def add_rule(self, ip_protocol, from_port, to_port,
-                 src_group_name, src_group_owner_id, cidr_ip,
-                 src_group_group_id, dry_run=False):
-        """
-        Add a rule to the SecurityGroup object.  Note that this method
-        only changes the local version of the object.  No information
-        is sent to EC2.
-        """
-        rule = IPPermissions(self)
-        rule.ip_protocol = ip_protocol
-        rule.from_port = from_port
-        rule.to_port = to_port
-        self.rules.append(rule)
-        rule.add_grant(
-            src_group_name,
-            src_group_owner_id,
-            cidr_ip,
-            src_group_group_id,
-            dry_run=dry_run
-        )
-
-    def remove_rule(self, ip_protocol, from_port, to_port,
-                    src_group_name, src_group_owner_id, cidr_ip,
-                    src_group_group_id, dry_run=False):
-        """
-        Remove a rule to the SecurityGroup object.  Note that this method
-        only changes the local version of the object.  No information
-        is sent to EC2.
-        """
-        if not self.rules:
-            raise ValueError("The security group has no rules")
-
-        target_rule = None
-        for rule in self.rules:
-            if rule.ip_protocol == ip_protocol:
-                if rule.from_port == from_port:
-                    if rule.to_port == to_port:
-                        target_rule = rule
-                        target_grant = None
-                        for grant in rule.grants:
-                            if grant.name == src_group_name or grant.group_id == src_group_group_id:
-                                if grant.owner_id == src_group_owner_id:
-                                    if grant.cidr_ip == cidr_ip:
-                                        target_grant = grant
-                        if target_grant:
-                            rule.grants.remove(target_grant)
-            if len(rule.grants) == 0:
-                self.rules.remove(target_rule)
-
-    def authorize(self, ip_protocol=None, from_port=None, to_port=None,
-                  cidr_ip=None, src_group=None, dry_run=False):
-        """
-        Add a new rule to this security group.
-        You need to pass in either src_group_name
-        OR ip_protocol, from_port, to_port,
-        and cidr_ip.  In other words, either you are authorizing another
-        group or you are authorizing some ip-based rule.
-
-        :type ip_protocol: string
-        :param ip_protocol: Either tcp | udp | icmp
-
-        :type from_port: int
-        :param from_port: The beginning port number you are enabling
-
-        :type to_port: int
-        :param to_port: The ending port number you are enabling
-
-        :type cidr_ip: string or list of strings
-        :param cidr_ip: The CIDR block you are providing access to.
-                        See http://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing
-
-        :type src_group: :class:`boto.ec2.securitygroup.SecurityGroup` or
-                         :class:`boto.ec2.securitygroup.GroupOrCIDR`
-        :param src_group: The Security Group you are granting access to.
-
-        :rtype: bool
-        :return: True if successful.
-        """
-        group_name = None
-        if not self.vpc_id:
-            group_name = self.name
-        group_id = None
-        if self.vpc_id:
-            group_id = self.id
-        src_group_name = None
-        src_group_owner_id = None
-        src_group_group_id = None
-        if src_group:
-            cidr_ip = None
-            src_group_owner_id = src_group.owner_id
-            if not self.vpc_id:
-                src_group_name = src_group.name
-            else:
-                if hasattr(src_group, 'group_id'):
-                    src_group_group_id = src_group.group_id
-                else:
-                    src_group_group_id = src_group.id
-        status = self.connection.authorize_security_group(group_name,
-                                                          src_group_name,
-                                                          src_group_owner_id,
-                                                          ip_protocol,
-                                                          from_port,
-                                                          to_port,
-                                                          cidr_ip,
-                                                          group_id,
-                                                          src_group_group_id,
-                                                          dry_run=dry_run)
-        if status:
-            if not isinstance(cidr_ip, list):
-                cidr_ip = [cidr_ip]
-            for single_cidr_ip in cidr_ip:
-                self.add_rule(ip_protocol, from_port, to_port, src_group_name,
-                              src_group_owner_id, single_cidr_ip,
-                              src_group_group_id, dry_run=dry_run)
-        return status
-
-    def revoke(self, ip_protocol=None, from_port=None, to_port=None,
-               cidr_ip=None, src_group=None, dry_run=False):
-        group_name = None
-        if not self.vpc_id:
-            group_name = self.name
-        group_id = None
-        if self.vpc_id:
-            group_id = self.id
-        src_group_name = None
-        src_group_owner_id = None
-        src_group_group_id = None
-        if src_group:
-            cidr_ip = None
-            src_group_owner_id = src_group.owner_id
-            if not self.vpc_id:
-                src_group_name = src_group.name
-            else:
-                if hasattr(src_group, 'group_id'):
-                    src_group_group_id = src_group.group_id
-                else:
-                    src_group_group_id = src_group.id
-        status = self.connection.revoke_security_group(group_name,
-                                                       src_group_name,
-                                                       src_group_owner_id,
-                                                       ip_protocol,
-                                                       from_port,
-                                                       to_port,
-                                                       cidr_ip,
-                                                       group_id,
-                                                       src_group_group_id,
-                                                       dry_run=dry_run)
-        if status:
-            self.remove_rule(ip_protocol, from_port, to_port, src_group_name,
-                             src_group_owner_id, cidr_ip, src_group_group_id,
-                             dry_run=dry_run)
-        return status
-
-    def copy_to_region(self, region, name=None, dry_run=False):
-        """
-        Create a copy of this security group in another region.
-        Note that the new security group will be a separate entity
-        and will not stay in sync automatically after the copy
-        operation.
-
-        :type region: :class:`boto.ec2.regioninfo.RegionInfo`
-        :param region: The region to which this security group will be copied.
-
-        :type name: string
-        :param name: The name of the copy.  If not supplied, the copy
-                     will have the same name as this security group.
-
-        :rtype: :class:`boto.ec2.securitygroup.SecurityGroup`
-        :return: The new security group.
-        """
-        if region.name == self.region:
-            raise BotoClientError('Unable to copy to the same Region')
-        conn_params = self.connection.get_params()
-        rconn = region.connect(**conn_params)
-        sg = rconn.create_security_group(
-            name or self.name,
-            self.description,
-            dry_run=dry_run
-        )
-        source_groups = []
-        for rule in self.rules:
-            for grant in rule.grants:
-                grant_nom = grant.name or grant.group_id
-                if grant_nom:
-                    if grant_nom not in source_groups:
-                        source_groups.append(grant_nom)
-                        sg.authorize(None, None, None, None, grant,
-                                     dry_run=dry_run)
-                else:
-                    sg.authorize(rule.ip_protocol, rule.from_port, rule.to_port,
-                                 grant.cidr_ip, dry_run=dry_run)
-        return sg
-
-    def instances(self, dry_run=False):
-        """
-        Find all of the current instances that are running within this
-        security group.
-
-        :rtype: list of :class:`boto.ec2.instance.Instance`
-        :return: A list of Instance objects
-        """
-        rs = []
-        if self.vpc_id:
-            rs.extend(self.connection.get_all_reservations(
-                filters={'instance.group-id': self.id},
-                dry_run=dry_run
-            ))
-        else:
-            rs.extend(self.connection.get_all_reservations(
-                filters={'group-id': self.id},
-                dry_run=dry_run
-            ))
-        instances = [i for r in rs for i in r.instances]
-        return instances
-
-
-class IPPermissionsList(list):
-
-    def startElement(self, name, attrs, connection):
-        if name == 'item':
-            self.append(IPPermissions(self))
-            return self[-1]
-        return None
-
-    def endElement(self, name, value, connection):
-        pass
-
-
-class IPPermissions(object):
-
-    def __init__(self, parent=None):
-        self.parent = parent
-        self.ip_protocol = None
-        self.from_port = None
-        self.to_port = None
-        self.grants = []
-
-    def __repr__(self):
-        return 'IPPermissions:%s(%s-%s)' % (self.ip_protocol,
-                                            self.from_port, self.to_port)
-
-    def startElement(self, name, attrs, connection):
-        if name == 'item':
-            self.grants.append(GroupOrCIDR(self))
-            return self.grants[-1]
-        return None
-
-    def endElement(self, name, value, connection):
-        if name == 'ipProtocol':
-            self.ip_protocol = value
-        elif name == 'fromPort':
-            self.from_port = value
-        elif name == 'toPort':
-            self.to_port = value
-        else:
-            setattr(self, name, value)
-
-    def add_grant(self, name=None, owner_id=None, cidr_ip=None, group_id=None,
-                  dry_run=False):
-        grant = GroupOrCIDR(self)
-        grant.owner_id = owner_id
-        grant.group_id = group_id
-        grant.name = name
-        grant.cidr_ip = cidr_ip
-        self.grants.append(grant)
-        return grant
-
-
-class GroupOrCIDR(object):
-
-    def __init__(self, parent=None):
-        self.owner_id = None
-        self.group_id = None
-        self.name = None
-        self.cidr_ip = None
-
-    def __repr__(self):
-        if self.cidr_ip:
-            return '%s' % self.cidr_ip
-        else:
-            return '%s-%s' % (self.name or self.group_id, self.owner_id)
-
-    def startElement(self, name, attrs, connection):
-        return None
-
-    def endElement(self, name, value, connection):
-        if name == 'userId':
-            self.owner_id = value
-        elif name == 'groupId':
-            self.group_id = value
-        elif name == 'groupName':
-            self.name = value
-        if name == 'cidrIp':
-            self.cidr_ip = value
-        else:
-            setattr(self, name, value)