blob: 13d1472c9241c3b69b0790d6abc39243889bffc6 [file] [log] [blame]
b.liue9582032025-04-17 19:18:16 +08001From 4f639231c83b09ea004c03e95c702b7750bf9930 Mon Sep 17 00:00:00 2001
2From: Ander Juaristi <a@juaristi.eus>
3Date: Fri, 26 Apr 2019 09:58:06 +0200
4Subject: IPFIX: Add IPFIX output plugin
5
6This patch adds an IPFIX output plugin to ulogd2. It generates NetFlow/IPFIX
7traces and sends them to a remote server (collector) via TCP or UDP.
8
9Based on original work by Holger Eitzenberger <holger@eitzenberger.org>.
10
11How to test this
12----------------
13
14I am currently testing this with the NFCT input and Wireshark.
15
16Place the following in ulogd.conf:
17
18 # this will print all flows on screen
19 loglevel=1
20
21 # load NFCT and IPFIX plugins
22 plugin="/lib/ulogd/ulogd_inpflow_NFCT.so"
23 plugin="/lib/ulogd/ulogd_output_IPFIX.so"
24
25 stack=ct1:NFCT,ipfix1:IPFIX
26
27 [ct1]
28 netlink_socket_buffer_size=217088
29 netlink_socket_buffer_maxsize=1085440
30 accept_proto_filter=tcp,sctp
31
32 [ipfix1]
33 oid=1
34 host="127.0.0.1"
35 #port=4739
36 #send_template="once"
37
38I am currently testing it by launching a plain NetCat listener on port
394739 (the default for IPFIX) and then running Wireshark and see that it
40dissects the IPFIX/NetFlow traffic correctly (obviously this relies on
41the Wireshark NetFlow dissector being correct).
42
43First:
44
45 nc -vvvv -l 127.0.0.1 4739
46
47Then:
48
49 sudo ulogd -vc ulogd.conf
50
51Signed-off-by: Ander Juaristi <a@juaristi.eus>
52Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
53---
54 configure.ac | 2 +-
55 include/ulogd/ulogd.h | 5 +
56 input/flow/ulogd_inpflow_IPFIX.c | 2 -
57 output/Makefile.am | 2 +-
58 output/ipfix/Makefile.am | 7 +
59 output/ipfix/ipfix.c | 141 ++++++++++
60 output/ipfix/ipfix.h | 89 +++++++
61 output/ipfix/ulogd_output_IPFIX.c | 503 +++++++++++++++++++++++++++++++++++
62 output/ulogd_output_IPFIX.c | 546 --------------------------------------
63 9 files changed, 747 insertions(+), 550 deletions(-)
64 delete mode 100644 input/flow/ulogd_inpflow_IPFIX.c
65 create mode 100644 output/ipfix/Makefile.am
66 create mode 100644 output/ipfix/ipfix.c
67 create mode 100644 output/ipfix/ipfix.h
68 create mode 100644 output/ipfix/ulogd_output_IPFIX.c
69 delete mode 100644 output/ulogd_output_IPFIX.c
70
71--- a/configure.ac
72+++ b/configure.ac
73@@ -179,7 +179,7 @@ AC_CONFIG_FILES(include/Makefile include
74 input/sum/Makefile \
75 filter/Makefile filter/raw2packet/Makefile filter/packet2flow/Makefile \
76 output/Makefile output/pcap/Makefile output/mysql/Makefile output/pgsql/Makefile output/sqlite3/Makefile \
77- output/dbi/Makefile \
78+ output/dbi/Makefile output/ipfix/Makefile \
79 src/Makefile Makefile Rules.make)
80 AC_OUTPUT
81
82--- a/include/ulogd/ulogd.h
83+++ b/include/ulogd/ulogd.h
84@@ -28,6 +28,11 @@
85
86 /* types without length */
87 #define ULOGD_RET_NONE 0x0000
88+#define __packed __attribute__((packed))
89+#define __noreturn __attribute__((noreturn))
90+#define __cold __attribute__((cold))
91+
92+#define __packed __attribute__((packed))
93
94 #define ULOGD_RET_INT8 0x0001
95 #define ULOGD_RET_INT16 0x0002
96--- a/output/Makefile.am
97+++ b/output/Makefile.am
98@@ -2,7 +2,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/include ${
99 ${LIBNETFILTER_CONNTRACK_CFLAGS} ${LIBNETFILTER_LOG_CFLAGS}
100 AM_CFLAGS = ${regular_CFLAGS}
101
102-SUBDIRS= pcap mysql pgsql sqlite3 dbi
103+SUBDIRS= pcap mysql pgsql sqlite3 dbi ipfix
104
105 pkglib_LTLIBRARIES = ulogd_output_LOGEMU.la ulogd_output_SYSLOG.la \
106 ulogd_output_OPRINT.la ulogd_output_GPRINT.la \
107--- /dev/null
108+++ b/output/ipfix/Makefile.am
109@@ -0,0 +1,7 @@
110+AM_CPPFLAGS = -I$(top_srcdir)/include
111+AM_CFLAGS = $(regular_CFLAGS)
112+
113+pkglib_LTLIBRARIES = ulogd_output_IPFIX.la
114+
115+ulogd_output_IPFIX_la_SOURCES = ulogd_output_IPFIX.c ipfix.c
116+ulogd_output_IPFIX_la_LDFLAGS = -avoid-version -module
117--- /dev/null
118+++ b/output/ipfix/ipfix.c
119@@ -0,0 +1,141 @@
120+/*
121+ * ipfix.c
122+ *
123+ * Holger Eitzenberger, 2009.
124+ */
125+
126+/* These forward declarations are needed since ulogd.h doesn't like to be the first */
127+#include <ulogd/linuxlist.h>
128+
129+#define __packed __attribute__((packed))
130+
131+#include "ipfix.h"
132+
133+#include <ulogd/ulogd.h>
134+#include <ulogd/common.h>
135+
136+struct ipfix_msg *ipfix_msg_alloc(size_t len, uint32_t oid)
137+{
138+ struct ipfix_msg *msg;
139+ struct ipfix_hdr *hdr;
140+
141+ if (len < IPFIX_HDRLEN + IPFIX_SET_HDRLEN)
142+ return NULL;
143+
144+ msg = malloc(sizeof(struct ipfix_msg) + len);
145+ memset(msg, 0, sizeof(struct ipfix_msg));
146+ msg->tail = msg->data + IPFIX_HDRLEN;
147+ msg->end = msg->data + len;
148+
149+ hdr = ipfix_msg_hdr(msg);
150+ memset(hdr, 0, IPFIX_HDRLEN);
151+ hdr->version = htons(IPFIX_VERSION);
152+ hdr->oid = htonl(oid);
153+
154+ return msg;
155+}
156+
157+void ipfix_msg_free(struct ipfix_msg *msg)
158+{
159+ if (!msg)
160+ return;
161+
162+ if (msg->nrecs > 0)
163+ ulogd_log(ULOGD_DEBUG, "%s: %d flows have been lost\n", __func__,
164+ msg->nrecs);
165+
166+ free(msg);
167+}
168+
169+struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *msg)
170+{
171+ return (struct ipfix_hdr *)msg->data;
172+}
173+
174+void *ipfix_msg_data(struct ipfix_msg *msg)
175+{
176+ return msg->data;
177+}
178+
179+size_t ipfix_msg_len(const struct ipfix_msg *msg)
180+{
181+ return msg->tail - msg->data;
182+}
183+
184+struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *msg, uint16_t sid)
185+{
186+ struct ipfix_set_hdr *shdr;
187+
188+ if (msg->end - msg->tail < (int) IPFIX_SET_HDRLEN)
189+ return NULL;
190+
191+ shdr = (struct ipfix_set_hdr *)msg->tail;
192+ shdr->id = sid;
193+ shdr->len = IPFIX_SET_HDRLEN;
194+ msg->tail += IPFIX_SET_HDRLEN;
195+ msg->last_set = shdr;
196+ return shdr;
197+}
198+
199+struct ipfix_set_hdr *ipfix_msg_get_set(const struct ipfix_msg *msg)
200+{
201+ return msg->last_set;
202+}
203+
204+/**
205+ * Add data record to an IPFIX message. The data is accounted properly.
206+ *
207+ * @return pointer to data or %NULL if not that much space left.
208+ */
209+void *ipfix_msg_add_data(struct ipfix_msg *msg, size_t len)
210+{
211+ void *data;
212+
213+ if (!msg->last_set) {
214+ ulogd_log(ULOGD_FATAL, "msg->last_set is NULL\n");
215+ return NULL;
216+ }
217+
218+ if ((ssize_t) len > msg->end - msg->tail)
219+ return NULL;
220+
221+ data = msg->tail;
222+ msg->tail += len;
223+ msg->nrecs++;
224+ msg->last_set->len += len;
225+
226+ return data;
227+}
228+
229+/* check and dump message */
230+int ipfix_dump_msg(const struct ipfix_msg *msg)
231+{
232+ const struct ipfix_hdr *hdr = ipfix_msg_hdr(msg);
233+ const struct ipfix_set_hdr *shdr = (struct ipfix_set_hdr *) hdr->data;
234+
235+ if (ntohs(hdr->len) < IPFIX_HDRLEN) {
236+ ulogd_log(ULOGD_FATAL, "Invalid IPFIX message header length\n");
237+ return -1;
238+ }
239+ if (ipfix_msg_len(msg) != IPFIX_HDRLEN + ntohs(shdr->len)) {
240+ ulogd_log(ULOGD_FATAL, "Invalid IPFIX message length\n");
241+ return -1;
242+ }
243+
244+ ulogd_log(ULOGD_DEBUG, "msg: ver=%#x len=%#x t=%#x seq=%#x oid=%d\n",
245+ ntohs(hdr->version), ntohs(hdr->len), htonl(hdr->time),
246+ ntohl(hdr->seqno), ntohl(hdr->oid));
247+
248+ return 0;
249+}
250+
251+/* template management */
252+size_t ipfix_rec_len(uint16_t sid)
253+{
254+ if (sid != htons(VY_IPFIX_SID)) {
255+ ulogd_log(ULOGD_FATAL, "Invalid SID\n");
256+ return 0;
257+ }
258+
259+ return sizeof(struct vy_ipfix_data);
260+}
261--- /dev/null
262+++ b/output/ipfix/ipfix.h
263@@ -0,0 +1,89 @@
264+/*
265+ * ipfix.h
266+ *
267+ * Holger Eitzenberger <holger@eitzenberger.org>, 2009.
268+ */
269+#ifndef IPFIX_H
270+#define IPFIX_H
271+
272+#include <stdint.h>
273+#include <netinet/in.h>
274+
275+
276+struct ipfix_hdr {
277+#define IPFIX_VERSION 0xa
278+ uint16_t version;
279+ uint16_t len;
280+ uint32_t time;
281+ uint32_t seqno;
282+ uint32_t oid; /* Observation Domain ID */
283+ uint8_t data[];
284+} __packed;
285+
286+#define IPFIX_HDRLEN sizeof(struct ipfix_hdr)
287+
288+/*
289+ * IDs 0-255 are reserved for Template Sets. IDs of Data Sets are > 255.
290+ */
291+struct ipfix_templ_hdr {
292+ uint16_t id;
293+ uint16_t cnt;
294+ uint8_t data[];
295+} __packed;
296+
297+struct ipfix_set_hdr {
298+#define IPFIX_SET_TEMPL 2
299+#define IPFIX_SET_OPT_TEMPL 3
300+ uint16_t id;
301+ uint16_t len;
302+ uint8_t data[];
303+} __packed;
304+
305+#define IPFIX_SET_HDRLEN sizeof(struct ipfix_set_hdr)
306+
307+struct ipfix_msg {
308+ struct llist_head link;
309+ uint8_t *tail;
310+ uint8_t *end;
311+ unsigned nrecs;
312+ struct ipfix_set_hdr *last_set;
313+ uint8_t data[];
314+};
315+
316+struct vy_ipfix_data {
317+ struct in_addr saddr;
318+ struct in_addr daddr;
319+ uint16_t ifi_in;
320+ uint16_t ifi_out;
321+ uint32_t packets;
322+ uint32_t bytes;
323+ uint32_t start; /* Unix time */
324+ uint32_t end; /* Unix time */
325+ uint16_t sport;
326+ uint16_t dport;
327+ uint32_t aid; /* Application ID */
328+ uint8_t l4_proto;
329+ uint8_t dscp;
330+ uint16_t __padding;
331+} __packed;
332+
333+#define VY_IPFIX_SID 256
334+
335+#define VY_IPFIX_FLOWS 36
336+#define VY_IPFIX_PKT_LEN (IPFIX_HDRLEN + IPFIX_SET_HDRLEN \
337+ + VY_IPFIX_FLOWS * sizeof(struct vy_ipfix_data))
338+
339+/* template management */
340+size_t ipfix_rec_len(uint16_t);
341+
342+/* message handling */
343+struct ipfix_msg *ipfix_msg_alloc(size_t, uint32_t);
344+void ipfix_msg_free(struct ipfix_msg *);
345+struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *);
346+size_t ipfix_msg_len(const struct ipfix_msg *);
347+void *ipfix_msg_data(struct ipfix_msg *);
348+struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *, uint16_t);
349+void *ipfix_msg_add_data(struct ipfix_msg *, size_t);
350+int ipfix_dump_msg(const struct ipfix_msg *);
351+
352+#endif /* IPFIX_H */
353--- /dev/null
354+++ b/output/ipfix/ulogd_output_IPFIX.c
355@@ -0,0 +1,503 @@
356+/*
357+ * ulogd_output_IPFIX.c
358+ *
359+ * ulogd IPFIX Exporter plugin.
360+ *
361+ * This program is distributed in the hope that it will be useful,
362+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
363+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
364+ * GNU General Public License for more details.
365+ *
366+ * You should have received a copy of the GNU General Public License
367+ * along with this program; if not, write to the Free Software
368+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
369+ *
370+ * Holger Eitzenberger <holger@eitzenberger.org> Astaro AG 2009
371+ */
372+#include <unistd.h>
373+#include <time.h>
374+#include <sys/types.h>
375+#include <sys/socket.h>
376+#include <arpa/inet.h>
377+#include <netdb.h>
378+#include <ulogd/ulogd.h>
379+#include <ulogd/common.h>
380+
381+#include "ipfix.h"
382+
383+#define DEFAULT_MTU 512 /* RFC 5101, 10.3.3 */
384+#define DEFAULT_PORT 4739 /* RFC 5101, 10.3.4 */
385+#define DEFAULT_SPORT 4740
386+
387+enum {
388+ OID_CE = 0,
389+ HOST_CE,
390+ PORT_CE,
391+ PROTO_CE,
392+ MTU_CE,
393+};
394+
395+#define oid_ce(x) (x->ces[OID_CE])
396+#define host_ce(x) (x->ces[HOST_CE])
397+#define port_ce(x) (x->ces[PORT_CE])
398+#define proto_ce(x) (x->ces[PROTO_CE])
399+#define mtu_ce(x) (x->ces[MTU_CE])
400+
401+static const struct config_keyset ipfix_kset = {
402+ .num_ces = 5,
403+ .ces = {
404+ {
405+ .key = "oid",
406+ .type = CONFIG_TYPE_INT,
407+ .u.value = 0
408+ },
409+ {
410+ .key = "host",
411+ .type = CONFIG_TYPE_STRING,
412+ .u.string = ""
413+ },
414+ {
415+ .key = "port",
416+ .type = CONFIG_TYPE_INT,
417+ .u.value = DEFAULT_PORT
418+ },
419+ {
420+ .key = "proto",
421+ .type = CONFIG_TYPE_STRING,
422+ .u.string = "tcp"
423+ },
424+ {
425+ .key = "mtu",
426+ .type = CONFIG_TYPE_INT,
427+ .u.value = DEFAULT_MTU
428+ }
429+ }
430+};
431+
432+struct ipfix_templ {
433+ struct ipfix_templ *next;
434+};
435+
436+struct ipfix_priv {
437+ struct ulogd_fd ufd;
438+ uint32_t seqno;
439+ struct ipfix_msg *msg; /* current message */
440+ struct llist_head list;
441+ struct ipfix_templ *templates;
442+ int proto;
443+ struct ulogd_timer timer;
444+ struct sockaddr_in sa;
445+};
446+
447+enum {
448+ InIpSaddr = 0,
449+ InIpDaddr,
450+ InRawInPktCount,
451+ InRawInPktLen,
452+ InRawOutPktCount,
453+ InRawOutPktLen,
454+ InFlowStartSec,
455+ InFlowStartUsec,
456+ InFlowEndSec,
457+ InFlowEndUsec,
458+ InL4SPort,
459+ InL4DPort,
460+ InIpProto,
461+ InCtMark
462+};
463+
464+static struct ulogd_key ipfix_in_keys[] = {
465+ [InIpSaddr] = {
466+ .type = ULOGD_RET_IPADDR,
467+ .name = "orig.ip.saddr"
468+ },
469+ [InIpDaddr] = {
470+ .type = ULOGD_RET_IPADDR,
471+ .name = "orig.ip.daddr"
472+ },
473+ [InRawInPktCount] = {
474+ .type = ULOGD_RET_UINT64,
475+ .name = "orig.raw.pktcount"
476+ },
477+ [InRawInPktLen] = {
478+ .type = ULOGD_RET_UINT64,
479+ .name = "orig.raw.pktlen"
480+ },
481+ [InRawOutPktCount] = {
482+ .type = ULOGD_RET_UINT64,
483+ .name = "reply.raw.pktcount"
484+ },
485+ [InRawOutPktLen] = {
486+ .type = ULOGD_RET_UINT64,
487+ .name = "reply.raw.pktlen"
488+ },
489+ [InFlowStartSec] = {
490+ .type = ULOGD_RET_UINT32,
491+ .name = "flow.start.sec"
492+ },
493+ [InFlowStartUsec] = {
494+ .type = ULOGD_RET_UINT32,
495+ .name = "flow.start.usec"
496+ },
497+ [InFlowEndSec] = {
498+ .type = ULOGD_RET_UINT32,
499+ .name = "flow.end.sec"
500+ },
501+ [InFlowEndUsec] = {
502+ .type = ULOGD_RET_UINT32,
503+ .name = "flow.end.usec"
504+ },
505+ [InL4SPort] = {
506+ .type = ULOGD_RET_UINT16,
507+ .name = "orig.l4.sport"
508+ },
509+ [InL4DPort] = {
510+ .type = ULOGD_RET_UINT16,
511+ .name = "orig.l4.dport"
512+ },
513+ [InIpProto] = {
514+ .type = ULOGD_RET_UINT8,
515+ .name = "orig.ip.protocol"
516+ },
517+ [InCtMark] = {
518+ .type = ULOGD_RET_UINT32,
519+ .name = "ct.mark"
520+ }
521+};
522+
523+/* do some polishing and enqueue it */
524+static void enqueue_msg(struct ipfix_priv *priv, struct ipfix_msg *msg)
525+{
526+ struct ipfix_hdr *hdr = ipfix_msg_data(msg);
527+
528+ if (!msg)
529+ return;
530+
531+ hdr->time = htonl(time(NULL));
532+ hdr->seqno = htonl(priv->seqno += msg->nrecs);
533+ if (msg->last_set) {
534+ msg->last_set->id = htons(msg->last_set->id);
535+ msg->last_set->len = htons(msg->last_set->len);
536+ msg->last_set = NULL;
537+ }
538+ hdr->len = htons(ipfix_msg_len(msg));
539+
540+ llist_add(&msg->link, &priv->list);
541+}
542+
543+/**
544+ * @return %ULOGD_IRET_OK or error value
545+ */
546+static int send_msgs(struct ulogd_pluginstance *pi)
547+{
548+ struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
549+ struct llist_head *curr, *tmp;
550+ struct ipfix_msg *msg;
551+ int ret = ULOGD_IRET_OK, sent;
552+
553+ llist_for_each_prev(curr, &priv->list) {
554+ msg = llist_entry(curr, struct ipfix_msg, link);
555+
556+ sent = send(priv->ufd.fd, ipfix_msg_data(msg), ipfix_msg_len(msg), 0);
557+ if (sent < 0) {
558+ ulogd_log(ULOGD_ERROR, "send: %m\n");
559+ ret = ULOGD_IRET_ERR;
560+ goto done;
561+ }
562+
563+ /* TODO handle short send() for other protocols */
564+ if ((size_t) sent < ipfix_msg_len(msg))
565+ ulogd_log(ULOGD_ERROR, "short send: %d < %d\n",
566+ sent, ipfix_msg_len(msg));
567+ }
568+
569+ llist_for_each_safe(curr, tmp, &priv->list) {
570+ msg = llist_entry(curr, struct ipfix_msg, link);
571+ llist_del(curr);
572+ msg->nrecs = 0;
573+ ipfix_msg_free(msg);
574+ }
575+
576+done:
577+ return ret;
578+}
579+
580+static int ipfix_ufd_cb(int fd, unsigned what, void *arg)
581+{
582+ struct ulogd_pluginstance *pi = arg;
583+ struct ipfix_priv *priv = (struct ipfix_priv *) pi->private;
584+ ssize_t nread;
585+ char buf[16];
586+
587+ if (what & ULOGD_FD_READ) {
588+ nread = recv(priv->ufd.fd, buf, sizeof(buf), MSG_DONTWAIT);
589+ if (nread < 0) {
590+ ulogd_log(ULOGD_ERROR, "recv: %m\n");
591+ } else if (!nread) {
592+ ulogd_log(ULOGD_INFO, "connection reset by peer\n");
593+ ulogd_unregister_fd(&priv->ufd);
594+ } else
595+ ulogd_log(ULOGD_INFO, "unexpected data (%d bytes)\n", nread);
596+ }
597+
598+ return 0;
599+}
600+
601+static void ipfix_timer_cb(struct ulogd_timer *t, void *data)
602+{
603+ struct ulogd_pluginstance *pi = data;
604+ struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
605+
606+ if (priv->msg && priv->msg->nrecs > 0) {
607+ enqueue_msg(priv, priv->msg);
608+ priv->msg = NULL;
609+ send_msgs(pi);
610+ }
611+}
612+
613+static int ipfix_configure(struct ulogd_pluginstance *pi, struct ulogd_pluginstance_stack *stack)
614+{
615+ struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
616+ int oid, port, mtu, ret;
617+ char *host, *proto;
618+ char addr[16];
619+
620+ ret = config_parse_file(pi->id, pi->config_kset);
621+ if (ret < 0)
622+ return ret;
623+
624+ oid = oid_ce(pi->config_kset).u.value;
625+ host = host_ce(pi->config_kset).u.string;
626+ port = port_ce(pi->config_kset).u.value;
627+ proto = proto_ce(pi->config_kset).u.string;
628+ mtu = mtu_ce(pi->config_kset).u.value;
629+
630+ if (!oid) {
631+ ulogd_log(ULOGD_FATAL, "invalid Observation ID\n");
632+ return ULOGD_IRET_ERR;
633+ }
634+ if (!host || !strcmp(host, "")) {
635+ ulogd_log(ULOGD_FATAL, "no destination host specified\n");
636+ return ULOGD_IRET_ERR;
637+ }
638+
639+ if (!strcmp(proto, "udp")) {
640+ priv->proto = IPPROTO_UDP;
641+ } else if (!strcmp(proto, "tcp")) {
642+ priv->proto = IPPROTO_TCP;
643+ } else {
644+ ulogd_log(ULOGD_FATAL, "unsupported protocol '%s'\n", proto);
645+ return ULOGD_IRET_ERR;
646+ }
647+
648+ memset(&priv->sa, 0, sizeof(priv->sa));
649+ priv->sa.sin_family = AF_INET;
650+ priv->sa.sin_port = htons(port);
651+ ret = inet_pton(AF_INET, host, &priv->sa.sin_addr);
652+ if (ret <= 0) {
653+ ulogd_log(ULOGD_FATAL, "inet_pton: %m\n");
654+ return ULOGD_IRET_ERR;
655+ }
656+
657+ INIT_LLIST_HEAD(&priv->list);
658+
659+ ulogd_init_timer(&priv->timer, pi, ipfix_timer_cb);
660+
661+ ulogd_log(ULOGD_INFO, "using IPFIX Collector at %s:%d (MTU %d)\n",
662+ inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
663+ port, mtu);
664+
665+ return ULOGD_IRET_OK;
666+}
667+
668+static int tcp_connect(struct ulogd_pluginstance *pi)
669+{
670+ struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
671+ int ret = ULOGD_IRET_ERR;
672+
673+ if ((priv->ufd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
674+ ulogd_log(ULOGD_FATAL, "socket: %m\n");
675+ return ULOGD_IRET_ERR;
676+ }
677+
678+ if (connect(priv->ufd.fd, (struct sockaddr *) &priv->sa, sizeof(priv->sa)) < 0) {
679+ ulogd_log(ULOGD_ERROR, "connect: %m\n");
680+ ret = ULOGD_IRET_ERR;
681+ goto err_close;
682+ }
683+
684+ return ULOGD_IRET_OK;
685+
686+err_close:
687+ close(priv->ufd.fd);
688+ return ret;
689+}
690+
691+static int udp_connect(struct ulogd_pluginstance *pi)
692+{
693+ struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
694+
695+ if ((priv->ufd.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
696+ ulogd_log(ULOGD_FATAL, "socket: %m\n");
697+ return ULOGD_IRET_ERR;
698+ }
699+
700+ if (connect(priv->ufd.fd, (struct sockaddr *) &priv->sa, sizeof(priv->sa)) < 0) {
701+ ulogd_log(ULOGD_ERROR, "connect: %m\n");
702+ return ULOGD_IRET_ERR;
703+ }
704+
705+ return 0;
706+}
707+
708+static int ipfix_start(struct ulogd_pluginstance *pi)
709+{
710+ struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
711+ char addr[16];
712+ int port, ret;
713+
714+ switch (priv->proto) {
715+ case IPPROTO_UDP:
716+ if ((ret = udp_connect(pi)) < 0)
717+ return ret;
718+ break;
719+ case IPPROTO_TCP:
720+ if ((ret = tcp_connect(pi)) < 0)
721+ return ret;
722+ break;
723+
724+ default:
725+ break;
726+ }
727+
728+ priv->seqno = 0;
729+
730+ port = port_ce(pi->config_kset).u.value;
731+ ulogd_log(ULOGD_INFO, "connected to %s:%d\n",
732+ inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
733+ port);
734+
735+ /* Register the socket FD */
736+ priv->ufd.when = ULOGD_FD_READ;
737+ priv->ufd.cb = ipfix_ufd_cb;
738+ priv->ufd.data = pi;
739+
740+ if (ulogd_register_fd(&priv->ufd) < 0)
741+ return ULOGD_IRET_ERR;
742+
743+ /* Add a 1 second timer */
744+ ulogd_add_timer(&priv->timer, 1);
745+
746+ return ULOGD_IRET_OK;
747+}
748+
749+static int ipfix_stop(struct ulogd_pluginstance *pi)
750+{
751+ struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
752+
753+ ulogd_unregister_fd(&priv->ufd);
754+ close(priv->ufd.fd);
755+ priv->ufd.fd = -1;
756+
757+ ulogd_del_timer(&priv->timer);
758+
759+ ipfix_msg_free(priv->msg);
760+ priv->msg = NULL;
761+
762+ return 0;
763+}
764+
765+static int ipfix_interp(struct ulogd_pluginstance *pi)
766+{
767+ struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
768+ struct vy_ipfix_data *data;
769+ int oid, mtu, ret;
770+ char addr[16];
771+
772+ if (!(GET_FLAGS(pi->input.keys, InIpSaddr) & ULOGD_RETF_VALID))
773+ return ULOGD_IRET_OK;
774+
775+ oid = oid_ce(pi->config_kset).u.value;
776+ mtu = mtu_ce(pi->config_kset).u.value;
777+
778+again:
779+ if (!priv->msg) {
780+ priv->msg = ipfix_msg_alloc(mtu, oid);
781+ if (!priv->msg) {
782+ /* just drop this flow */
783+ ulogd_log(ULOGD_ERROR, "out of memory, dropping flow\n");
784+ return ULOGD_IRET_OK;
785+ }
786+ ipfix_msg_add_set(priv->msg, VY_IPFIX_SID);
787+ }
788+
789+ data = ipfix_msg_add_data(priv->msg, sizeof(struct vy_ipfix_data));
790+ if (!data) {
791+ enqueue_msg(priv, priv->msg);
792+ priv->msg = NULL;
793+ /* can't loop because the next will definitely succeed */
794+ goto again;
795+ }
796+
797+ data->ifi_in = data->ifi_out = 0;
798+
799+ data->saddr.s_addr = ikey_get_u32(&pi->input.keys[InIpSaddr]);
800+ data->daddr.s_addr = ikey_get_u32(&pi->input.keys[InIpDaddr]);
801+
802+ data->packets = htonl((uint32_t) (ikey_get_u64(&pi->input.keys[InRawInPktCount])
803+ + ikey_get_u64(&pi->input.keys[InRawOutPktCount])));
804+ data->bytes = htonl((uint32_t) (ikey_get_u64(&pi->input.keys[InRawInPktLen])
805+ + ikey_get_u64(&pi->input.keys[InRawOutPktLen])));
806+
807+ data->start = htonl(ikey_get_u32(&pi->input.keys[InFlowStartSec]));
808+ data->end = htonl(ikey_get_u32(&pi->input.keys[InFlowEndSec]));
809+
810+ if (GET_FLAGS(pi->input.keys, InL4SPort) & ULOGD_RETF_VALID) {
811+ data->sport = htons(ikey_get_u16(&pi->input.keys[InL4SPort]));
812+ data->dport = htons(ikey_get_u16(&pi->input.keys[InL4DPort]));
813+ }
814+
815+ data->aid = 0;
816+ if (GET_FLAGS(pi->input.keys, InCtMark) & ULOGD_RETF_VALID)
817+ data->aid = htonl(ikey_get_u32(&pi->input.keys[InCtMark]));
818+
819+ data->l4_proto = ikey_get_u8(&pi->input.keys[InIpProto]);
820+ data->__padding = 0;
821+
822+ ulogd_log(ULOGD_DEBUG, "Got new packet (packets = %u, bytes = %u, flow = (%u, %u), saddr = %s, daddr = %s, sport = %u, dport = %u)\n",
823+ ntohl(data->packets), ntohl(data->bytes), ntohl(data->start), ntohl(data->end),
824+ inet_ntop(AF_INET, &data->saddr.s_addr, addr, sizeof(addr)),
825+ inet_ntop(AF_INET, &data->daddr.s_addr, addr, sizeof(addr)),
826+ ntohs(data->sport), ntohs(data->dport));
827+
828+ if ((ret = send_msgs(pi)) < 0)
829+ return ret;
830+
831+ return ULOGD_IRET_OK;
832+}
833+
834+static struct ulogd_plugin ipfix_plugin = {
835+ .name = "IPFIX",
836+ .input = {
837+ .keys = ipfix_in_keys,
838+ .num_keys = ARRAY_SIZE(ipfix_in_keys),
839+ .type = ULOGD_DTYPE_PACKET | ULOGD_DTYPE_FLOW | ULOGD_DTYPE_SUM
840+ },
841+ .output = {
842+ .type = ULOGD_DTYPE_SINK
843+ },
844+ .config_kset = (struct config_keyset *) &ipfix_kset,
845+ .priv_size = sizeof(struct ipfix_priv),
846+ .configure = ipfix_configure,
847+ .start = ipfix_start,
848+ .stop = ipfix_stop,
849+ .interp = ipfix_interp,
850+ .version = VERSION,
851+};
852+
853+void __attribute__ ((constructor)) init(void);
854+
855+void init(void)
856+{
857+ ulogd_register_plugin(&ipfix_plugin);
858+}