import ipaddress
import socket
import struct

import pytest
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.netlink.attrs import NlAttr
from atf_python.sys.netlink.attrs import NlAttrIp
from atf_python.sys.netlink.attrs import NlAttrNested
from atf_python.sys.netlink.attrs import NlAttrU32
from atf_python.sys.netlink.base_headers import NlmBaseFlags
from atf_python.sys.netlink.base_headers import NlmNewFlags
from atf_python.sys.netlink.base_headers import Nlmsghdr
from atf_python.sys.netlink.message import NlMsgType
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
from atf_python.sys.netlink.netlink import Nlsock
from atf_python.sys.netlink.netlink_generic import CarpAttrType
from atf_python.sys.netlink.netlink_generic import CarpGenMessage
from atf_python.sys.netlink.netlink_generic import CarpMsgType
from atf_python.sys.netlink.netlink_route import IfaAttrType
from atf_python.sys.netlink.netlink_route import IfaCacheInfo
from atf_python.sys.netlink.netlink_route import IfafAttrType
from atf_python.sys.netlink.netlink_route import IfafFlags6
from atf_python.sys.netlink.netlink_route import IfaFlags
from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage
from atf_python.sys.netlink.netlink_route import NlRtMsgType
from atf_python.sys.netlink.netlink_route import RtScope
from atf_python.sys.netlink.utils import enum_or_int
from atf_python.sys.netlink.utils import NlConst


class TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate):
    def setup_method(self, method):
        method_name = method.__name__
        if "4" in method_name:
            if "nofilter" in method_name:
                self.IPV4_PREFIXES = ["192.0.2.1/24", "169.254.169.254/16"]
            else:
                self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def test_46_nofilter(self):
        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            scope = rx_msg.base_hdr.ifa_scope
            ret.append((ifname, family, scope))

        ifname = "lo0"
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_HOST.value]) == 1
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_HOST.value]) == 1
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1
        assert len([r for r in ret if r[0] == ifname]) == 3

        ifname = self.vnet.iface_alias_map["if1"].name
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_UNIVERSE.value]) == 1
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_UNIVERSE.value]) == 1
        assert len([r for r in ret if r[0] == ifname]) == 4

    def test_46_filter_iface(self):
        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            ret.append((ifname, family, rx_msg))

        ifname = epair_ifname
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2
        assert len(ret) == 3

    def test_46_filter_family_compat(self):
        """Tests that family filtering works with the stripped header"""

        hdr = Nlmsghdr(
            nlmsg_len=17,
            nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
            nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value,
            nlmsg_seq=self.helper.get_seq(),
        )
        data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
        self.nlsock.write_data(data)

        ret = []
        for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            ret.append((ifname, family, rx_msg))
        assert len(ret) == 2

    def filter_iface_family(self, family, num_items):
        """Tests that listing outputs IPv4 for the specific interface"""
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = family
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            assert family == rx_msg.base_hdr.ifa_family
            assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            ret.append(rx_msg)
        assert len(ret) == num_items
        return ret

    def test_4_broadcast(self):
        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET, 1)
        # Should be 192.0.2.1/24
        msg = ret[0]
        # Family and ifindex has been checked already
        assert msg.base_hdr.ifa_prefixlen == 24
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1"
        assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1"
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255"

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname

    def test_6_broadcast(self):
        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET6, 2)
        # Should be 192.0.2.1/24
        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
            (gmsg, lmsg) = ret
        else:
            (lmsg, gmsg) = ret
        # Start with global ( 2001:db8::1/64 )
        msg = gmsg
        # Family and ifindex has been checked already
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1"
        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname

        # Local: fe80::/64
        msg = lmsg
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value

        addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr)
        assert addr.is_link_local
        # Verify that ifindex is not emmbedded
        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname


class RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate):
    def setup_method(self, method):
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def send_check_success(self, msg):
        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == 0

    @staticmethod
    def get_family_from_ip(ip):
        if ip.version == 4:
            return socket.AF_INET
        return socket.AF_INET6

    def create_msg(self, ifa):
        iface = self.vnet.iface_alias_map["if1"]

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
        msg.set_request()
        msg.nl_hdr.nlmsg_flags |= (
            NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        )
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
        return msg

    def get_ifa_list(self, ifindex=0, family=0):
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = family
        msg.base_hdr.ifa_index = ifindex
        self.write_message(msg)
        return self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR)

    def find_msg_by_ifa(self, msg_list, ip):
        for msg in msg_list:
            if msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ip):
                return msg
        return None

    def setup_dummy_carp(self, ifindex: int, vhid: int):
        self.require_module("carp")

        nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
        family_id = nlsock.get_genl_family_id("carp")

        msg = CarpGenMessage(self.helper, family_id, CarpMsgType.CARP_NL_CMD_SET)
        msg.set_request()
        msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_VHID, vhid))
        msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_IFINDEX, ifindex))
        rx_msg = nlsock.get_reply(msg)

        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == 0


class TestRtNlIfaddrOpsBroadcast(RtnlIfaOps):
    def test_add_4(self):
        """Tests IPv4 address addition to the standard broadcast interface"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)

    @pytest.mark.parametrize(
        "brd",
        [
            pytest.param((32, True, "192.0.2.1"), id="auto_32"),
            pytest.param((31, True, "255.255.255.255"), id="auto_31"),
            pytest.param((30, True, "192.0.2.3"), id="auto_30"),
            pytest.param((30, False, "192.0.2.2"), id="custom_30"),
            pytest.param((24, False, "192.0.2.7"), id="custom_24"),
        ],
    )
    def test_add_4_brd(self, brd):
        """Tests proper broadcast setup when adding IPv4 ifa"""
        plen, auto_brd, ifa_brd_str = brd
        ifa = ipaddress.ip_interface("192.0.2.1/{}".format(plen))
        iface = self.vnet.iface_alias_map["if1"]
        ifa_brd = ipaddress.ip_address(ifa_brd_str)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if not auto_brd:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)

    def test_add_6(self):
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)

    def test_add_4_carp(self):
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]
        vhid = 77

        self.setup_dummy_carp(iface.ifindex, vhid)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
        ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid

    def test_add_6_carp(self):
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]
        vhid = 77

        self.setup_dummy_carp(iface.ifindex, vhid)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        ifa_bsd = rx_msg_gu.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid

    def test_add_6_lifetime(self):
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]
        pref_time = 43200
        valid_time = 86400

        ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(ci)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        ci = rx_msg.get_nla(IfaAttrType.IFA_CACHEINFO).ci
        assert pref_time - 5 <= ci.ifa_prefered <= pref_time
        assert valid_time - 5 <= ci.ifa_valid <= valid_time
        assert ci.cstamp > 0
        assert ci.tstamp > 0
        assert ci.tstamp >= ci.cstamp

    @pytest.mark.parametrize(
        "flags_str",
        [
            "autoconf",
            "deprecated",
            "autoconf,deprecated",
            "prefer_source",
        ],
    )
    def test_add_6_flags(self, flags_str):
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]

        flags_map = {
            "autoconf": {"nl": 0, "f": IfafFlags6.IN6_IFF_AUTOCONF},
            "deprecated": {
                "nl": IfaFlags.IFA_F_DEPRECATED,
                "f": IfafFlags6.IN6_IFF_DEPRECATED,
            },
            "prefer_source": {"nl": 0, "f": IfafFlags6.IN6_IFF_PREFER_SOURCE},
        }
        nl_flags = 0
        f_flags = 0

        for flag_str in flags_str.split(","):
            d = flags_map.get(flag_str, {})
            nl_flags |= enum_or_int(d.get("nl", 0))
            f_flags |= enum_or_int(d.get("f", 0))

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        assert rx_msg.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags
        ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags

    def test_add_4_empty_message(self):
        """Tests correct failure w/ empty message"""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
        msg.set_request()
        msg.nl_hdr.nlmsg_flags |= (
            NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        )

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    def test_add_4_empty_ifindex(self):
        """Tests correct failure w/ empty ifindex"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address

        msg = self.create_msg(ifa)
        msg.base_hdr.ifa_index = 0
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    def test_add_4_empty_addr(self):
        """Tests correct failure w/ empty address"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/32", id="ipv4_host"),
            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv",
        [
            pytest.param("local", id="ifa_local"),
            pytest.param("address", id="ifa_address"),
        ],
    )
    def test_del(self, tlv, ifa_str):
        """Tests address deletion from the standard broadcast interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        if tlv == "local":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if tlv == "address":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is None


class TestRtNlIfaddrOpsP2p(RtnlIfaOps):
    IFTYPE = "gif"

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"),
            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"),
            pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"),
            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"),
            pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"),
        ],
    )
    def test_add_4(self, ifa_pair):
        """Tests IPv4 address addition to the p2p interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(
                ["2001:db8::1/64", "2001:db8::2"],
                id="dst_inside_64",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
            pytest.param(
                ["2001:db8::1/127", "2001:db8::2"],
                id="dst_inside_127",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"),
            pytest.param(
                ["2001:db8::1/64", "2001:db8:2::2"],
                id="dst_outside_64",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
        ],
    )
    def test_add_6(self, ifa_pair):
        """Tests IPv6 address addition to the p2p interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"),
            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"),
            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv_pair",
        [
            pytest.param(["a", ""], id="ifa_addr=addr"),
            pytest.param(["", "a"], id="ifa_local=addr"),
            pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"),
        ],
    )
    def test_del(self, tlv_pair, ifa_pair):
        """Tests address deletion from the P2P interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]
        ifa_addr_str, ifa_local_str = tlv_pair

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg is not None

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        if "a" in ifa_addr_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
        if "p" in ifa_addr_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
        if "a" in ifa_local_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if "p" in ifa_local_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is None


class TestRtNlAddIfaddrLo(RtnlIfaOps):
    IFTYPE = "lo"

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/24", id="prefix"),
            pytest.param("192.0.2.1/32", id="host"),
        ],
    )
    def test_add_4(self, ifa_str):
        """Tests IPv4 address addition to the loopback interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("2001:db8::1/64", id="gu_prefix"),
            pytest.param("2001:db8::1/128", id="gu_host"),
        ],
    )
    def test_add_6(self, ifa_str):
        """Tests IPv6 address addition to the loopback interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2  # link-local should be auto-created as well
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/32", id="ipv4_host"),
            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv",
        [
            pytest.param("local", id="ifa_local"),
            pytest.param("address", id="ifa_address"),
        ],
    )
    def test_del(self, tlv, ifa_str):
        """Tests address deletion from the loopback interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        if tlv == "local":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if tlv == "address":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is None
