| # |
| # Copyright (c) 2011 Thomas Graf <tgraf@suug.ch> |
| # |
| |
| """Module providing access to network addresses |
| """ |
| |
| from __future__ import absolute_import |
| |
| |
| __version__ = '1.0' |
| __all__ = [ |
| 'AddressCache', |
| 'Address'] |
| |
| import datetime |
| from .. import core as netlink |
| from . import capi as capi |
| from . import link as Link |
| from .. import util as util |
| |
class AddressCache(netlink.Cache):
    """Cache containing network addresses"""

    def __init__(self, cache=None):
        """Wrap an existing cache or allocate a new one.

        Arguments:
        cache -- underlying cache object to wrap. When omitted (or falsy)
                 a new cache is allocated through
                 ``_alloc_cache_name('route/addr')``.
        """
        if not cache:
            cache = self._alloc_cache_name('route/addr')

        self._protocol = netlink.NETLINK_ROUTE
        self._nl_cache = cache

    def __getitem__(self, key):
        """Return the address matching the local address ``key``.

        Equivalent to ``lookup(0, key)``; raises KeyError if nothing
        matches.
        """
        # Using ifindex=0 here implies that the local address itself
        # is unique, otherwise the first occurrence is returned.
        return self.lookup(0, key)

    def lookup(self, ifindex, local):
        """Look up an address by interface index and local address.

        Arguments:
        ifindex -- interface index passed on to ``rtnl_addr_get()``
        local   -- local address, either as a string (converted with
                   ``netlink.AbstractAddress``) or as an object that
                   provides a ``_nl_addr`` attribute

        Returns an Address built from the entry that was found.
        Raises KeyError (carrying the requested ``local`` value) if
        ``rtnl_addr_get()`` returns None.
        """
        # Remember what the caller asked for so the KeyError is useful.
        requested = local

        if isinstance(local, str):
            local = netlink.AbstractAddress(local)

        addr = capi.rtnl_addr_get(self._nl_cache, ifindex,
                                  local._nl_addr)
        if addr is None:
            raise KeyError(requested)

        return Address._from_capi(addr)

    @staticmethod
    def _new_object(obj):
        """Wrap ``obj`` in an Address instance."""
        return Address(obj)

    @staticmethod
    def _new_cache(cache):
        """Wrap ``cache`` in a new AddressCache instance."""
        return AddressCache(cache=cache)
| |
class Address(netlink.Object):
    """Network address"""

    # Raw lifetime value which the lifetime getters report as None.
    _LIFETIME_FOREVER = 0xFFFFFFFF

    def __init__(self, obj=None):
        """Initialise the address, optionally wrapping an existing object.

        Arguments:
        obj -- object handed through to ``netlink.Object.__init__``
               (defaults to None)
        """
        netlink.Object.__init__(self, 'route/addr', 'address', obj)
        self._rtnl_addr = self._obj2type(self._nl_object)

    @classmethod
    def _from_capi(cls, obj):
        """Build an instance from a capi address handle.

        The handle is converted with ``capi.addr2obj()`` first.
        """
        return cls(capi.addr2obj(obj))

    @staticmethod
    def _obj2type(obj):
        """Convert a generic object handle with ``capi.obj2addr()``."""
        return capi.obj2addr(obj)

    def __cmp__(self, other):
        """Three-way comparison used for sorting addresses.

        Returns a value that is zero when both addresses compare equal
        on all criteria below, otherwise the first non-zero difference.
        """
        # sort by:
        #    1. network link
        #    2. address family
        #    3. local address (including prefixlen)
        diff = self.ifindex - other.ifindex

        if diff == 0:
            diff = self.family - other.family
            if diff == 0:
                # NOTE(review): self.local is an AbstractAddress wrapper,
                # whereas every other capi call in this class is given
                # the raw ``_nl_addr`` -- confirm nl_addr_cmp() accepts
                # the wrapper.
                diff = capi.nl_addr_cmp(self.local, other.local)

        return diff

    @staticmethod
    def _new_instance(obj):
        """Wrap ``obj`` in a new Address instance."""
        return Address(obj)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def ifindex(self):
        """interface index"""
        return capi.rtnl_addr_get_ifindex(self._rtnl_addr)

    @ifindex.setter
    def ifindex(self, value):
        # The value is resolved to a link object and assigned through
        # the ``link`` property, which performs the actual update.
        link = Link.resolve(value)
        if not link:
            raise ValueError()

        self.link = link

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def link(self):
        """Link the address is attached to, or None if there is none"""
        link = capi.rtnl_addr_get_link(self._rtnl_addr)
        if not link:
            return None

        return Link.Link.from_capi(link)

    @link.setter
    def link(self, value):
        # Accepts either a link object or a string which is resolved via
        # Link.resolve(); a failed resolution (KeyError) is reported as
        # ValueError.
        if isinstance(value, str):
            try:
                value = Link.resolve(value)
            except KeyError:
                raise ValueError()

        capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link)

        # ifindex is immutable but we assume that if _orig does not
        # have an ifindex specified, it was meant to be given here
        if capi.rtnl_addr_get_ifindex(self._orig) == 0:
            capi.rtnl_addr_set_ifindex(self._orig, value.ifindex)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def label(self):
        """address label"""
        return capi.rtnl_addr_get_label(self._rtnl_addr)

    @label.setter
    def label(self, value):
        capi.rtnl_addr_set_label(self._rtnl_addr, value)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def flags(self):
        """Flags

        Reading returns the flag names as a list of strings.

        Assigning to this property does *not* replace the current set
        of flags with the value supplied. Each given flag is added or
        removed individually, all other flags are left untouched.

        Examples:
        addr.flags = '+xxx'             # add xxx flag
        addr.flags = 'xxx'              # exactly the same
        addr.flags = '-xxx'             # remove xxx flag
        addr.flags = [ '+xxx', '-yyy' ] # list operation
        """
        flags = capi.rtnl_addr_get_flags(self._rtnl_addr)
        # The string form is comma separated; 256 is presumably the
        # size of the conversion buffer.
        return capi.rtnl_addr_flags2str(flags, 256)[0].split(',')

    def _set_flag(self, flag):
        """Apply a single flag operation.

        A leading '-' removes the flag; a leading '+' or no prefix at
        all adds it.
        """
        if flag.startswith('-'):
            i = capi.rtnl_addr_str2flags(flag[1:])
            capi.rtnl_addr_unset_flags(self._rtnl_addr, i)
            return

        # '+xxx' and plain 'xxx' are equivalent
        if flag.startswith('+'):
            flag = flag[1:]

        i = capi.rtnl_addr_str2flags(flag)
        capi.rtnl_addr_set_flags(self._rtnl_addr, i)

    @flags.setter
    def flags(self, value):
        # A sequence applies every entry in order, anything else is
        # treated as a single flag string.
        if isinstance(value, (list, tuple)):
            for flag in value:
                self._set_flag(flag)
        else:
            self._set_flag(value)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def family(self):
        """Address family"""
        fam = capi.rtnl_addr_get_family(self._rtnl_addr)
        return netlink.AddressFamily(fam)

    @family.setter
    def family(self, value):
        if not isinstance(value, netlink.AddressFamily):
            value = netlink.AddressFamily(value)

        capi.rtnl_addr_set_family(self._rtnl_addr, int(value))

    @property
    @netlink.nlattr(type=int, fmt=util.num)
    def scope(self):
        """Address scope"""
        scope = capi.rtnl_addr_get_scope(self._rtnl_addr)
        # 32 is presumably the size of the conversion buffer.
        return capi.rtnl_scope2str(scope, 32)[0]

    @scope.setter
    def scope(self, value):
        # Scope names are translated first, other values are passed on
        # unchanged.
        if isinstance(value, str):
            value = capi.rtnl_str2scope(value)
        capi.rtnl_addr_set_scope(self._rtnl_addr, value)

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.addr)
    def local(self):
        """Local address"""
        a = capi.rtnl_addr_get_local(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @local.setter
    def local(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr)

        # local is immutable but we assume that if _orig does not
        # have a local address specified, it was meant to be given here
        if capi.rtnl_addr_get_local(self._orig) is None:
            capi.rtnl_addr_set_local(self._orig, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def peer(self):
        """Peer address"""
        a = capi.rtnl_addr_get_peer(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @peer.setter
    def peer(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def broadcast(self):
        """Broadcast address"""
        a = capi.rtnl_addr_get_broadcast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @broadcast.setter
    def broadcast(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def multicast(self):
        """multicast address"""
        a = capi.rtnl_addr_get_multicast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @multicast.setter
    def multicast(self, value):
        # Unlike the other address setters, a ValueError from the
        # conversion is re-raised as AttributeError here.
        try:
            a = netlink.AbstractAddress(value)
        except ValueError as err:
            raise AttributeError('multicast', err)

        capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def anycast(self):
        """anycast address"""
        a = capi.rtnl_addr_get_anycast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @anycast.setter
    def anycast(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr)

    @staticmethod
    def _lifetime_to_secs(value):
        """Convert a lifetime given by the caller to a number of seconds.

        Accepts what the lifetime getters return (a datetime.timedelta,
        or None for the 0xFFFFFFFF raw value) as well as anything that
        int() understands.
        """
        if value is None:
            return Address._LIFETIME_FOREVER
        if isinstance(value, datetime.timedelta):
            return int(value.total_seconds())
        return int(value)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def valid_lifetime(self):
        """Valid lifetime

        Returned as datetime.timedelta, or None if the raw value is
        0xFFFFFFFF.
        """
        secs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr)
        if secs == self._LIFETIME_FOREVER:
            return None

        return datetime.timedelta(seconds=secs)

    @valid_lifetime.setter
    def valid_lifetime(self, value):
        capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr,
                                          self._lifetime_to_secs(value))

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def preferred_lifetime(self):
        """Preferred lifetime

        Returned as datetime.timedelta, or None if the raw value is
        0xFFFFFFFF.
        """
        secs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr)
        if secs == self._LIFETIME_FOREVER:
            return None

        return datetime.timedelta(seconds=secs)

    @preferred_lifetime.setter
    def preferred_lifetime(self, value):
        capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr,
                                              self._lifetime_to_secs(value))

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def create_time(self):
        """Creation time"""
        # The raw value is scaled by 10 to obtain milliseconds.
        hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr)
        return datetime.timedelta(milliseconds=10*hsec)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def last_update(self):
        """Last update"""
        # The raw value is scaled by 10 to obtain milliseconds.
        hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr)
        return datetime.timedelta(milliseconds=10*hsec)

    def add(self, socket=None, flags=None):
        """Attempt to add this address in the kernel

        Arguments:
        socket -- socket to use; if omitted one is obtained with
                  ``netlink.lookup_socket(netlink.NETLINK_ROUTE)``
        flags  -- flags passed to ``rtnl_addr_add()``; defaults to
                  ``netlink.NLM_F_CREATE`` when omitted or falsy

        Raises netlink.KernelError if the call returns a negative value.
        """
        if not socket:
            socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        if not flags:
            flags = netlink.NLM_F_CREATE

        ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    def delete(self, socket=None, flags=0):
        """Attempt to delete this address in the kernel

        Arguments:
        socket -- socket to use; if omitted one is obtained with
                  ``netlink.lookup_socket(netlink.NETLINK_ROUTE)``,
                  the same way add() does
        flags  -- flags passed to ``rtnl_addr_delete()`` (default 0)

        Raises netlink.KernelError if the call returns a negative value.
        """
        if not socket:
            socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    ###################################################################
    # private properties
    #
    # Used for formatting output. USE AT OWN RISK
    @property
    def _flags(self):
        return ','.join(self.flags)

    def format(self, details=False, stats=False, nodev=False, indent=''):
        """Return address as formatted text

        Arguments:
        details -- also include broadcast, multicast, peer and anycast
                   addresses and any non-empty lifetimes
        stats   -- also include creation / last update times if either
                   of them is non-zero
        nodev   -- leave out the interface index
        indent  -- passed on to util.MyFormatter
        """
        fmt = util.MyFormatter(self, indent)

        buf = fmt.format('{a|local!b}')

        if not nodev:
            buf += fmt.format(' {a|ifindex}')

        buf += fmt.format(' {a|scope}')

        if self.label:
            buf += fmt.format(' "{a|label}"')

        buf += fmt.format(' <{a|_flags}>')

        if details:
            buf += fmt.nl('\t{t|broadcast} {t|multicast}')
            buf += fmt.nl('\t{t|peer} {t|anycast}')

            if self.valid_lifetime:
                buf += fmt.nl('\t{s|valid-lifetime!k} '
                              '{a|valid_lifetime}')

            if self.preferred_lifetime:
                buf += fmt.nl('\t{s|preferred-lifetime!k} '
                              '{a|preferred_lifetime}')

        if stats and (self.create_time or self.last_update):
            # nl() belongs to the formatter, not to the address object
            buf += fmt.nl('\t{s|created!k} {a|create_time}'
                          ' {s|last-updated!k} {a|last_update}')

        return buf