wprobe: export raw values (n, s, ss) to ipfix collectors for improved measurement...
[openwrt.git] / package / libipfix / patches / 100-openimp_sync.patch
1 --- a/lib/ipfix.c
2 +++ b/lib/ipfix.c
3 @@ -37,6 +37,9 @@ $$LIC$$
4  #ifdef SCTPSUPPORT
5  #include <netinet/sctp.h>
6  #endif
7 +#ifndef NOTHREADS
8 +#include <pthread.h>
9 +#endif
10  #include <fcntl.h>
11  #include <netdb.h>
12  
13 @@ -123,6 +126,18 @@ static uint16_t           g_lasttid;    
14  static ipfix_datarecord_t g_data = { NULL, NULL, 0 }; /* ipfix_export */
15  
16  static ipfix_field_t      *g_ipfix_fields;
17 +#ifndef NOTHREADS
18 +static pthread_mutex_t    g_mutex;
19 +#define mod_lock()        { \
20 +                            if ( pthread_mutex_lock( &g_mutex ) !=0 ) \
21 +                                mlogf( 0, "[ipfix] mutex_lock() failed: %s\n", \
22 +                                       strerror( errno ) ); \
23 +                          }
24 +#define mod_unlock()      {  pthread_mutex_unlock( &g_mutex ); }
25 +#else
26 +#define mod_lock()
27 +#define mod_unlock()
28 +#endif
29  
30  /*----- prototypes -------------------------------------------------------*/
31  
32 @@ -133,6 +148,7 @@ int  _ipfix_send_message( ipfix_t *ifh, 
33                            ipfix_message_t *message );
34  int  _ipfix_write_msghdr( ipfix_t *ifh, ipfix_message_t *msg, iobuf_t *buf );
35  void _ipfix_disconnect( ipfix_collector_t *col );
36 +int  _ipfix_export_flush( ipfix_t *ifh );
37  
38  
39  /* name      : do_writeselect
40 @@ -576,16 +592,18 @@ int ipfix_decode_float( void *in, void *
41  
42  int ipfix_snprint_float( char *str, size_t size, void *data, size_t len )
43  {
44 -    float tmp32;
45 -    double tmp64;
46 +    uint32_t tmp32;
47 +    uint64_t tmp64;
48  
49      switch ( len ) {
50        case 4:
51 -          ipfix_decode_float( data, &tmp32, 4);
52 -          return snprintf( str, size, "%f", tmp32 );
53 +          memcpy( &tmp32, data, len );
54 +          tmp32 = htonl( tmp32 );
55 +          return snprintf( str, size, "%f", (float)tmp32 );
56        case 8:
57 -          ipfix_decode_float( data, &tmp64, 8);
58 -          return snprintf( str, size, "%lf", tmp64);
59 +          memcpy( &tmp64, data, len );
60 +          tmp64 = HTONLL( tmp64 );
61 +          return snprintf( str, size, "%lf", (double)tmp64 );
62        default:
63            break;
64      }
65 @@ -682,12 +700,19 @@ int ipfix_get_eno_ieid( char *field, int
66   * parameters:
67   * remarks:     init module, read field type info.
68   */
69 -int ipfix_init ( void )
70 +int ipfix_init( void )
71  {
72      if ( g_tstart ) {
73          ipfix_cleanup();
74      }
75  
76 +#ifndef NOTHREADS
77 +    if ( pthread_mutex_init( &g_mutex, NULL ) !=0 ) {
78 +        mlogf( 0, "[ipfix] pthread_mutex_init() failed: %s\n",
79 +               strerror(errno) );
80 +        return -1;
81 +    }
82 +#endif
83      g_tstart = time(NULL);
84      signal( SIGPIPE, SIG_IGN );
85      g_lasttid = 255;
86 @@ -806,6 +831,9 @@ void ipfix_cleanup ( void )
87      g_data.maxfields = 0;
88      g_data.lens  = NULL;
89      g_data.addrs = NULL;
90 +#ifndef NOTHREADS
91 +    (void)pthread_mutex_destroy( &g_mutex );
92 +#endif
93  }
94  
95  int _ipfix_connect ( ipfix_collector_t *col )
96 @@ -1465,7 +1493,7 @@ int _ipfix_write_template( ipfix_t      
97        default:
98            /* check space */
99            if ( tsize+ifh->offset > IPFIX_DEFAULT_BUFLEN ) {
100 -              if ( ipfix_export_flush( ifh ) < 0 )
101 +              if ( _ipfix_export_flush( ifh ) < 0 )
102                    return -1;
103                if ( tsize+ifh->offset > IPFIX_DEFAULT_BUFLEN )
104                    return -1;
105 @@ -1474,6 +1502,8 @@ int _ipfix_write_template( ipfix_t      
106            /* write template prior to data */
107            if ( ifh->offset > 0 ) {
108                memmove( ifh->buffer + tsize, ifh->buffer, ifh->offset );
109 +              if ( ifh->cs_tid )
110 +                  ifh->cs_header += tsize;
111            }
112  
113            buf = ifh->buffer;
114 @@ -1615,8 +1645,11 @@ int ipfix_open( ipfix_t **ipfixh, int so
115          return -1;
116      }
117      node->ifh   = i;
118 +
119 +    mod_lock();
120      node->next  = g_ipfixlist;
121      g_ipfixlist = node;
122 +    mod_unlock();
123  
124      *ipfixh = i;
125      return 0;
126 @@ -1633,7 +1666,8 @@ void ipfix_close( ipfix_t *h )
127      {
128          ipfix_node_t *l, *n;
129  
130 -        ipfix_export_flush( h );
131 +        mod_lock();
132 +        _ipfix_export_flush( h );
133  
134          while( h->collectors )
135              _ipfix_drop_collector( (ipfix_collector_t**)&h->collectors );
136 @@ -1659,6 +1693,7 @@ void ipfix_close( ipfix_t *h )
137  #endif
138          free(h->buffer);
139          free(h);
140 +        mod_unlock();
141      }
142  }
143  
144 @@ -2156,6 +2191,22 @@ void ipfix_release_template( ipfix_t *if
145      ipfix_delete_template( ifh, templ );
146  }
147  
148 +static void _finish_cs( ipfix_t *ifh )
149 +{
150 +    size_t   buflen;
151 +    uint8_t  *buf;
152 +
153 +    /* finish current dataset */
154 +    if ( (buf=ifh->cs_header) ==NULL )
155 +        return;
156 +    buflen = 0;
157 +    INSERTU16( buf+buflen, buflen, ifh->cs_tid );
158 +    INSERTU16( buf+buflen, buflen, ifh->cs_bytes );
159 +    ifh->cs_bytes = 0;
160 +    ifh->cs_header = NULL;
161 +    ifh->cs_tid = 0;
162 +}
163 +
164  int ipfix_export( ipfix_t *ifh, ipfix_template_t *templ, ... )
165  {
166      int       i;
167 @@ -2199,13 +2250,14 @@ int ipfix_export( ipfix_t *ifh, ipfix_te
168                                 g_data.addrs, g_data.lens );
169  }
170  
171 -int ipfix_export_array( ipfix_t          *ifh,
172 -                        ipfix_template_t *templ,
173 -                        int              nfields,
174 -                        void             **fields,
175 -                        uint16_t         *lengths )
176 +static int
177 +_ipfix_export_array( ipfix_t          *ifh,
178 +                     ipfix_template_t *templ,
179 +                     int              nfields,
180 +                     void             **fields,
181 +                     uint16_t         *lengths )
182  {
183 -    int               i;
184 +    int               i, newset_f=0;
185      size_t            buflen, datasetlen;
186      uint8_t           *p, *buf;
187  
188 @@ -2249,7 +2301,19 @@ int ipfix_export_array( ipfix_t         
189  
190      /** get size of data set, check space
191       */
192 -    for ( i=0, datasetlen=4; i<nfields; i++ ) {
193 +    if ( templ->tid == ifh->cs_tid ) {
194 +        newset_f = 0;
195 +        datasetlen = 0;
196 +    }
197 +    else {
198 +        if ( ifh->cs_tid > 0 ) {
199 +            _finish_cs( ifh );
200 +        }
201 +        newset_f = 1;
202 +        datasetlen = 4;
203 +    }
204 +
205 +    for ( i=0; i<nfields; i++ ) {
206          if ( templ->fields[i].flength == IPFIX_FT_VARLEN ) {
207              if ( lengths[i]>254 )
208                  datasetlen += 3;
209 @@ -2263,21 +2327,29 @@ int ipfix_export_array( ipfix_t         
210          }
211          datasetlen += lengths[i];
212      }
213 -    if ( ((ifh->offset + datasetlen) > IPFIX_DEFAULT_BUFLEN )
214 -         && (ipfix_export_flush( ifh ) <0) ) {
215 -        return -1;
216 +
217 +    if ( (ifh->offset + datasetlen) > IPFIX_DEFAULT_BUFLEN ) {
218 +        if ( ifh->cs_tid )
219 +            _finish_cs( ifh );
220 +        newset_f = 1;
221 +
222 +        if ( _ipfix_export_flush( ifh ) <0 )
223 +            return -1;
224      }
225  
226 -    /* fill buffer
227 -     */
228 +    /* fill buffer */
229      buf    = (uint8_t*)(ifh->buffer) + ifh->offset;
230      buflen = 0;
231  
232 -    /* insert data set
233 -     */
234 -    ifh->nrecords ++;
235 -    INSERTU16( buf+buflen, buflen, templ->tid );
236 -    INSERTU16( buf+buflen, buflen, datasetlen );
237 +    if ( newset_f ) {
238 +        /* insert data set
239 +         */
240 +        ifh->cs_bytes = 0;
241 +        ifh->cs_header = buf;
242 +        ifh->cs_tid = templ->tid;
243 +        INSERTU16( buf+buflen, buflen, templ->tid );
244 +        INSERTU16( buf+buflen, buflen, 4 );
245 +    }
246  
247      /* insert data record
248       */
249 @@ -2303,7 +2375,9 @@ int ipfix_export_array( ipfix_t         
250          buflen += lengths[i];
251      }
252  
253 +    ifh->nrecords ++;
254      ifh->offset += buflen;
255 +    ifh->cs_bytes += buflen;
256      if ( ifh->version == IPFIX_VERSION )
257          ifh->seqno ++;
258      return 0;
259 @@ -2313,7 +2387,7 @@ int ipfix_export_array( ipfix_t         
260   * parameters:
261   * remarks:     rewrite this func!
262   */
263 -int ipfix_export_flush( ipfix_t *ifh )
264 +int _ipfix_export_flush( ipfix_t *ifh )
265  {
266      iobuf_t           *buf;
267      ipfix_collector_t *col;
268 @@ -2322,8 +2396,14 @@ int ipfix_export_flush( ipfix_t *ifh )
269      if ( (ifh==NULL) || (ifh->offset==0) )
270          return 0;
271  
272 -    if ( (buf=_ipfix_getbuf()) ==NULL )
273 +    if ( ifh->cs_tid > 0 ) {
274 +        /* finish current dataset */
275 +        _finish_cs( ifh );
276 +    }
277 +
278 +    if ( (buf=_ipfix_getbuf()) ==NULL ) {
279          return -1;
280 +    }
281  
282  #ifdef DEBUG
283      mlogf( 0, "[ipfix_export_flush] msg has %d records, %d bytes\n",
284 @@ -2350,3 +2430,30 @@ int ipfix_export_flush( ipfix_t *ifh )
285      _ipfix_freebuf( buf );
286      return ret;
287  }
288 +
289 +int ipfix_export_array( ipfix_t          *ifh,
290 +                        ipfix_template_t *templ,
291 +                        int              nfields,
292 +                        void             **fields,
293 +                        uint16_t         *lengths )
294 +{
295 +    int ret;
296 +
297 +    mod_lock();
298 +    ret = _ipfix_export_array( ifh, templ, nfields, fields, lengths );
299 +    mod_unlock();
300 +
301 +    return ret;
302 +}
303 +
304 +int ipfix_export_flush( ipfix_t *ifh )
305 +{
306 +    int ret;
307 +
308 +    mod_lock();
309 +    ret = _ipfix_export_flush( ifh );
310 +    mod_unlock();
311 +
312 +    return ret;
313 +}
314 +
315 --- a/lib/ipfix.h
316 +++ b/lib/ipfix.h
317 @@ -142,6 +142,12 @@ typedef struct
318      int         nrecords;         /* no. of records in buffer */
319      size_t      offset;           /* output buffer fill level */
320      uint32_t    seqno;            /* sequence no. of next message */
321 +
322 +    /* experimental */
323 +    int        cs_tid;            /* template id of current dataset */
324 +    int        cs_bytes;          /* size of current set */
325 +    uint8_t    *cs_header;        /* start of current set */
326 +
327  } ipfix_t;
328  
329  /** exporter funcs
330 --- a/lib/ipfix_col.c
331 +++ b/lib/ipfix_col.c
332 @@ -907,7 +907,7 @@ int ipfix_decode_datarecord( ipfixt_node
333      return 0;
334  }
335  
336 -static void do_free_datarecord( ipfix_datarecord_t   *data )
337 +void ipfix_free_datarecord( ipfix_datarecord_t   *data )
338  { 
339      if ( data ) {
340          if ( data->addrs )
341 @@ -925,6 +925,7 @@ int ipfix_parse_msg( ipfix_input_t *inpu
342      ipfix_hdr_t          hdr;                  /* ipfix packet header */
343      ipfixs_node_t        *s;
344      ipfix_datarecord_t   data = { NULL, NULL, 0 };
345 +    ipfixe_node_t        *e;
346      uint8_t              *buf;                 /* ipfix payload */
347      uint16_t             setid, setlen;        /* set id, set lenght */
348      int                  i, nread, offset;     /* counter */
349 @@ -1042,6 +1043,12 @@ int ipfix_parse_msg( ipfix_input_t *inpu
350                  err_flag = 1;
351              } 
352              else {
353 +                for ( e=g_exporter; e!=NULL; e=e->next ) {
354 +                    if ( e->elem->export_dset )
355 +                        (void) e->elem->export_dset( t, buf+nread, setlen,
356 +                                                     e->elem->data );
357 +                }
358 +
359                  /** read data records
360                   */
361                  for ( offset=nread, bytesleft=setlen; bytesleft>4; ) {
362 @@ -1076,11 +1083,11 @@ int ipfix_parse_msg( ipfix_input_t *inpu
363          goto errend;
364  
365   end:
366 -    do_free_datarecord( &data );
367 +    ipfix_free_datarecord( &data );
368      return nread;
369  
370   errend:
371 -    do_free_datarecord( &data );
372 +    ipfix_free_datarecord( &data );
373      return -1;
374  }
375  
376 @@ -1093,7 +1100,7 @@ void process_client_tcp( int fd, int mas
377      tcp_conn_t   *tcon = (tcp_conn_t*)data;
378      char         *func = "process_client_tcp";
379  
380 -    mlogf( 3,  "[%s] fd %d mask %d called.\n", func, fd, mask );
381 +    mlogf( 4,  "[%s] fd %d mask %d called.\n", func, fd, mask );
382  
383      /** read ipfix header 
384       */
385 --- a/lib/ipfix_col.h
386 +++ b/lib/ipfix_col.h
387 @@ -88,6 +88,7 @@ typedef struct ipfix_col_info
388      int (*export_newsource)(ipfixs_node_t*,void*);
389      int (*export_newmsg)(ipfixs_node_t*,ipfix_hdr_t*,void*);
390      int (*export_trecord)(ipfixs_node_t*,ipfixt_node_t*,void*);
391 +    int (*export_dset)(ipfixt_node_t*,uint8_t*,size_t,void*);
392      int (*export_drecord)(ipfixs_node_t*,ipfixt_node_t*,
393                            ipfix_datarecord_t*,void*);
394      void (*export_cleanup)(void*);
395 --- a/lib/ipfix_col_files.c
396 +++ b/lib/ipfix_col_files.c
397 @@ -68,7 +68,7 @@ static int export_newsource_file( ipfixs
398              return -1;
399          }
400          snprintf( s->fname+strlen(s->fname), PATH_MAX-strlen(s->fname),
401 -                  "/%u", s->odid );
402 +                  "/%u", (unsigned int)s->odid );
403          if ( (access( s->fname, R_OK ) <0 )
404               && (mkdir( s->fname, S_IRWXU ) <0) ) {
405              mlogf( 0, "[%s] cannot access dir '%s': %s\n",