Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set the rule handle after flush #88

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
6 changes: 3 additions & 3 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (cc *Conn) AddChain(c *Chain) *Chain {
{Type: unix.NFTA_CHAIN_TYPE, Data: []byte(c.Type + "\x00")},
})...)
}
cc.messages = append(cc.messages, netlink.Message{
cc.putMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWCHAIN),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
Expand All @@ -144,7 +144,7 @@ func (cc *Conn) DelChain(c *Chain) {
{Type: unix.NFTA_CHAIN_NAME, Data: []byte(c.Name + "\x00")},
})

cc.messages = append(cc.messages, netlink.Message{
cc.putMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELCHAIN),
Flags: netlink.Request | netlink.Acknowledge,
Expand All @@ -162,7 +162,7 @@ func (cc *Conn) FlushChain(c *Chain) {
{Type: unix.NFTA_RULE_TABLE, Data: []byte(c.Table.Name + "\x00")},
{Type: unix.NFTA_RULE_CHAIN, Data: []byte(c.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.putMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE),
Flags: netlink.Request | netlink.Acknowledge,
Expand Down
127 changes: 104 additions & 23 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,32 @@ import (
"golang.org/x/sys/unix"
)

type Entity interface {
HandleResponse(netlink.Message)
}

// A Conn represents a netlink connection of the nftables family.
//
// All methods return their input, so that variables can be defined from string
// literals when desired.
//
// Commands are buffered. Flush sends all buffered commands in a single batch.
type Conn struct {
TestDial nltest.Func // for testing only; passed to nltest.Dial
NetNS int // Network namespace netlink will interact with.
sync.Mutex
messages []netlink.Message
err error
TestDial nltest.Func // for testing only; passed to nltest.Dial
NetNS int // Network namespace netlink will interact with.
entities map[int]Entity
messagesMu sync.Mutex
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move messagesMu and messages down and separate them with a newline (to form a group of fields):

type Conn struct {
  TestDial nltest.Func
  // …
  entities map[int]Entity

  messagesMu sync.Mutex
  messages []netlink.Message
}

messages []netlink.Message
err error
}

// Flush sends all buffered commands in a single batch to nftables.
func (cc *Conn) Flush() error {
cc.Lock()
defer func() {
cc.messages = nil
cc.entities = nil
cc.Unlock()
}()
if len(cc.messages) == 0 {
Expand All @@ -59,23 +66,107 @@ func (cc *Conn) Flush() error {

defer conn.Close()

if _, err := conn.SendMessages(batch(cc.messages)); err != nil {
cc.endBatch(cc.messages)

if _, err = conn.SendMessages(cc.messages); err != nil {
return fmt.Errorf("SendMessages: %w", err)
}

if _, err := conn.Receive(); err != nil {
return fmt.Errorf("Receive: %w", err)
// Retrieving of seq number associated to entities
entitiesBySeq := make(map[uint32]Entity)
for i, e := range cc.entities {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of the separate entites book-keeping, why not just iterate over cc.messages and check if the type implements HandleResponse?

for _, msg := range cc.messages {
  m, ok := msg.(interface { HandleResponse(netlink.Message) })
  if !ok {
    continue
  }
  m.HandleResponse(rmsg)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HandleResponse is not a method that belong to netlink.Message. It's belong to entity.
For a given entity, it's perform some operations on it by using the netlink response. We have to connect an entity (like a Rule) and a netlink message in case of the slice of responses is unordered. I connect them with the netlink sequence.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I’m still not particularly happy about having to manage messages and entities, which are separate but connected.

Instead of the current approach, we could make messages take an interface type which has a NetlinkMessage method. putMessages would then wrap a netlink.Message in a type which implements NetlinkMessage, and can optionally implement HandleResponse as well.

Copy link
Contributor Author

@alexispires alexispires Jan 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stapelberg I'm not sure to understand what do you have in mind but the main limit I have is that the SendMessages method of netlink library take a slice of netlink.Message and not a slice of netlink.Message's pointer. So the only data struct that give me the netlink sequence number is the slice of netlink.Message returned by sendMessages. So the slice of netlink.Message have to be connected in one way or another to something else. In your solution I have to connect the type that wrap netlink.Message with the netlink.Message that contains the seq even if both are the same netlink message.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m a little confused now: you’re saying the sequence numbers are only set in the []netlink.Message that’s returned by (*netlink.Conn).SendMessages, but in your PR, you’re discarding that result:

if _, err = conn.SendMessages(cc.messages); err != nil {

Is that a bug in your PR, or am I misunderstanding?

(Independently, related to your question: the fact that the netlink package takes a []netlink.Message instead of []*netlink.Message doesn’t limit us in which data structures we want to use. If need be, we can do a copy from one to the other. A netlink.Message’s Header is just a few ints, and copying the Body byte slice won’t need to copy its contents.)

entitiesBySeq[cc.messages[i].Header.Sequence] = e
}

// Trigger entities callback
msg, err := cc.checkReceive(conn)
alexispires marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why call checkReceive instead of just calling conn.Receive? It’s not clear to me whether you’re blocking until the next message is available, or only reading as long as messages are available. If the latter, that doesn’t seem sound to me—instead, we should read blockingly until we can identify the end of the response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to read as long as message are available. I'm not sure how to detect the end of the response without select.

Here an example of batch with echo flag:

sendmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=00000000}, msg_namelen=12, msg_iov=[{iov_base=[{{len=20, type=NFNL_MSG_BATCH_BEGIN, flags=NLM_F_REQUEST, seq=3, pid=0}, "\x00\x00\x0a\x00"}, {{len=36, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWTABLE, flags=NLM_F_REQUEST|NLM_F_ECHO, seq=4, pid=0}, "\x02\x00\x00\x00\x08\x00\x01\x00\x6e\x61\x74\x00\x08\x00\x02\x00\x00\x00\x00\x00"}, {{len=84, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWCHAIN, flags=NLM_F_REQUEST|NLM_F_ECHO|NLM_F_CREATE, seq=5, pid=0}, "\x02\x00\x00\x00\x08\x00\x01\x00\x6e\x61\x74\x00\x0f\x00\x03\x00\x70\x72\x65\x72\x6f\x75\x74\x69\x6e\x67\x00\x00\x14\x00\x04\x80\x08\x00\x01\x00\x00\x00\x00\x00\x08\x00\x02\x00\x00\x00\x00\x00\x08\x00\x05\x00\x00\x00\x00\x01\x0b\x00\x07\x00\x66\x69\x6c\x74\x65\x72\x00\x00"}, {{len=84, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWCHAIN, flags=NLM_F_REQUEST|NLM_F_ECHO|NLM_F_CREATE, seq=6, pid=0}, "\x02\x00\x00\x00\x08\x00\x01\x00\x6e\x61\x74\x00\x10\x00\x03\x00\x70\x6f\x73\x74\x72\x6f\x75\x74\x69\x6e\x67\x00\x14\x00\x04\x80\x08\x00\x01\x00\x00\x00\x00\x04\x08\x00\x02\x00\x00\x00\x00\x64\x08\x00\x05\x00\x00\x00\x00\x01\x0b\x00\x07\x00\x66\x69\x6c\x74\x65\x72\x00\x00"}, {{len=96, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSET, flags=NLM_F_REQUEST|NLM_F_ECHO|NLM_F_CREATE, seq=7, pid=0}, "\x02\x00\x00\x00\x08\x00\x01\x00\x6e\x61\x74\x00\x0c\x00\x02\x00\x5f\x5f\x73\x65\x74\x25\x64\x00\x08\x00\x03\x00\x00\x00\x00\x03\x08\x00\x04\x00\x00\x00\x00\x07\x08\x00\x05\x00\x00\x00\x00\x04\x08\x00\x0a\x00\x00\x00\x00\x04\x0c\x00\x09\x80\x08\x00\x01\x00\x00\x00\x00\x04\x0a\x00\x0d\x00\x00\x04\x02\x00\x00\x00\x00\x00"}, {{len=116, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSETELEM, flags=NLM_F_REQUEST|NLM_F_ECHO|NLM_F_CREATE, seq=7, pid=0}, "\x02\x00\x00\x00\x0c\x00\x02\x00\x5f\x5f\x73\x65\x74\x25\x64\x00\x08\x00\x04\x00\x00\x00\x00\x04\x08\x00\x01\x00\x6e\x61\x74\x00\x44\x00\x03\x80\x10\x00\x01\x80\x0c\x00\x01\x80\x08\x00\x01\x00\x54\x4d\x28\x84\x10\x00\x02\x80\x0c\x00\x01\x80\x08\x00\x01\x00\xb0\x1f\x35\x63\x10\x00\x03\x80\x0c\x00\x01\x80\x08\x00\x01\x00\x51\x13\x60\x94\x10\x00\x04\x80\x0c\x00\x01\x80\x08\x00\x01\x00\x8a\x64\x3e\x08"}, {{len=168, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWRULE, flags=NLM_F_REQUEST|NLM_F_ECHO|NLM_F_EXCL|NLM_F_CREATE|NLM_F_APPEND, seq=8, pid=0}, "\x02\x00\x00\x00\x08\x00\x01\x00\x6e\x61\x74\x00\x0f\x00\x02\x00\x70\x72\x65\x72\x6f\x75\x74\x69\x6e\x67\x00\x00\x7c\x00\x04\x80\x34\x00\x01\x80\x0c\x00\x01\x00\x70\x61\x79\x6c\x6f\x61\x64\x00\x24\x00\x02\x80\x08\x00\x01\x00\x00\x00\x00\x01\x08\x00\x02\x00\x00\x00\x00\x01\x08\x00\x03\x00\x00\x00\x00\x0c\x08\x00\x04\x00\x00\x00\x00\x04\x30\x00\x01\x80\x0b\x00\x01\x00\x6c\x6f\x6f\x6b\x75\x70\x00\x00\x20\x00\x02\x80\x08\x00\x02\x00\x00\x00\x00\x01\x0c\x00\x01\x00\x5f\x5f\x73\x65\x74\x25\x64\x00\x08\x00\x04\x00\x00\x00\x00\x04\x14\x00\x01\x80\x0c\x00\x01\x00\x63\x6f\x75\x6e\x74\x65\x72\x00\x04\x00\x02\x80"}, {{len=20, type=NFNL_MSG_BATCH_END, flags=NLM_F_REQUEST, seq=9, pid=0}, "\x00\x00\x0a\x00"}], iov_len=624}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 624
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=104, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWCHAIN, flags=0, seq=5, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0c\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x01\x0f\x00\x03\x00\x70\x72\x65\x72\x6f\x75\x74\x69\x6e\x67\x00\x00\x14\x00\x04\x00\x08\x00\x01\x00\x00\x00\x00\x00\x08\x00\x02\x00\x00\x00\x00\x00\x08\x00\x05\x00\x00\x00\x00\x01\x0b\x00\x07\x00\x66\x69\x6c\x74\x65\x72\x00\x00\x08\x00\x06\x00\x00\x00\x00\x04"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 104
fstat(1, {st_dev=makedev(0, 22), st_ino=3, st_mode=S_IFCHR|0620, st_nlink=1, st_uid=1000, st_gid=5, st_blksize=1024, st_blocks=0, st_rdev=makedev(136, 0), st_atime=1579772816 /* 2020-01-23T10:46:56.597292734+0100 */, st_atime_nsec=597292734, st_mtime=1579772816 /* 2020-01-23T10:46:56.597292734+0100 */, st_mtime_nsec=597292734, st_ctime=1579772494 /* 2020-01-23T10:41:34.597292734+0100 */, st_ctime_nsec=597292734}) = 0
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=104, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWCHAIN, flags=0, seq=6, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0c\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10\x00\x03\x00\x70\x6f\x73\x74\x72\x6f\x75\x74\x69\x6e\x67\x00\x14\x00\x04\x00\x08\x00\x01\x00\x00\x00\x00\x04\x08\x00\x02\x00\x00\x00\x00\x64\x08\x00\x05\x00\x00\x00\x00\x01\x0b\x00\x07\x00\x66\x69\x6c\x74\x65\x72\x00\x00\x08\x00\x06\x00\x00\x00\x00\x00"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 104
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=100, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSET, flags=0, seq=7, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0b\x00\x02\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x0c\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00\x09\x08\x00\x03\x00\x00\x00\x00\x03\x08\x00\x04\x00\x00\x00\x00\x07\x08\x00\x05\x00\x00\x00\x00\x04\x0a\x00\x0d\x00\x00\x04\x02\x00\x00\x00\x00\x00\x0c\x00\x09\x00\x08\x00\x01\x00\x00\x00\x00\x04"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 100
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=60, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSETELEM, flags=0, seq=0, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0b\x00\x02\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x14\x00\x03\x00\x10\x00\x01\x00\x0c\x00\x01\x00\x08\x00\x01\x00\x54\x4d\x28\x84"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 60
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=60, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSETELEM, flags=0, seq=0, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0b\x00\x02\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x14\x00\x03\x00\x10\x00\x01\x00\x0c\x00\x01\x00\x08\x00\x01\x00\xb0\x1f\x35\x63"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 60
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=60, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSETELEM, flags=0, seq=0, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0b\x00\x02\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x14\x00\x03\x00\x10\x00\x01\x00\x0c\x00\x01\x00\x08\x00\x01\x00\x51\x13\x60\x94"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 60
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=60, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSETELEM, flags=0, seq=0, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0b\x00\x02\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x14\x00\x03\x00\x10\x00\x01\x00\x0c\x00\x01\x00\x08\x00\x01\x00\x8a\x64\x3e\x08"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 60
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=204, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWRULE, flags=0, seq=8, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0f\x00\x02\x00\x70\x72\x65\x72\x6f\x75\x74\x69\x6e\x67\x00\x00\x0c\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x0a\x94\x00\x04\x00\x34\x00\x01\x00\x0c\x00\x01\x00\x70\x61\x79\x6c\x6f\x61\x64\x00\x24\x00\x02\x00\x08\x00\x01\x00\x00\x00\x00\x01\x08\x00\x02\x00\x00\x00\x00\x01\x08\x00\x03\x00\x00\x00\x00\x0c\x08\x00\x04\x00\x00\x00\x00\x04\x30\x00\x01\x00\x0b\x00\x01\x00\x6c\x6f\x6f\x6b\x75\x70\x00\x00\x20\x00\x02\x00\x0b\x00\x01\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x08\x00\x02\x00\x00\x00\x00\x01\x08\x00\x05\x00\x00\x00\x00\x00\x2c\x00\x01\x00\x0c\x00\x01\x00\x63\x6f\x75\x6e\x74\x65\x72\x00\x1c\x00\x02\x00\x0c\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 204
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 0 (Timeout)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about setting NLM_F_ACK or NLM_F_ECHO on the batch start and batch end messages? That way, you should be able to read until you encounter the sequence number for the batch end message.

if err != nil {
return err
}

for msg {
rmsg, err := conn.Receive()
if err != nil {
return fmt.Errorf("Receive: %w", err)
}

for _, msg := range rmsg {
if e, ok := entitiesBySeq[msg.Header.Sequence]; ok {
e.HandleResponse(msg)

}
}
msg, err = cc.checkReceive(conn)
if err != nil {
return err
}
}

return err
}

// putMessage store netlink message to sent after
func (cc *Conn) putMessage(msg netlink.Message) int {
cc.messagesMu.Lock()
defer cc.messagesMu.Unlock()

if cc.messages == nil {
cc.messages = append(cc.messages, netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_BEGIN),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
})
}

cc.messages = append(cc.messages, msg)

return len(cc.messages) - 1
}

// PutEntity store entity to relate to netlink response
func (cc *Conn) PutEntity(i int, e Entity) {
if cc.entities == nil {
cc.entities = make(map[int]Entity)
}
cc.entities[i] = e
}

func (cc *Conn) checkReceive(c *netlink.Conn) (bool, error) {
if cc.TestDial != nil {
return false, nil
}

sc, err := c.SyscallConn()
alexispires marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
return false, fmt.Errorf("SyscallConn error: %w", err)
}

var n int

sc.Control(func(fd uintptr) {
alexispires marked this conversation as resolved.
Show resolved Hide resolved
var fdSet unix.FdSet
fdSet.Zero()
fdSet.Set(int(fd))

n, err = unix.Select(int(fd)+1, &fdSet, nil, nil, &unix.Timeval{})
})

if err == nil && n > 0 {
return true, nil
}

return nil
return false, err
}

// FlushRuleset flushes the entire ruleset. See also
// https://wiki.nftables.org/wiki-nftables/index.php/Operations_at_ruleset_level
func (cc *Conn) FlushRuleset() {
cc.Lock()
defer cc.Unlock()
cc.messages = append(cc.messages, netlink.Message{
cc.putMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELTABLE),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
Expand Down Expand Up @@ -116,26 +207,16 @@ func (cc *Conn) marshalExpr(e expr.Any) []byte {
return b
}

func batch(messages []netlink.Message) []netlink.Message {
batch := []netlink.Message{
{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_BEGIN),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
},
}
func (cc *Conn) endBatch(messages []netlink.Message) {

batch = append(batch, messages...)
cc.messagesMu.Lock()
defer cc.messagesMu.Unlock()

batch = append(batch, netlink.Message{
cc.messages = append(cc.messages, netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_END),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
})

return batch
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go 1.12

require (
github.com/koneu/natend v0.0.0-20150829182554-ec0926ea948d
github.com/mdlayher/netlink v0.0.0-20191009155606-de872b0d824b
github.com/mdlayher/netlink v1.0.0
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc
golang.org/x/net v0.0.0-20191028085509-fe3aa8a45271 // indirect
golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c
golang.org/x/sys v0.0.0-20200106114638-5f8ca72cd632
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/koneu/natend v0.0.0-20150829182554-ec0926ea948d/go.mod h1:QHb4k4cr1fQ
github.com/mdlayher/netlink v0.0.0-20190409211403-11939a169225/go.mod h1:eQB3mZE4aiYnlUsyGGCOpPETfdQq4Jhsgf1fk3cwQaA=
github.com/mdlayher/netlink v0.0.0-20191009155606-de872b0d824b h1:W3er9pI7mt2gOqOWzwvx20iJ8Akiqz1mUMTxU6wdvl8=
github.com/mdlayher/netlink v0.0.0-20191009155606-de872b0d824b/go.mod h1:KxeJAFOFLG6AjpyDkQ/iIhxygIUKD+vcwqcnu43w/+M=
github.com/mdlayher/netlink v1.0.0 h1:vySPY5Oxnn/8lxAPn2cK6kAzcZzYJl3KriSLO46OT18=
github.com/mdlayher/netlink v1.0.0/go.mod h1:KxeJAFOFLG6AjpyDkQ/iIhxygIUKD+vcwqcnu43w/+M=
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc h1:R83G5ikgLMxrBvLh22JhdfI8K6YXEPHx5P03Uu3DRs4=
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -25,4 +27,6 @@ golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c h1:S/FtSvpNLtFBgjTqcKsRpsa6a
golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea h1:Mz1TMnfJDRJLk8S8OPCoJYgrsp/Se/2TBre2+vwX128=
golang.org/x/sys v0.0.0-20191113150313-8ad342257130 h1:+sdNBpwFF05NvMnEyGynbOs/Gr2LQwORWEPKXuEXxzU=
golang.org/x/sys v0.0.0-20200106114638-5f8ca72cd632 h1:ateQkYCVYo8UwIBvoR3zj1Dh2K6Op/n3GxemXfB44/Y=
golang.org/x/sys v0.0.0-20200106114638-5f8ca72cd632/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
82 changes: 82 additions & 0 deletions nftables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"reflect"
"runtime"
"strings"
"sync"
"testing"

"github.com/google/nftables"
Expand Down Expand Up @@ -3980,3 +3981,84 @@ func TestStatelessNAT(t *testing.T) {
t.Fatal(err)
}
}

func TestIntegrationAddRule(t *testing.T) {

// Create a new network namespace to test these operations,
// and tear down the namespace at test completion.
c, newNS := openSystemNFTConn(t)
defer cleanupSystemNFTConn(t, newNS)
// Clear all rules at the beginning + end of the test.
c.FlushRuleset()
defer c.FlushRuleset()

filter := c.AddTable(&nftables.Table{
Family: nftables.TableFamilyIPv4,
Name: "filter",
})

chain := c.AddChain(&nftables.Chain{
Name: "chain",
Table: filter,
Type: nftables.ChainTypeFilter,
Hooknum: nftables.ChainHookPrerouting,
Priority: nftables.ChainPriorityFilter,
})

c.Flush()

execN := func(w int, n int) {
c := &nftables.Conn{NetNS: int(newNS)}

for i := 0; i < n; i++ {

r := c.AddRule(&nftables.Rule{
Table: filter,
Chain: chain,
UserData: []byte(fmt.Sprintf("%d-%d", w, i)),
Exprs: []expr.Any{
&expr.Verdict{
// [ immediate reg 0 drop ]
Kind: expr.VerdictDrop,
},
},
})

if r.Handle != 0 {
t.Fatalf("unexpected handle value at %d", i)
}

if err := c.Flush(); err != nil {
t.Fatal(err)
}

if r.Handle == 0 {
t.Fatalf("handle value is empty at %d", i)
}

rulesGetted, _ := c.GetRule(filter, chain)

for i, rg := range rulesGetted {
if bytes.Equal(rg.UserData, r.UserData) && rg.Handle != r.Handle {
t.Fatalf("mismatched handle at %d-%d, got: %d, want: %d", w, i, r.Handle, rg.Handle)
}
}
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this test running many workers concurrently? In other words: why is not sufficient to test with 1 worker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To test the behaviors with concurrency as exprimed here: #88 (comment)
IMO it's safer to keep it to identify regression on concurent access. But it's not specific on this part of lib, I think concurrency have to be tested on the whole lib.

const (
workers = 16
iterations = 256
)

var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(n int) {
defer wg.Done()
execN(n, iterations)
}(i)
}

wg.Wait()
}
2 changes: 1 addition & 1 deletion obj.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (cc *Conn) AddObj(o Obj) Obj {
return nil
}

cc.messages = append(cc.messages, netlink.Message{
cc.putMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWOBJ),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
Expand Down
19 changes: 16 additions & 3 deletions rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,16 @@ func (cc *Conn) AddRule(r *Rule) *Rule {
flags = netlink.Request | netlink.Acknowledge | netlink.Create | unix.NLM_F_ECHO | unix.NLM_F_APPEND
}

cc.messages = append(cc.messages, netlink.Message{
m := netlink.Message{
Header: netlink.Header{
Type: ruleHeaderType,
Flags: flags,
},
Data: append(extraHeader(uint8(r.Table.Family), 0), msgData...),
})
}

i := cc.putMessage(m)
cc.PutEntity(i, r)

return r
}
Expand All @@ -149,7 +152,7 @@ func (cc *Conn) DelRule(r *Rule) error {
})...)
flags := netlink.Request | netlink.Acknowledge

cc.messages = append(cc.messages, netlink.Message{
cc.putMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE),
Flags: flags,
Expand All @@ -160,6 +163,16 @@ func (cc *Conn) DelRule(r *Rule) error {
return nil
}

// HandleResponse retrieves Handle in netlink response
func (r *Rule) HandleResponse(msg netlink.Message) {
rule, err := ruleFromMsg(msg)
if err != nil {
return
}

r.Handle = rule.Handle
}

func exprsFromMsg(b []byte) ([]expr.Any, error) {
ad, err := netlink.NewAttributeDecoder(b)
if err != nil {
Expand Down
Loading