Read conntrack flow statistics

This PR allows populating per-connection packet and byte counts to
ConntrackFlow object when nf_conntrack_acct is enabled.
This commit is contained in:
Yang Guan 2018-03-16 10:57:21 -07:00 committed by Alessandro Boch
parent a2ad57a690
commit 41009d533b
4 changed files with 85 additions and 37 deletions

View File

@ -135,11 +135,13 @@ func (h *Handle) dumpConntrackTable(table ConntrackTableType, family InetFamily)
// http://git.netfilter.org/libnetfilter_conntrack/tree/include/internal/object.h // http://git.netfilter.org/libnetfilter_conntrack/tree/include/internal/object.h
// For the time being, the structure below allows to parse and extract the base information of a flow // For the time being, the structure below allows to parse and extract the base information of a flow
type ipTuple struct { type ipTuple struct {
SrcIP net.IP Bytes uint64
DstIP net.IP DstIP net.IP
Protocol uint8
SrcPort uint16
DstPort uint16 DstPort uint16
Packets uint64
Protocol uint8
SrcIP net.IP
SrcPort uint16
} }
type ConntrackFlow struct { type ConntrackFlow struct {
@ -151,11 +153,12 @@ type ConntrackFlow struct {
func (s *ConntrackFlow) String() string { func (s *ConntrackFlow) String() string {
// conntrack cmd output: // conntrack cmd output:
// udp 17 src=127.0.0.1 dst=127.0.0.1 sport=4001 dport=1234 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=1234 dport=4001 mark=0 // udp 17 src=127.0.0.1 dst=127.0.0.1 sport=4001 dport=1234 packets=5 bytes=532 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=1234 dport=4001 packets=10 bytes=1078 mark=0
return fmt.Sprintf("%s\t%d src=%s dst=%s sport=%d dport=%d\tsrc=%s dst=%s sport=%d dport=%d mark=%d", return fmt.Sprintf("%s\t%d src=%s dst=%s sport=%d dport=%d packets=%d bytes=%d\tsrc=%s dst=%s sport=%d dport=%d packets=%d bytes=%d mark=%d",
nl.L4ProtoMap[s.Forward.Protocol], s.Forward.Protocol, nl.L4ProtoMap[s.Forward.Protocol], s.Forward.Protocol,
s.Forward.SrcIP.String(), s.Forward.DstIP.String(), s.Forward.SrcPort, s.Forward.DstPort, s.Forward.SrcIP.String(), s.Forward.DstIP.String(), s.Forward.SrcPort, s.Forward.DstPort, s.Forward.Packets, s.Forward.Bytes,
s.Reverse.SrcIP.String(), s.Reverse.DstIP.String(), s.Reverse.SrcPort, s.Reverse.DstPort, s.Mark) s.Reverse.SrcIP.String(), s.Reverse.DstIP.String(), s.Reverse.SrcPort, s.Reverse.DstPort, s.Reverse.Packets, s.Reverse.Bytes,
s.Mark)
} }
// This method parse the ip tuple structure // This method parse the ip tuple structure
@ -220,6 +223,24 @@ func parseBERaw16(r *bytes.Reader, v *uint16) {
binary.Read(r, binary.BigEndian, v) binary.Read(r, binary.BigEndian, v)
} }
func parseBERaw64(r *bytes.Reader, v *uint64) {
binary.Read(r, binary.BigEndian, v)
}
func parseByteAndPacketCounters(r *bytes.Reader) (bytes, packets uint64) {
for i := 0; i < 2; i++ {
switch _, t, _ := parseNfAttrTL(r); t {
case nl.CTA_COUNTERS_BYTES:
parseBERaw64(r, &bytes)
case nl.CTA_COUNTERS_PACKETS:
parseBERaw64(r, &packets)
default:
return
}
}
return
}
func parseRawData(data []byte) *ConntrackFlow { func parseRawData(data []byte) *ConntrackFlow {
s := &ConntrackFlow{} s := &ConntrackFlow{}
var proto uint8 var proto uint8
@ -238,20 +259,23 @@ func parseRawData(data []byte) *ConntrackFlow {
// <len, NLA_F_NESTED|CTA_TUPLE_IP> 4 bytes // <len, NLA_F_NESTED|CTA_TUPLE_IP> 4 bytes
// flow information of the reverse flow // flow information of the reverse flow
for reader.Len() > 0 { for reader.Len() > 0 {
nested, t, l := parseNfAttrTL(reader) if nested, t, l := parseNfAttrTL(reader); nested {
if nested && t == nl.CTA_TUPLE_ORIG { switch t {
if nested, t, _ = parseNfAttrTL(reader); nested && t == nl.CTA_TUPLE_IP { case nl.CTA_TUPLE_ORIG:
proto = parseIpTuple(reader, &s.Forward) if nested, t, _ = parseNfAttrTL(reader); nested && t == nl.CTA_TUPLE_IP {
} proto = parseIpTuple(reader, &s.Forward)
} else if nested && t == nl.CTA_TUPLE_REPLY { }
if nested, t, _ = parseNfAttrTL(reader); nested && t == nl.CTA_TUPLE_IP { case nl.CTA_TUPLE_REPLY:
parseIpTuple(reader, &s.Reverse) if nested, t, _ = parseNfAttrTL(reader); nested && t == nl.CTA_TUPLE_IP {
parseIpTuple(reader, &s.Reverse)
// Got all the useful information stop parsing } else {
break // Header not recognized skip it
} else { reader.Seek(int64(l), seekCurrent)
// Header not recognized skip it }
reader.Seek(int64(l), seekCurrent) case nl.CTA_COUNTERS_ORIG:
s.Forward.Bytes, s.Forward.Packets = parseByteAndPacketCounters(reader)
case nl.CTA_COUNTERS_REPLY:
s.Reverse.Bytes, s.Reverse.Packets = parseByteAndPacketCounters(reader)
} }
} }
} }

View File

@ -104,6 +104,8 @@ func TestConntrackTableList(t *testing.T) {
defer ns.Close() defer ns.Close()
defer runtime.UnlockOSThread() defer runtime.UnlockOSThread()
setUpF(t, "/proc/sys/net/netfilter/nf_conntrack_acct", "1")
// Flush the table to start fresh // Flush the table to start fresh
err := h.ConntrackTableFlush(ConntrackTable) err := h.ConntrackTableFlush(ConntrackTable)
CheckErrorFail(t, err) CheckErrorFail(t, err)
@ -124,6 +126,10 @@ func TestConntrackTableList(t *testing.T) {
(flow.Forward.SrcPort >= 2000 && flow.Forward.SrcPort <= 2005) { (flow.Forward.SrcPort >= 2000 && flow.Forward.SrcPort <= 2005) {
found++ found++
} }
if flow.Forward.Bytes == 0 && flow.Forward.Packets == 0 && flow.Reverse.Bytes == 0 && flow.Reverse.Packets == 0 {
t.Error("No traffic statistics are collected")
}
} }
if found != 5 { if found != 5 {
t.Fatalf("Found only %d flows over 5", found) t.Fatalf("Found only %d flows over 5", found)

View File

@ -66,21 +66,22 @@ func setUpNetlinkTestWithLoopback(t *testing.T) tearDownNetlinkTest {
} }
} }
func setUpF(t *testing.T, path, value string) {
file, err := os.Create(path)
defer file.Close()
if err != nil {
t.Fatalf("Failed to open %s: %s", path, err)
}
file.WriteString(value)
}
func setUpMPLSNetlinkTest(t *testing.T) tearDownNetlinkTest { func setUpMPLSNetlinkTest(t *testing.T) tearDownNetlinkTest {
if _, err := os.Stat("/proc/sys/net/mpls/platform_labels"); err != nil { if _, err := os.Stat("/proc/sys/net/mpls/platform_labels"); err != nil {
t.Skip("Test requires MPLS support.") t.Skip("Test requires MPLS support.")
} }
f := setUpNetlinkTest(t) f := setUpNetlinkTest(t)
setUpF := func(path, value string) { setUpF(t, "/proc/sys/net/mpls/platform_labels", "1024")
file, err := os.Create(path) setUpF(t, "/proc/sys/net/mpls/conf/lo/input", "1")
defer file.Close()
if err != nil {
t.Fatalf("Failed to open %s: %s", path, err)
}
file.WriteString(value)
}
setUpF("/proc/sys/net/mpls/platform_labels", "1024")
setUpF("/proc/sys/net/mpls/conf/lo/input", "1")
return f return f
} }

View File

@ -76,12 +76,14 @@ const (
// __CTA_MAX // __CTA_MAX
// }; // };
const ( const (
CTA_TUPLE_ORIG = 1 CTA_TUPLE_ORIG = 1
CTA_TUPLE_REPLY = 2 CTA_TUPLE_REPLY = 2
CTA_STATUS = 3 CTA_STATUS = 3
CTA_TIMEOUT = 7 CTA_TIMEOUT = 7
CTA_MARK = 8 CTA_MARK = 8
CTA_PROTOINFO = 4 CTA_COUNTERS_ORIG = 9
CTA_COUNTERS_REPLY = 10
CTA_PROTOINFO = 4
) )
// enum ctattr_tuple { // enum ctattr_tuple {
@ -163,6 +165,21 @@ const (
CTA_PROTOINFO_TCP_FLAGS_REPLY = 5 CTA_PROTOINFO_TCP_FLAGS_REPLY = 5
) )
// enum ctattr_counters {
// CTA_COUNTERS_UNSPEC,
// CTA_COUNTERS_PACKETS, /* 64bit counters */
// CTA_COUNTERS_BYTES, /* 64bit counters */
// CTA_COUNTERS32_PACKETS, /* old 32bit counters, unused */
// CTA_COUNTERS32_BYTES, /* old 32bit counters, unused */
// CTA_COUNTERS_PAD,
// __CTA_COUNTERS_M
// };
// #define CTA_COUNTERS_MAX (__CTA_COUNTERS_MAX - 1)
const (
CTA_COUNTERS_PACKETS = 1
CTA_COUNTERS_BYTES = 2
)
// /* General form of address family dependent message. // /* General form of address family dependent message.
// */ // */
// struct nfgenmsg { // struct nfgenmsg {