corosync  3.0.2-dirty
totemudp.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2005 MontaVista Software, Inc.
3  * Copyright (c) 2006-2018 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8 
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <config.h>
37 
38 #include <assert.h>
39 #include <pthread.h>
40 #include <sys/mman.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <sys/socket.h>
44 #include <netdb.h>
45 #include <sys/un.h>
46 #include <sys/ioctl.h>
47 #include <sys/param.h>
48 #include <netinet/in.h>
49 #include <arpa/inet.h>
50 #include <unistd.h>
51 #include <fcntl.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <errno.h>
55 #include <sched.h>
56 #include <time.h>
57 #include <sys/time.h>
58 #include <sys/poll.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 
62 #include <corosync/sq.h>
63 #include <corosync/swab.h>
64 #include <qb/qbdefs.h>
65 #include <qb/qbloop.h>
66 #define LOGSYS_UTILS_ONLY 1
67 #include <corosync/logsys.h>
68 #include "totemudp.h"
69 
70 #include "util.h"
71 
72 #ifndef MSG_NOSIGNAL
73 #define MSG_NOSIGNAL 0
74 #endif
75 
76 #define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * FRAME_SIZE_MAX)
77 #define NETIF_STATE_REPORT_UP 1
78 #define NETIF_STATE_REPORT_DOWN 2
79 
80 #define BIND_STATE_UNBOUND 0
81 #define BIND_STATE_REGULAR 1
82 #define BIND_STATE_LOOPBACK 2
83 
85  struct qb_list_head list;
87 };
88 
92  int token;
93  /*
94  * Socket used for local multicast delivery. We don't rely on multicast
95  * loop and rather this UNIX DGRAM socket is used. Socket is created by
96  * socketpair call and they are used in same way as pipe (so [0] is read
97  * end and [1] is write end)
98  */
99  int local_mcast_loop[2];
100 };
101 
104 
106 
108 
110 
111  void *context;
112 
113  void (*totemudp_deliver_fn) (
114  void *context,
115  const void *msg,
116  unsigned int msg_len,
117  const struct sockaddr_storage *system_from);
118 
119  void (*totemudp_iface_change_fn) (
120  void *context,
121  const struct totem_ip_address *iface_address,
122  unsigned int ring_no);
123 
124  void (*totemudp_target_set_completed) (void *context);
125 
126  /*
127  * Function and data used to log messages
128  */
130 
132 
134 
136 
138 
140 
141  void (*totemudp_log_printf) (
142  int level,
143  int subsys,
144  const char *function,
145  const char *file,
146  int line,
147  const char *format,
148  ...)__attribute__((format(printf, 6, 7)));
149 
150  void *udp_context;
151 
152  struct qb_list_head member_list;
153 
154  char iov_buffer[UDP_RECEIVE_FRAME_SIZE_MAX];
155 
156  char iov_buffer_flush[UDP_RECEIVE_FRAME_SIZE_MAX];
157 
158  struct iovec totemudp_iov_recv;
159 
160  struct iovec totemudp_iov_recv_flush;
161 
162  struct totemudp_socket totemudp_sockets;
163 
164  struct totem_ip_address mcast_address;
165 
167 
169 
171 
173 
175 
176  struct timeval stats_tv_start;
177 
178  struct totem_ip_address my_id;
179 
180  int firstrun;
181 
182  qb_loop_timer_handle timer_netif_check_timeout;
183 
184  unsigned int my_memb_entries;
185 
186  int flushing;
187 
189 
191 
192  struct totem_ip_address token_target;
193 };
194 
195 struct work_item {
196  const void *msg;
197  unsigned int msg_len;
199 };
200 
201 static int totemudp_build_sockets (
202  struct totemudp_instance *instance,
203  struct totem_ip_address *bindnet_address,
204  struct totem_ip_address *mcastaddress,
205  struct totemudp_socket *sockets,
206  struct totem_ip_address *bound_to);
207 
208 static struct totem_ip_address localhost;
209 
210 static void totemudp_instance_initialize (struct totemudp_instance *instance)
211 {
212  memset (instance, 0, sizeof (struct totemudp_instance));
213 
215 
216  instance->totemudp_iov_recv.iov_base = instance->iov_buffer;
217 
218  instance->totemudp_iov_recv.iov_len = UDP_RECEIVE_FRAME_SIZE_MAX; //sizeof (instance->iov_buffer);
219  instance->totemudp_iov_recv_flush.iov_base = instance->iov_buffer_flush;
220 
221  instance->totemudp_iov_recv_flush.iov_len = UDP_RECEIVE_FRAME_SIZE_MAX; //sizeof (instance->iov_buffer);
222 
223  /*
224  * There is always atleast 1 processor
225  */
226  instance->my_memb_entries = 1;
227 
228  qb_list_init (&instance->member_list);
229 }
230 
231 #define log_printf(level, format, args...) \
232 do { \
233  instance->totemudp_log_printf ( \
234  level, instance->totemudp_subsys_id, \
235  __FUNCTION__, __FILE__, __LINE__, \
236  (const char *)format, ##args); \
237 } while (0);
238 
239 #define LOGSYS_PERROR(err_num, level, fmt, args...) \
240 do { \
241  char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
242  const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
243  instance->totemudp_log_printf ( \
244  level, instance->totemudp_subsys_id, \
245  __FUNCTION__, __FILE__, __LINE__, \
246  fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
247  } while(0)
248 
250  void *udp_context,
251  const char *cipher_type,
252  const char *hash_type)
253 {
254 
255  return (0);
256 }
257 
258 
259 static inline void ucast_sendmsg (
260  struct totemudp_instance *instance,
261  struct totem_ip_address *system_to,
262  const void *msg,
263  unsigned int msg_len)
264 {
265  struct msghdr msg_ucast;
266  int res = 0;
267  struct sockaddr_storage sockaddr;
268  struct iovec iovec;
269  int addrlen;
270 
271  iovec.iov_base = (void*)msg;
272  iovec.iov_len = msg_len;
273 
274  /*
275  * Build unicast message
276  */
277  memset(&msg_ucast, 0, sizeof(msg_ucast));
279  instance->totem_interface->ip_port, &sockaddr, &addrlen);
280  msg_ucast.msg_name = &sockaddr;
281  msg_ucast.msg_namelen = addrlen;
282  msg_ucast.msg_iov = (void *)&iovec;
283  msg_ucast.msg_iovlen = 1;
284 #ifdef HAVE_MSGHDR_CONTROL
285  msg_ucast.msg_control = 0;
286 #endif
287 #ifdef HAVE_MSGHDR_CONTROLLEN
288  msg_ucast.msg_controllen = 0;
289 #endif
290 #ifdef HAVE_MSGHDR_FLAGS
291  msg_ucast.msg_flags = 0;
292 #endif
293 #ifdef HAVE_MSGHDR_ACCRIGHTS
294  msg_ucast.msg_accrights = NULL;
295 #endif
296 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
297  msg_ucast.msg_accrightslen = 0;
298 #endif
299 
300 
301  /*
302  * Transmit unicast message
303  * An error here is recovered by totemsrp
304  */
305  res = sendmsg (instance->totemudp_sockets.mcast_send, &msg_ucast,
306  MSG_NOSIGNAL);
307  if (res < 0) {
308  LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
309  "sendmsg(ucast) failed (non-critical)");
310  }
311 }
312 
313 static inline void mcast_sendmsg (
314  struct totemudp_instance *instance,
315  const void *msg,
316  unsigned int msg_len)
317 {
318  struct msghdr msg_mcast;
319  int res = 0;
320  struct iovec iovec;
321  struct sockaddr_storage sockaddr;
322  int addrlen;
323 
324  iovec.iov_base = (void *)msg;
325  iovec.iov_len = msg_len;
326 
327  /*
328  * Build multicast message
329  */
331  instance->totem_interface->ip_port, &sockaddr, &addrlen);
332  memset(&msg_mcast, 0, sizeof(msg_mcast));
333  msg_mcast.msg_name = &sockaddr;
334  msg_mcast.msg_namelen = addrlen;
335  msg_mcast.msg_iov = (void *)&iovec;
336  msg_mcast.msg_iovlen = 1;
337 #ifdef HAVE_MSGHDR_CONTROL
338  msg_mcast.msg_control = 0;
339 #endif
340 #ifdef HAVE_MSGHDR_CONTROLLEN
341  msg_mcast.msg_controllen = 0;
342 #endif
343 #ifdef HAVE_MSGHDR_FLAGS
344  msg_mcast.msg_flags = 0;
345 #endif
346 #ifdef HAVE_MSGHDR_ACCRIGHTS
347  msg_mcast.msg_accrights = NULL;
348 #endif
349 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
350  msg_mcast.msg_accrightslen = 0;
351 #endif
352 
353  /*
354  * Transmit multicast message
355  * An error here is recovered by totemsrp
356  */
357  res = sendmsg (instance->totemudp_sockets.mcast_send, &msg_mcast,
358  MSG_NOSIGNAL);
359  if (res < 0) {
360  LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
361  "sendmsg(mcast) failed (non-critical)");
362  instance->stats->continuous_sendmsg_failures++;
363  } else {
364  instance->stats->continuous_sendmsg_failures = 0;
365  }
366 
367  /*
368  * Transmit multicast message to local unix mcast loop
369  * An error here is recovered by totemsrp
370  */
371  msg_mcast.msg_name = NULL;
372  msg_mcast.msg_namelen = 0;
373 
374  res = sendmsg (instance->totemudp_sockets.local_mcast_loop[1], &msg_mcast,
375  MSG_NOSIGNAL);
376  if (res < 0) {
377  LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
378  "sendmsg(local mcast loop) failed (non-critical)");
379  }
380 }
381 
382 
384  void *udp_context)
385 {
386  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
387  int res = 0;
388 
389  if (instance->totemudp_sockets.mcast_recv > 0) {
390  qb_loop_poll_del (instance->totemudp_poll_handle,
391  instance->totemudp_sockets.mcast_recv);
392  close (instance->totemudp_sockets.mcast_recv);
393  }
394  if (instance->totemudp_sockets.mcast_send > 0) {
395  close (instance->totemudp_sockets.mcast_send);
396  }
397  if (instance->totemudp_sockets.local_mcast_loop[0] > 0) {
398  qb_loop_poll_del (instance->totemudp_poll_handle,
399  instance->totemudp_sockets.local_mcast_loop[0]);
400  close (instance->totemudp_sockets.local_mcast_loop[0]);
401  close (instance->totemudp_sockets.local_mcast_loop[1]);
402  }
403  if (instance->totemudp_sockets.token > 0) {
404  qb_loop_poll_del (instance->totemudp_poll_handle,
405  instance->totemudp_sockets.token);
406  close (instance->totemudp_sockets.token);
407  }
408 
409  return (res);
410 }
411 
412 /*
413  * Only designed to work with a message with one iov
414  */
415 
416 static int net_deliver_fn (
417  int fd,
418  int revents,
419  void *data)
420 {
421  struct totemudp_instance *instance = (struct totemudp_instance *)data;
422  struct msghdr msg_recv;
423  struct iovec *iovec;
424  struct sockaddr_storage system_from;
425  int bytes_received;
426  int truncated_packet;
427 
428  if (instance->flushing == 1) {
429  iovec = &instance->totemudp_iov_recv_flush;
430  } else {
431  iovec = &instance->totemudp_iov_recv;
432  }
433 
434  /*
435  * Receive datagram
436  */
437  msg_recv.msg_name = &system_from;
438  msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
439  msg_recv.msg_iov = iovec;
440  msg_recv.msg_iovlen = 1;
441 #ifdef HAVE_MSGHDR_CONTROL
442  msg_recv.msg_control = 0;
443 #endif
444 #ifdef HAVE_MSGHDR_CONTROLLEN
445  msg_recv.msg_controllen = 0;
446 #endif
447 #ifdef HAVE_MSGHDR_FLAGS
448  msg_recv.msg_flags = 0;
449 #endif
450 #ifdef HAVE_MSGHDR_ACCRIGHTS
451  msg_recv.msg_accrights = NULL;
452 #endif
453 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
454  msg_recv.msg_accrightslen = 0;
455 #endif
456 
457  bytes_received = recvmsg (fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
458  if (bytes_received == -1) {
459  return (0);
460  } else {
461  instance->stats_recv += bytes_received;
462  }
463 
464  truncated_packet = 0;
465 
466 #ifdef HAVE_MSGHDR_FLAGS
467  if (msg_recv.msg_flags & MSG_TRUNC) {
468  truncated_packet = 1;
469  }
470 #else
471  /*
472  * We don't have MSGHDR_FLAGS, but we can (hopefully) safely make assumption that
473  * if bytes_received == UDP_RECIEVE_FRAME_SIZE_MAX then packet is truncated
474  */
475  if (bytes_received == UDP_RECEIVE_FRAME_SIZE_MAX) {
476  truncated_packet = 1;
477  }
478 #endif
479 
480  if (truncated_packet) {
482  "Received too big message. This may be because something bad is happening"
483  "on the network (attack?), or you tried join more nodes than corosync is"
484  "compiled with (%u) or bug in the code (bad estimation of "
485  "the UDP_RECEIVE_FRAME_SIZE_MAX). Dropping packet.", PROCESSOR_COUNT_MAX);
486  return (0);
487  }
488 
489  iovec->iov_len = bytes_received;
490 
491  /*
492  * Handle incoming message
493  */
494  instance->totemudp_deliver_fn (
495  instance->context,
496  iovec->iov_base,
497  iovec->iov_len,
498  &system_from);
499 
500  iovec->iov_len = UDP_RECEIVE_FRAME_SIZE_MAX;
501  return (0);
502 }
503 
504 static int netif_determine (
505  struct totemudp_instance *instance,
506  struct totem_ip_address *bindnet,
507  struct totem_ip_address *bound_to,
508  int *interface_up,
509  int *interface_num)
510 {
511  int res;
512 
513  res = totemip_iface_check (bindnet, bound_to,
514  interface_up, interface_num,
515  instance->totem_config->clear_node_high_bit);
516 
517 
518  return (res);
519 }
520 
521 
522 /*
523  * If the interface is up, the sockets for totem are built. If the interface is down
524  * this function is requeued in the timer list to retry building the sockets later.
525  */
526 static void timer_function_netif_check_timeout (
527  void *data)
528 {
529  struct totemudp_instance *instance = (struct totemudp_instance *)data;
530  int interface_up;
531  int interface_num;
532  struct totem_ip_address *bind_address;
533 
534  /*
535  * Build sockets for every interface
536  */
537  netif_determine (instance,
538  &instance->totem_interface->bindnet,
539  &instance->totem_interface->boundto,
540  &interface_up, &interface_num);
541  /*
542  * If the network interface isn't back up and we are already
543  * in loopback mode, add timer to check again and return
544  */
545  if ((instance->netif_bind_state == BIND_STATE_LOOPBACK &&
546  interface_up == 0) ||
547 
548  (instance->my_memb_entries == 1 &&
549  instance->netif_bind_state == BIND_STATE_REGULAR &&
550  interface_up == 1)) {
551 
552  qb_loop_timer_add (instance->totemudp_poll_handle,
553  QB_LOOP_MED,
554  instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
555  (void *)instance,
556  timer_function_netif_check_timeout,
557  &instance->timer_netif_check_timeout);
558 
559  /*
560  * Add a timer to check for a downed regular interface
561  */
562  return;
563  }
564 
565  if (instance->totemudp_sockets.mcast_recv > 0) {
566  qb_loop_poll_del (instance->totemudp_poll_handle,
567  instance->totemudp_sockets.mcast_recv);
568  close (instance->totemudp_sockets.mcast_recv);
569  }
570  if (instance->totemudp_sockets.mcast_send > 0) {
571  close (instance->totemudp_sockets.mcast_send);
572  }
573  if (instance->totemudp_sockets.local_mcast_loop[0] > 0) {
574  qb_loop_poll_del (instance->totemudp_poll_handle,
575  instance->totemudp_sockets.local_mcast_loop[0]);
576  close (instance->totemudp_sockets.local_mcast_loop[0]);
577  close (instance->totemudp_sockets.local_mcast_loop[1]);
578  }
579  if (instance->totemudp_sockets.token > 0) {
580  qb_loop_poll_del (instance->totemudp_poll_handle,
581  instance->totemudp_sockets.token);
582  close (instance->totemudp_sockets.token);
583  }
584 
585  if (interface_up == 0) {
586  /*
587  * Interface is not up
588  */
590  bind_address = &localhost;
591 
592  /*
593  * Add a timer to retry building interfaces and request memb_gather_enter
594  */
595  qb_loop_timer_add (instance->totemudp_poll_handle,
596  QB_LOOP_MED,
597  instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
598  (void *)instance,
599  timer_function_netif_check_timeout,
600  &instance->timer_netif_check_timeout);
601  } else {
602  /*
603  * Interface is up
604  */
606  bind_address = &instance->totem_interface->bindnet;
607  }
608  /*
609  * Create and bind the multicast and unicast sockets
610  */
611  (void)totemudp_build_sockets (instance,
612  &instance->mcast_address,
613  bind_address,
614  &instance->totemudp_sockets,
615  &instance->totem_interface->boundto);
616 
617  qb_loop_poll_add (
618  instance->totemudp_poll_handle,
619  QB_LOOP_MED,
620  instance->totemudp_sockets.mcast_recv,
621  POLLIN, instance, net_deliver_fn);
622 
623  qb_loop_poll_add (
624  instance->totemudp_poll_handle,
625  QB_LOOP_MED,
626  instance->totemudp_sockets.local_mcast_loop[0],
627  POLLIN, instance, net_deliver_fn);
628 
629  qb_loop_poll_add (
630  instance->totemudp_poll_handle,
631  QB_LOOP_MED,
632  instance->totemudp_sockets.token,
633  POLLIN, instance, net_deliver_fn);
634 
635  totemip_copy (&instance->my_id, &instance->totem_interface->boundto);
636 
637  /*
638  * This reports changes in the interface to the user and totemsrp
639  */
640  if (instance->netif_bind_state == BIND_STATE_REGULAR) {
641  if (instance->netif_state_report & NETIF_STATE_REPORT_UP) {
643  "The network interface [%s] is now up.",
644  totemip_print (&instance->totem_interface->boundto));
646  instance->totemudp_iface_change_fn (instance->context, &instance->my_id, 0);
647  }
648  /*
649  * Add a timer to check for interface going down in single membership
650  */
651  if (instance->my_memb_entries == 1) {
652  qb_loop_timer_add (instance->totemudp_poll_handle,
653  QB_LOOP_MED,
654  instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
655  (void *)instance,
656  timer_function_netif_check_timeout,
657  &instance->timer_netif_check_timeout);
658  }
659 
660  } else {
663  "The network interface is down.");
664  instance->totemudp_iface_change_fn (instance->context, &instance->my_id, 0);
665  }
667 
668  }
669 }
670 
671 /* Set the socket priority to INTERACTIVE to ensure
672  that our messages don't get queued behind anything else */
673 static void totemudp_traffic_control_set(struct totemudp_instance *instance, int sock)
674 {
675 #ifdef SO_PRIORITY
676  int prio = 6; /* TC_PRIO_INTERACTIVE */
677 
678  if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(int))) {
679  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Could not set traffic priority");
680  }
681 #endif
682 }
683 
684 static int totemudp_build_sockets_ip (
685  struct totemudp_instance *instance,
686  struct totem_ip_address *mcast_address,
687  struct totem_ip_address *bindnet_address,
688  struct totemudp_socket *sockets,
689  struct totem_ip_address *bound_to,
690  int interface_num)
691 {
692  struct sockaddr_storage sockaddr;
693  struct ipv6_mreq mreq6;
694  struct ip_mreq mreq;
695  struct sockaddr_storage mcast_ss, boundto_ss;
696  struct sockaddr_in6 *mcast_sin6 = (struct sockaddr_in6 *)&mcast_ss;
697  struct sockaddr_in *mcast_sin = (struct sockaddr_in *)&mcast_ss;
698  struct sockaddr_in *boundto_sin = (struct sockaddr_in *)&boundto_ss;
699  unsigned int sendbuf_size;
700  unsigned int recvbuf_size;
701  unsigned int optlen = sizeof (sendbuf_size);
702  unsigned int retries;
703  int addrlen;
704  int res;
705  int flag;
706  uint8_t sflag;
707  int i;
708 
709  /*
710  * Create multicast recv socket
711  */
712  sockets->mcast_recv = socket (bindnet_address->family, SOCK_DGRAM, 0);
713  if (sockets->mcast_recv == -1) {
714  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
715  "socket() failed");
716  return (-1);
717  }
718 
719  totemip_nosigpipe (sockets->mcast_recv);
720  res = fcntl (sockets->mcast_recv, F_SETFL, O_NONBLOCK);
721  if (res == -1) {
722  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
723  "Could not set non-blocking operation on multicast socket");
724  return (-1);
725  }
726 
727  /*
728  * Force reuse
729  */
730  flag = 1;
731  if ( setsockopt(sockets->mcast_recv, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof (flag)) < 0) {
732  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
733  "setsockopt(SO_REUSEADDR) failed");
734  return (-1);
735  }
736 
737  /*
738  * Create local multicast loop socket
739  */
740  if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets->local_mcast_loop) == -1) {
741  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
742  "socket() failed");
743  return (-1);
744  }
745 
746  for (i = 0; i < 2; i++) {
747  totemip_nosigpipe (sockets->local_mcast_loop[i]);
748  res = fcntl (sockets->local_mcast_loop[i], F_SETFL, O_NONBLOCK);
749  if (res == -1) {
750  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
751  "Could not set non-blocking operation on multicast socket");
752  return (-1);
753  }
754  }
755 
756 
757 
758  /*
759  * Setup mcast send socket
760  */
761  sockets->mcast_send = socket (bindnet_address->family, SOCK_DGRAM, 0);
762  if (sockets->mcast_send == -1) {
763  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
764  "socket() failed");
765  return (-1);
766  }
767 
768  totemip_nosigpipe (sockets->mcast_send);
769  res = fcntl (sockets->mcast_send, F_SETFL, O_NONBLOCK);
770  if (res == -1) {
771  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
772  "Could not set non-blocking operation on multicast socket");
773  return (-1);
774  }
775 
776  /*
777  * Force reuse
778  */
779  flag = 1;
780  if ( setsockopt(sockets->mcast_send, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof (flag)) < 0) {
781  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
782  "setsockopt(SO_REUSEADDR) failed");
783  return (-1);
784  }
785 
787  &sockaddr, &addrlen);
788 
789  retries = 0;
790  while (1) {
791  res = bind (sockets->mcast_send, (struct sockaddr *)&sockaddr, addrlen);
792  if (res == 0) {
793  break;
794  }
795  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
796  "Unable to bind the socket to send multicast packets");
797  if (++retries > BIND_MAX_RETRIES) {
798  break;
799  }
800 
801  /*
802  * Wait for a while
803  */
804  (void)poll(NULL, 0, BIND_RETRIES_INTERVAL * retries);
805  }
806  if (res == -1) {
807  return (-1);
808  }
809 
810  /*
811  * Setup unicast socket
812  */
813  sockets->token = socket (bindnet_address->family, SOCK_DGRAM, 0);
814  if (sockets->token == -1) {
815  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
816  "socket() failed");
817  return (-1);
818  }
819 
820  totemip_nosigpipe (sockets->token);
821  res = fcntl (sockets->token, F_SETFL, O_NONBLOCK);
822  if (res == -1) {
823  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
824  "Could not set non-blocking operation on token socket");
825  return (-1);
826  }
827 
828  /*
829  * Force reuse
830  */
831  flag = 1;
832  if ( setsockopt(sockets->token, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof (flag)) < 0) {
833  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
834  "setsockopt(SO_REUSEADDR) failed");
835  return (-1);
836  }
837 
838  /*
839  * Bind to unicast socket used for token send/receives
840  * This has the side effect of binding to the correct interface
841  */
842  totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &sockaddr, &addrlen);
843 
844  retries = 0;
845  while (1) {
846  res = bind (sockets->token, (struct sockaddr *)&sockaddr, addrlen);
847  if (res == 0) {
848  break;
849  }
850  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
851  "Unable to bind UDP unicast socket");
852  if (++retries > BIND_MAX_RETRIES) {
853  break;
854  }
855 
856  /*
857  * Wait for a while
858  */
859  (void)poll(NULL, 0, BIND_RETRIES_INTERVAL * retries);
860  }
861  if (res == -1) {
862  return (-1);
863  }
864 
865  recvbuf_size = MCAST_SOCKET_BUFFER_SIZE;
866  sendbuf_size = MCAST_SOCKET_BUFFER_SIZE;
867  /*
868  * Set buffer sizes to avoid overruns
869  */
870  res = setsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
871  if (res == -1) {
872  LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
873  "Unable to set SO_RCVBUF size on UDP mcast socket");
874  return (-1);
875  }
876  res = setsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
877  if (res == -1) {
878  LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
879  "Unable to set SO_SNDBUF size on UDP mcast socket");
880  return (-1);
881  }
882  res = setsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
883  if (res == -1) {
884  LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
885  "Unable to set SO_RCVBUF size on UDP local mcast loop socket");
886  return (-1);
887  }
888  res = setsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
889  if (res == -1) {
890  LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
891  "Unable to set SO_SNDBUF size on UDP local mcast loop socket");
892  return (-1);
893  }
894 
895  res = getsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
896  if (res == 0) {
898  "Receive multicast socket recv buffer size (%d bytes).", recvbuf_size);
899  }
900 
901  res = getsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
902  if (res == 0) {
904  "Transmit multicast socket send buffer size (%d bytes).", sendbuf_size);
905  }
906 
907  res = getsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
908  if (res == 0) {
910  "Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
911  }
912 
913  res = getsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
914  if (res == 0) {
916  "Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
917  }
918 
919 
920  /*
921  * Join group membership on socket
922  */
923  totemip_totemip_to_sockaddr_convert(mcast_address, instance->totem_interface->ip_port, &mcast_ss, &addrlen);
924  totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &boundto_ss, &addrlen);
925 
926  if (instance->totem_config->broadcast_use == 1) {
927  unsigned int broadcast = 1;
928 
929  if ((setsockopt(sockets->mcast_recv, SOL_SOCKET,
930  SO_BROADCAST, &broadcast, sizeof (broadcast))) == -1) {
931  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
932  "setting broadcast option failed");
933  return (-1);
934  }
935  if ((setsockopt(sockets->mcast_send, SOL_SOCKET,
936  SO_BROADCAST, &broadcast, sizeof (broadcast))) == -1) {
937  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
938  "setting broadcast option failed");
939  return (-1);
940  }
941  } else {
942  switch (bindnet_address->family) {
943  case AF_INET:
944  memset(&mreq, 0, sizeof(mreq));
945  mreq.imr_multiaddr.s_addr = mcast_sin->sin_addr.s_addr;
946  mreq.imr_interface.s_addr = boundto_sin->sin_addr.s_addr;
947  res = setsockopt (sockets->mcast_recv, IPPROTO_IP, IP_ADD_MEMBERSHIP,
948  &mreq, sizeof (mreq));
949  if (res == -1) {
950  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
951  "join ipv4 multicast group failed");
952  return (-1);
953  }
954  break;
955  case AF_INET6:
956  memset(&mreq6, 0, sizeof(mreq6));
957  memcpy(&mreq6.ipv6mr_multiaddr, &mcast_sin6->sin6_addr, sizeof(struct in6_addr));
958  mreq6.ipv6mr_interface = interface_num;
959 
960  res = setsockopt (sockets->mcast_recv, IPPROTO_IPV6, IPV6_JOIN_GROUP,
961  &mreq6, sizeof (mreq6));
962  if (res == -1) {
963  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
964  "join ipv6 multicast group failed");
965  return (-1);
966  }
967  break;
968  }
969  }
970 
971  /*
972  * Turn off multicast loopback
973  */
974 
975  flag = 0;
976  switch ( bindnet_address->family ) {
977  case AF_INET:
978  sflag = 0;
979  res = setsockopt (sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_LOOP,
980  &sflag, sizeof (sflag));
981  break;
982  case AF_INET6:
983  res = setsockopt (sockets->mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
984  &flag, sizeof (flag));
985  }
986  if (res == -1) {
987  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
988  "Unable to turn off multicast loopback");
989  return (-1);
990  }
991 
992  /*
993  * Set multicast packets TTL
994  */
995  flag = instance->totem_interface->ttl;
996  if (bindnet_address->family == AF_INET6) {
997  res = setsockopt (sockets->mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
998  &flag, sizeof (flag));
999  if (res == -1) {
1000  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
1001  "set mcast v6 TTL failed");
1002  return (-1);
1003  }
1004  } else {
1005  sflag = flag;
1006  res = setsockopt(sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_TTL,
1007  &sflag, sizeof(sflag));
1008  if (res == -1) {
1009  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
1010  "set mcast v4 TTL failed");
1011  return (-1);
1012  }
1013  }
1014 
1015  /*
1016  * Bind to a specific interface for multicast send and receive
1017  */
1018  switch ( bindnet_address->family ) {
1019  case AF_INET:
1020  if (setsockopt (sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_IF,
1021  &boundto_sin->sin_addr, sizeof (boundto_sin->sin_addr)) < 0) {
1022  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
1023  "cannot select interface for multicast packets (send)");
1024  return (-1);
1025  }
1026  if (setsockopt (sockets->mcast_recv, IPPROTO_IP, IP_MULTICAST_IF,
1027  &boundto_sin->sin_addr, sizeof (boundto_sin->sin_addr)) < 0) {
1028  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
1029  "cannot select interface for multicast packets (recv)");
1030  return (-1);
1031  }
1032  break;
1033  case AF_INET6:
1034  if (setsockopt (sockets->mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_IF,
1035  &interface_num, sizeof (interface_num)) < 0) {
1036  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
1037  "cannot select interface for multicast packets (send v6)");
1038  return (-1);
1039  }
1040  if (setsockopt (sockets->mcast_recv, IPPROTO_IPV6, IPV6_MULTICAST_IF,
1041  &interface_num, sizeof (interface_num)) < 0) {
1042  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
1043  "cannot select interface for multicast packets (recv v6)");
1044  return (-1);
1045  }
1046  break;
1047  }
1048 
1049  /*
1050  * Bind to multicast socket used for multicast receives
1051  * This needs to happen after all of the multicast setsockopt() calls
1052  * as the kernel seems to only put them into effect (for IPV6) when bind()
1053  * is called.
1054  */
1056  instance->totem_interface->ip_port, &sockaddr, &addrlen);
1057 
1058  retries = 0;
1059  while (1) {
1060  res = bind (sockets->mcast_recv, (struct sockaddr *)&sockaddr, addrlen);
1061  if (res == 0) {
1062  break;
1063  }
1064  LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
1065  "Unable to bind the socket to receive multicast packets");
1066  if (++retries > BIND_MAX_RETRIES) {
1067  break;
1068  }
1069 
1070  /*
1071  * Wait for a while
1072  */
1073  (void)poll(NULL, 0, BIND_RETRIES_INTERVAL * retries);
1074  }
1075 
1076  if (res == -1) {
1077  return (-1);
1078  }
1079  return 0;
1080 }
1081 
1082 static int totemudp_build_sockets (
1083  struct totemudp_instance *instance,
1084  struct totem_ip_address *mcast_address,
1085  struct totem_ip_address *bindnet_address,
1086  struct totemudp_socket *sockets,
1087  struct totem_ip_address *bound_to)
1088 {
1089  int interface_num;
1090  int interface_up;
1091  int res;
1092 
1093  /*
1094  * Determine the ip address bound to and the interface name
1095  */
1096  res = netif_determine (instance,
1097  bindnet_address,
1098  bound_to,
1099  &interface_up,
1100  &interface_num);
1101 
1102  if (res == -1) {
1103  return (-1);
1104  }
1105 
1106  totemip_copy(&instance->my_id, bound_to);
1107 
1108  res = totemudp_build_sockets_ip (instance, mcast_address,
1109  bindnet_address, sockets, bound_to, interface_num);
1110 
1111  if (res == -1) {
1112  /* if we get here, corosync won't work anyway, so better leaving than faking to work */
1113  LOGSYS_PERROR (errno, instance->totemudp_log_level_error,
1114  "Unable to create sockets, exiting");
1115  exit(EXIT_FAILURE);
1116  }
1117 
1118  /* We only send out of the token socket */
1119  totemudp_traffic_control_set(instance, sockets->token);
1120  return res;
1121 }
1122 
1123 /*
1124  * Totem Network interface
1125  * depends on poll abstraction, POSIX, IPV4
1126  */
1127 
1128 /*
1129  * Create an instance
1130  */
1132  qb_loop_t *poll_handle,
1133  void **udp_context,
1134  struct totem_config *totem_config,
1135  totemsrp_stats_t *stats,
1136 
1137  void *context,
1138 
1139  void (*deliver_fn) (
1140  void *context,
1141  const void *msg,
1142  unsigned int msg_len,
1143  const struct sockaddr_storage *system_from),
1144 
1145  void (*iface_change_fn) (
1146  void *context,
1147  const struct totem_ip_address *iface_address,
1148  unsigned int ring_no),
1149 
1150  void (*mtu_changed) (
1151  void *context,
1152  int net_mtu),
1153 
1154  void (*target_set_completed) (
1155  void *context))
1156 {
1157  struct totemudp_instance *instance;
1158 
1159  instance = malloc (sizeof (struct totemudp_instance));
1160  if (instance == NULL) {
1161  return (-1);
1162  }
1163 
1164  totemudp_instance_initialize (instance);
1165 
1166  instance->totem_config = totem_config;
1167  instance->stats = stats;
1168 
1169  /*
1170  * Configure logging
1171  */
1172  instance->totemudp_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security;
1179 
1180  /*
1181  * Initialize local variables for totemudp
1182  */
1183  instance->totem_interface = &totem_config->interfaces[0];
1184  totemip_copy (&instance->mcast_address, &instance->totem_interface->mcast_addr);
1185  memset (instance->iov_buffer, 0, UDP_RECEIVE_FRAME_SIZE_MAX);
1186 
1187  instance->totemudp_poll_handle = poll_handle;
1188 
1189  instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id;
1190 
1191  instance->context = context;
1192  instance->totemudp_deliver_fn = deliver_fn;
1193 
1194  instance->totemudp_iface_change_fn = iface_change_fn;
1195 
1196  instance->totemudp_target_set_completed = target_set_completed;
1197 
1198  totemip_localhost (instance->mcast_address.family, &localhost);
1199  localhost.nodeid = instance->totem_config->node_id;
1200 
1201  /*
1202  * RRP layer isn't ready to receive message because it hasn't
1203  * initialized yet. Add short timer to check the interfaces.
1204  */
1205  qb_loop_timer_add (instance->totemudp_poll_handle,
1206  QB_LOOP_MED,
1207  100*QB_TIME_NS_IN_MSEC,
1208  (void *)instance,
1209  timer_function_netif_check_timeout,
1210  &instance->timer_netif_check_timeout);
1211 
1212  *udp_context = instance;
1213  return (0);
1214 }
1215 
1217 {
1218  return malloc (FRAME_SIZE_MAX);
1219 }
1220 
1221 void totemudp_buffer_release (void *ptr)
1222 {
1223  return free (ptr);
1224 }
1225 
1227  void *udp_context,
1228  int processor_count)
1229 {
1230  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
1231  int res = 0;
1232 
1233  instance->my_memb_entries = processor_count;
1234  qb_loop_timer_del (instance->totemudp_poll_handle,
1235  instance->timer_netif_check_timeout);
1236  if (processor_count == 1) {
1237  qb_loop_timer_add (instance->totemudp_poll_handle,
1238  QB_LOOP_MED,
1239  instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
1240  (void *)instance,
1241  timer_function_netif_check_timeout,
1242  &instance->timer_netif_check_timeout);
1243  }
1244 
1245  return (res);
1246 }
1247 
1248 int totemudp_recv_flush (void *udp_context)
1249 {
1250  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
1251  struct pollfd ufd;
1252  int nfds;
1253  int res = 0;
1254  int i;
1255  int sock;
1256 
1257  instance->flushing = 1;
1258 
1259  for (i = 0; i < 2; i++) {
1260  sock = -1;
1261  if (i == 0) {
1262  sock = instance->totemudp_sockets.mcast_recv;
1263  }
1264  if (i == 1) {
1265  sock = instance->totemudp_sockets.local_mcast_loop[0];
1266  }
1267  assert(sock != -1);
1268 
1269  do {
1270  ufd.fd = sock;
1271  ufd.events = POLLIN;
1272  nfds = poll (&ufd, 1, 0);
1273  if (nfds == 1 && ufd.revents & POLLIN) {
1274  net_deliver_fn (sock, ufd.revents, instance);
1275  }
1276  } while (nfds == 1);
1277  }
1278 
1279  instance->flushing = 0;
1280 
1281  return (res);
1282 }
1283 
1284 int totemudp_send_flush (void *udp_context)
1285 {
1286  return 0;
1287 }
1288 
1290  void *udp_context,
1291  const void *msg,
1292  unsigned int msg_len)
1293 {
1294  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
1295  int res = 0;
1296 
1297  ucast_sendmsg (instance, &instance->token_target, msg, msg_len);
1298 
1299  return (res);
1300 }
1302  void *udp_context,
1303  const void *msg,
1304  unsigned int msg_len)
1305 {
1306  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
1307  int res = 0;
1308 
1309  mcast_sendmsg (instance, msg, msg_len);
1310 
1311  return (res);
1312 }
1313 
1315  void *udp_context,
1316  const void *msg,
1317  unsigned int msg_len)
1318 {
1319  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
1320  int res = 0;
1321 
1322  mcast_sendmsg (instance, msg, msg_len);
1323 
1324  return (res);
1325 }
1326 
1327 extern int totemudp_iface_check (void *udp_context)
1328 {
1329  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
1330  int res = 0;
1331 
1332  timer_function_netif_check_timeout (instance);
1333 
1334  return (res);
1335 }
1336 
1338  void *net_context,
1339  char ***status,
1340  unsigned int *iface_count)
1341 {
1342  static char *statuses[INTERFACE_MAX] = {(char*)"OK"};
1343 
1344  if (status) {
1345  *status = statuses;
1346  }
1347  *iface_count = 1;
1348 
1349  return (0);
1350 }
1351 
1352 extern void totemudp_net_mtu_adjust (void *udp_context, struct totem_config *totem_config)
1353 {
1354  totem_config->net_mtu -= totemip_udpip_header_size(totem_config->interfaces[0].bindnet.family);
1355 }
1356 
1358  void *udp_context,
1359  unsigned int nodeid)
1360 {
1361  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
1362  struct qb_list_head *list;
1363  struct totemudp_member *member;
1364  int res = 0;
1365 
1366  qb_list_for_each(list, &(instance->member_list)) {
1367  member = qb_list_entry (list,
1368  struct totemudp_member,
1369  list);
1370 
1371  if (member->member.nodeid == nodeid) {
1372  memcpy (&instance->token_target, &member->member,
1373  sizeof (struct totem_ip_address));
1374 
1375  instance->totemudp_target_set_completed (instance->context);
1376  break;
1377  }
1378  }
1379  return (res);
1380 }
1381 
1383  void *udp_context)
1384 {
1385  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
1386  unsigned int res;
1387  struct sockaddr_storage system_from;
1388  struct msghdr msg_recv;
1389  struct pollfd ufd;
1390  int nfds;
1391  int msg_processed = 0;
1392  int i;
1393  int sock;
1394 
1395  /*
1396  * Receive datagram
1397  */
1398  msg_recv.msg_name = &system_from;
1399  msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
1400  msg_recv.msg_iov = &instance->totemudp_iov_recv_flush;
1401  msg_recv.msg_iovlen = 1;
1402 #ifdef HAVE_MSGHDR_CONTROL
1403  msg_recv.msg_control = 0;
1404 #endif
1405 #ifdef HAVE_MSGHDR_CONTROLLEN
1406  msg_recv.msg_controllen = 0;
1407 #endif
1408 #ifdef HAVE_MSGHDR_FLAGS
1409  msg_recv.msg_flags = 0;
1410 #endif
1411 #ifdef HAVE_MSGHDR_ACCRIGHTS
1412  msg_recv.msg_accrights = NULL;
1413 #endif
1414 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN
1415  msg_recv.msg_accrightslen = 0;
1416 #endif
1417 
1418  for (i = 0; i < 2; i++) {
1419  sock = -1;
1420  if (i == 0) {
1421  sock = instance->totemudp_sockets.mcast_recv;
1422  }
1423  if (i == 1) {
1424  sock = instance->totemudp_sockets.local_mcast_loop[0];
1425  }
1426  assert(sock != -1);
1427 
1428  do {
1429  ufd.fd = sock;
1430  ufd.events = POLLIN;
1431  nfds = poll (&ufd, 1, 0);
1432  if (nfds == 1 && ufd.revents & POLLIN) {
1433  res = recvmsg (sock, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
1434  if (res != -1) {
1435  msg_processed = 1;
1436  } else {
1437  msg_processed = -1;
1438  }
1439  }
1440  } while (nfds == 1);
1441  }
1442 
1443  return (msg_processed);
1444 }
1445 
1446 
1448  void *udp_context,
1449  const struct totem_ip_address *local,
1450  const struct totem_ip_address *member,
1451  int ring_no)
1452 {
1453  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
1454 
1455  struct totemudp_member *new_member;
1456 
1457  new_member = malloc (sizeof (struct totemudp_member));
1458  if (new_member == NULL) {
1459  return (-1);
1460  }
1461 
1462  memset(new_member, 0, sizeof(*new_member));
1463 
1464  qb_list_init (&new_member->list);
1465  qb_list_add_tail (&new_member->list, &instance->member_list);
1466  memcpy (&new_member->member, member, sizeof (struct totem_ip_address));
1467 
1468  return (0);
1469 }
1470 
1472  void *udp_context,
1473  const struct totem_ip_address *token_target,
1474  int ring_no)
1475 {
1476  int found = 0;
1477  struct qb_list_head *list;
1478  struct totemudp_member *member;
1479  struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
1480 
1481  /*
1482  * Find the member to remove and close its socket
1483  */
1484  qb_list_for_each(list, &(instance->member_list)) {
1485  member = qb_list_entry (list,
1486  struct totemudp_member,
1487  list);
1488 
1489  if (totemip_compare (token_target, &member->member)==0) {
1490  found = 1;
1491  break;
1492  }
1493  }
1494 
1495  /*
1496  * Delete the member from the list
1497  */
1498  if (found) {
1499  qb_list_del (list);
1500  }
1501 
1502  return (0);
1503 }
1504 
1505 int totemudp_iface_set (void *net_context,
1506  const struct totem_ip_address *local_addr,
1507  unsigned short ip_port,
1508  unsigned int iface_no)
1509 {
1510  /* Not supported */
1511  return (-1);
1512 }
1513 
1515  void *udp_context,
1516  struct totem_config *totem_config)
1517 {
1518  /* Not supported */
1519  return (-1);
1520 }
unsigned int clear_node_high_bit
Definition: totem.h:161
unsigned short family
Definition: coroapi.h:113
int totemudp_crypto_set(void *udp_context, const char *cipher_type, const char *hash_type)
Definition: totemudp.c:249
int totemudp_processor_count_set(void *udp_context, int processor_count)
Definition: totemudp.c:1226
int totemip_localhost(int family, struct totem_ip_address *localhost)
Definition: totemip.c:210
char iov_buffer[UDP_RECEIVE_FRAME_SIZE_MAX]
Definition: totemudp.c:154
int totemudp_iface_set(void *net_context, const struct totem_ip_address *local_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemudp.c:1505
#define BIND_MAX_RETRIES
Definition: totem.h:69
struct totem_ip_address member
Definition: totemudp.c:86
int totemudp_subsys_id
Definition: totemudp.c:139
struct iovec totemudp_iov_recv_flush
Definition: totemudp.c:160
struct totem_interface * interfaces
Definition: totem.h:158
totemsrp_stats_t * stats
Definition: totemudp.c:190
struct totemudp_instance * instance
Definition: totemudp.c:198
The totem_ip_address struct.
Definition: coroapi.h:111
const char * totemip_print(const struct totem_ip_address *addr)
Definition: totemip.c:264
void(* totemudp_iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no)
Definition: totemudp.c:119
void totemudp_net_mtu_adjust(void *udp_context, struct totem_config *totem_config)
Definition: totemudp.c:1352
int totemudp_token_target_set(void *udp_context, unsigned int nodeid)
Definition: totemudp.c:1357
void * totemudp_buffer_alloc(void)
Definition: totemudp.c:1216
int totemip_compare(const void *a, const void *b)
Definition: totemip.c:158
int totemudp_mcast_flush_send(void *udp_context, const void *msg, unsigned int msg_len)
Definition: totemudp.c:1301
#define MSG_NOSIGNAL
Definition: totemudp.c:73
struct totemudp_socket totemudp_sockets
Definition: totemudp.c:162
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:123
struct totem_ip_address mcast_address
Definition: totemudp.c:164
unsigned int downcheck_timeout
Definition: totem.h:192
int totemudp_mcast_noflush_send(void *udp_context, const void *msg, unsigned int msg_len)
Definition: totemudp.c:1314
int totemudp_log_level_security
Definition: totemudp.c:129
#define BIND_STATE_REGULAR
Definition: totemudp.c:81
struct totem_config * totem_config
Definition: totemudp.c:188
#define NETIF_STATE_REPORT_DOWN
Definition: totemudp.c:78
#define totemip_nosigpipe(s)
Definition: totemip.h:56
struct qb_list_head member_list
Definition: totemudp.c:152
int totemudp_log_level_error
Definition: totemudp.c:131
#define INTERFACE_MAX
Definition: coroapi.h:88
int totemudp_send_flush(void *udp_context)
Definition: totemudp.c:1284
uint16_t ttl
Definition: totem.h:86
int totemudp_recv_flush(void *udp_context)
Definition: totemudp.c:1248
unsigned int node_id
Definition: totem.h:160
struct iovec totemudp_iov_recv
Definition: totemudp.c:158
int totemip_iface_check(struct totem_ip_address *bindnet, struct totem_ip_address *boundto, int *interface_up, int *interface_num, int mask_high_bit)
Definition: totemip.c:537
#define UDP_RECEIVE_FRAME_SIZE_MAX
Definition: totem.h:61
char iov_buffer_flush[UDP_RECEIVE_FRAME_SIZE_MAX]
Definition: totemudp.c:156
int totemudp_token_send(void *udp_context, const void *msg, unsigned int msg_len)
Definition: totemudp.c:1289
int totemudp_member_remove(void *udp_context, const struct totem_ip_address *token_target, int ring_no)
Definition: totemudp.c:1471
struct totem_interface * totem_interface
Definition: totemudp.c:105
unsigned int nodeid
Definition: coroapi.h:112
qb_loop_t * totemudp_poll_handle
Definition: totemudp.c:103
int netif_state_report
Definition: totemudp.c:107
unsigned int my_memb_entries
Definition: totemudp.c:184
struct totem_ip_address mcast_addr
Definition: totem.h:84
#define BIND_RETRIES_INTERVAL
Definition: totem.h:70
void(* totemudp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition: totemudp.c:141
int totemudp_finalize(void *udp_context)
Definition: totemudp.c:383
struct totem_ip_address boundto
Definition: totem.h:83
#define BIND_STATE_LOOPBACK
Definition: totemudp.c:82
typedef __attribute__
size_t totemip_udpip_header_size(int family)
Definition: totemip.c:616
#define NETIF_STATE_REPORT_UP
Definition: totemudp.c:77
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition: totem.h:99
uint16_t ip_port
Definition: totem.h:85
#define MCAST_SOCKET_BUFFER_SIZE
Definition: totemudp.c:76
void(*) void udp_context)
Definition: totemudp.c:148
unsigned int net_mtu
Definition: totem.h:202
int totemudp_log_level_debug
Definition: totemudp.c:137
struct qb_list_head list
Definition: totemudp.c:85
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
int totemudp_log_level_notice
Definition: totemudp.c:135
int totemudp_reconfigure(void *udp_context, struct totem_config *totem_config)
Definition: totemudp.c:1514
unsigned int broadcast_use
Definition: totem.h:216
int totemudp_member_add(void *udp_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition: totemudp.c:1447
#define FRAME_SIZE_MAX
Definition: totem.h:52
void(* totemudp_target_set_completed)(void *context)
Definition: totemudp.c:124
int totemip_totemip_to_sockaddr_convert(struct totem_ip_address *ip_addr, uint16_t port, struct sockaddr_storage *saddr, int *addrlen)
Definition: totemip.c:272
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:200
#define LOGSYS_PERROR(err_num, level, fmt, args...)
Definition: totemudp.c:239
struct srp_addr system_from
Definition: totemsrp.c:261
#define log_printf(level, format, args...)
Definition: totemudp.c:231
struct totem_ip_address my_id
Definition: totemudp.c:178
int totemudp_recv_mcast_empty(void *udp_context)
Definition: totemudp.c:1382
int totemudp_iface_check(void *udp_context)
Definition: totemudp.c:1327
int totemudp_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition: totemudp.c:1337
struct totem_ip_address bindnet
Definition: totem.h:82
unsigned int nodeid
Definition: coroapi.h:75
void(* totemudp_deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition: totemudp.c:113
uint32_t continuous_sendmsg_failures
Definition: totemstats.h:79
int totemudp_initialize(qb_loop_t *poll_handle, void **udp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Create an instance.
Definition: totemudp.c:1131
int local_mcast_loop[2]
Definition: totemudp.c:99
struct totem_ip_address token_target
Definition: totemudp.c:192
qb_loop_timer_handle timer_netif_check_timeout
Definition: totemudp.c:182
void totemudp_buffer_release(void *ptr)
Definition: totemudp.c:1221
int totemudp_log_level_warning
Definition: totemudp.c:133