corosync  2.4.2-dirty
totemsrp.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2003-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2009 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /*
37  * The first version of this code was based upon Yair Amir's PhD thesis:
38  * http://www.cs.jhu.edu/~yairamir/phd.ps (ch4,5).
39  *
40  * The current version of totemsrp implements the Totem protocol specified in:
41  * http://citeseer.ist.psu.edu/amir95totem.html
42  *
43  * The deviations from the above published protocols are:
44  * - encryption of message contents with nss
45  * - authentication of message contents with SHA1/HMAC
46  * - token hold mode where token doesn't rotate on unused ring - reduces cpu
47  * usage on 1.6ghz xeon from 35% to less than .1 % as measured by top
48  */
49 
50 #include <config.h>
51 
52 #include <assert.h>
53 #ifdef HAVE_ALLOCA_H
54 #include <alloca.h>
55 #endif
56 #include <sys/mman.h>
57 #include <sys/types.h>
58 #include <sys/stat.h>
59 #include <sys/socket.h>
60 #include <netdb.h>
61 #include <sys/un.h>
62 #include <sys/ioctl.h>
63 #include <sys/param.h>
64 #include <netinet/in.h>
65 #include <arpa/inet.h>
66 #include <unistd.h>
67 #include <fcntl.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <errno.h>
71 #include <sched.h>
72 #include <time.h>
73 #include <sys/time.h>
74 #include <sys/poll.h>
75 #include <sys/uio.h>
76 #include <limits.h>
77 
78 #include <qb/qbdefs.h>
79 #include <qb/qbutil.h>
80 #include <qb/qbloop.h>
81 
82 #include <corosync/swab.h>
83 #include <corosync/sq.h>
84 #include <corosync/list.h>
85 
86 #define LOGSYS_UTILS_ONLY 1
87 #include <corosync/logsys.h>
88 
89 #include "totemsrp.h"
90 #include "totemrrp.h"
91 #include "totemnet.h"
92 
93 #include "cs_queue.h"
94 
95 #define LOCALHOST_IP inet_addr("127.0.0.1")
96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
99 #define MAXIOVS 5
100 #define RETRANSMIT_ENTRIES_MAX 30
101 #define TOKEN_SIZE_MAX 64000 /* bytes */
102 #define LEAVE_DUMMY_NODEID 0
103 
104 /*
105  * Rollover handling:
106  * SEQNO_START_MSG is the starting sequence number after a new configuration
107  * This should remain zero, unless testing overflow in which case
108  * 0x7ffff000 and 0xfffff000 are good starting values.
109  *
110  * SEQNO_START_TOKEN is the starting sequence number after a new configuration
111  * for a token. This should remain zero, unless testing overflow in which
112  * case 0x7fffff00 or 0xffffff00 are good starting values.
113  */
114 #define SEQNO_START_MSG 0x0
115 #define SEQNO_START_TOKEN 0x0
116 
117 /*
118  * These can be used to test different rollover points
119  * #define SEQNO_START_MSG 0xfffffe00
120  * #define SEQNO_START_TOKEN 0xfffffe00
121  */
122 
123 /*
124  * These can be used to test the error recovery algorithms
125  * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
126  * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
127  * #define TEST_DROP_MCAST_PERCENTAGE 50
128  * #define TEST_RECOVERY_MSG_COUNT 300
129  */
130 
131 /*
132  * we compare incoming messages to determine if their endian is
133  * different - if so convert them
134  *
135  * do not change
136  */
137 #define ENDIAN_LOCAL 0xff22
138 
140  MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
141  MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
142  MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
143  MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
144  MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
145  MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
146 };
147 
151 };
152 
153 /*
154  * New membership algorithm local variables
155  */
157  struct srp_addr addr;
158  int set;
159 };
160 
161 
163  struct list_head list;
164  int (*callback_fn) (enum totem_callback_token_type type, const void *);
165  enum totem_callback_token_type callback_type;
166  int delete;
167  void *data;
168 };
169 
170 
172  int mcast;
173  int token;
174 };
175 
/*
 * Header placed at the start of a totemsrp message. The structure is packed,
 * so the compiler inserts no padding between the fields.
 */
176 struct message_header {
/* message kind; compared against the MESSAGE_TYPE_* constants */
177  char type;
178  char encapsulated;
/* NOTE(review): presumably carries ENDIAN_LOCAL so a receiver can detect a
 * byte-order mismatch - confirm against the send/receive paths */
179  unsigned short endian_detector;
180  unsigned int nodeid;
181 } __attribute__((packed));
182 
183 
184 struct mcast {
187  unsigned int seq;
190  unsigned int node_id;
192 } __attribute__((packed));
193 
194 
195 struct rtr_item {
197  unsigned int seq;
198 }__attribute__((packed));
199 
200 
201 struct orf_token {
203  unsigned int seq;
204  unsigned int token_seq;
205  unsigned int aru;
206  unsigned int aru_addr;
208  unsigned int backlog;
209  unsigned int fcc;
212  struct rtr_item rtr_list[0];
213 }__attribute__((packed));
214 
215 
216 struct memb_join {
219  unsigned int proc_list_entries;
220  unsigned int failed_list_entries;
221  unsigned long long ring_seq;
222  unsigned char end_of_memb_join[0];
223 /*
224  * These parts of the data structure are dynamic:
225  * struct srp_addr proc_list[];
226  * struct srp_addr failed_list[];
227  */
228 } __attribute__((packed));
229 
230 
235 } __attribute__((packed));
236 
237 
241 } __attribute__((packed));
242 
243 
246  unsigned int aru;
247  unsigned int high_delivered;
248  unsigned int received_flg;
249 }__attribute__((packed));
250 
251 
254  unsigned int token_seq;
256  unsigned int retrans_flg;
259  unsigned char end_of_commit_token[0];
260 /*
261  * These parts of the data structure are dynamic:
262  *
263  * struct srp_addr addr[PROCESSOR_COUNT_MAX];
264  * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
265  */
266 }__attribute__((packed));
267 
/*
 * Element type of the instance's message queues (its size is what
 * cs_queue_init is given for them): a pointer to a multicast message
 * together with a length.
 */
268 struct message_item {
269  struct mcast *mcast;
/* NOTE(review): presumably the byte length of the message at mcast - confirm */
270  unsigned int msg_len;
271 };
272 
274  struct mcast *mcast;
275  unsigned int msg_len;
276 };
277 
283 };
284 
287 
289 
290  /*
291  * Flow control mcasts and remcasts on last and current orf_token
292  */
294 
296 
298 
299  struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX];
300 
302 
303  struct srp_addr my_id;
304 
305  struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX];
306 
307  struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX];
308 
309  struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX];
310 
311  struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX];
312 
313  struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX];
314 
315  struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX];
316 
317  struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX];
318 
319  unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX];
320 
322 
324 
326 
328 
330 
332 
334 
336 
337  struct memb_ring_id my_ring_id;
338 
339  struct memb_ring_id my_old_ring_id;
340 
342 
344 
345  unsigned int my_last_aru;
346 
348 
350 
351  unsigned int my_high_seq_received;
352 
353  unsigned int my_install_seq;
354 
356 
358 
360 
362 
364 
365  /*
366  * Queues used to order, deliver, and recover messages
367  */
368  struct cs_queue new_message_queue;
369 
370  struct cs_queue new_message_queue_trans;
371 
372  struct cs_queue retrans_message_queue;
373 
374  struct sq regular_sort_queue;
375 
376  struct sq recovery_sort_queue;
377 
378  /*
379  * Received up to and including
380  */
381  unsigned int my_aru;
382 
383  unsigned int my_high_delivered;
384 
385  struct list_head token_callback_received_listhead;
386 
387  struct list_head token_callback_sent_listhead;
388 
389  char orf_token_retransmit[TOKEN_SIZE_MAX];
390 
392 
393  unsigned int my_token_seq;
394 
395  /*
396  * Timers
397  */
398  qb_loop_timer_handle timer_pause_timeout;
399 
400  qb_loop_timer_handle timer_orf_token_timeout;
401 
403 
405 
406  qb_loop_timer_handle timer_merge_detect_timeout;
407 
409 
411 
412  qb_loop_timer_handle memb_timer_state_commit_timeout;
413 
414  qb_loop_timer_handle timer_heartbeat_timeout;
415 
416  /*
417  * Function and data used to log messages
418  */
420 
422 
424 
426 
428 
430 
432 
433  void (*totemsrp_log_printf) (
434  int level,
435  int sybsys,
436  const char *function,
437  const char *file,
438  int line,
439  const char *format, ...)__attribute__((format(printf, 6, 7)));;
440 
442 
443 //TODO struct srp_addr next_memb;
444 
446 
447  struct totem_ip_address mcast_address;
448 
449  void (*totemsrp_deliver_fn) (
450  unsigned int nodeid,
451  const void *msg,
452  unsigned int msg_len,
453  int endian_conversion_required);
454 
455  void (*totemsrp_confchg_fn) (
456  enum totem_configuration_type configuration_type,
457  const unsigned int *member_list, size_t member_list_entries,
458  const unsigned int *left_list, size_t left_list_entries,
459  const unsigned int *joined_list, size_t joined_list_entries,
460  const struct memb_ring_id *ring_id);
461 
462  void (*totemsrp_service_ready_fn) (void);
463 
464  void (*totemsrp_waiting_trans_ack_cb_fn) (
465  int waiting_trans_ack);
466 
467  void (*memb_ring_id_create_or_load) (
468  struct memb_ring_id *memb_ring_id,
469  const struct totem_ip_address *addr);
470 
471  void (*memb_ring_id_store) (
472  const struct memb_ring_id *memb_ring_id,
473  const struct totem_ip_address *addr);
474 
476 
478 
479  unsigned long long token_ring_id_seq;
480 
481  unsigned int last_released;
482 
483  unsigned int set_aru;
484 
486 
488 
490 
491  unsigned int my_last_seq;
492 
493  struct timeval tv_old;
494 
496 
498 
499  unsigned int use_heartbeat;
500 
501  unsigned int my_trc;
502 
503  unsigned int my_pbl;
504 
505  unsigned int my_cbl;
506 
507  uint64_t pause_timestamp;
508 
510 
512 
514 
516 
518 
520 
521  int flushing;
522 
525  char commit_token_storage[40000];
526 };
527 
529  int count;
530  int (*handler_functions[6]) (
531  struct totemsrp_instance *instance,
532  const void *msg,
533  size_t msg_len,
534  int endian_conversion_needed);
535 };
536 
555 };
556 
/*
 * Description strings for the TOTEMSRP_GSFROM_* reason codes, indexed by the
 * code itself through designated initializers. gsfrom_to_msg() looks a code
 * up in this table.
 */
557 const char* gather_state_from_desc [] = {
558  [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
559  [TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING",
560  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
561  [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
562  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
563  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
564  [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
565  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
566  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
567  [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
568  [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
569  [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
570  [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
571  [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
572  [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
573  [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
574 };
575 
576 /*
577  * forward decls
578  */
579 static int message_handler_orf_token (
580  struct totemsrp_instance *instance,
581  const void *msg,
582  size_t msg_len,
583  int endian_conversion_needed);
584 
585 static int message_handler_mcast (
586  struct totemsrp_instance *instance,
587  const void *msg,
588  size_t msg_len,
589  int endian_conversion_needed);
590 
591 static int message_handler_memb_merge_detect (
592  struct totemsrp_instance *instance,
593  const void *msg,
594  size_t msg_len,
595  int endian_conversion_needed);
596 
597 static int message_handler_memb_join (
598  struct totemsrp_instance *instance,
599  const void *msg,
600  size_t msg_len,
601  int endian_conversion_needed);
602 
603 static int message_handler_memb_commit_token (
604  struct totemsrp_instance *instance,
605  const void *msg,
606  size_t msg_len,
607  int endian_conversion_needed);
608 
609 static int message_handler_token_hold_cancel (
610  struct totemsrp_instance *instance,
611  const void *msg,
612  size_t msg_len,
613  int endian_conversion_needed);
614 
615 static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
616 
617 static unsigned int main_msgs_missing (void);
618 
619 static void main_token_seqid_get (
620  const void *msg,
621  unsigned int *seqid,
622  unsigned int *token_is);
623 
624 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
625 
626 static void srp_addr_to_nodeid (
627  unsigned int *nodeid_out,
628  struct srp_addr *srp_addr_in,
629  unsigned int entries);
630 
631 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
632 
633 static void memb_leave_message_send (struct totemsrp_instance *instance);
634 
635 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
636 static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
637 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
638 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
639  int fcc_mcasts_allowed);
640 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
641 
642 static void memb_ring_id_set (struct totemsrp_instance *instance,
643  const struct memb_ring_id *ring_id);
644 static void target_set_completed (void *context);
645 static void memb_state_commit_token_update (struct totemsrp_instance *instance);
646 static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
647 static int memb_state_commit_token_send (struct totemsrp_instance *instance);
648 static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
649 static void memb_state_commit_token_create (struct totemsrp_instance *instance);
650 static int token_hold_cancel_send (struct totemsrp_instance *instance);
651 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
652 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
653 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
654 static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
655 static void memb_merge_detect_endian_convert (
656  const struct memb_merge_detect *in,
657  struct memb_merge_detect *out);
658 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
659 static void timer_function_orf_token_timeout (void *data);
660 static void timer_function_pause_timeout (void *data);
661 static void timer_function_heartbeat_timeout (void *data);
662 static void timer_function_token_retransmit_timeout (void *data);
663 static void timer_function_token_hold_retransmit_timeout (void *data);
664 static void timer_function_merge_detect_timeout (void *data);
665 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
666 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
667 static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
668 
669 void main_deliver_fn (
670  void *context,
671  const void *msg,
672  unsigned int msg_len);
673 
675  void *context,
676  const struct totem_ip_address *iface_address,
677  unsigned int iface_no);
678 
680  6,
681  {
682  message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
683  message_handler_mcast, /* MESSAGE_TYPE_MCAST */
684  message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
685  message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
686  message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
687  message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
688  }
689 };
690 
/*
 * Log a formatted message through the instance's logging callback.
 *
 * Requires a variable named `instance` in the calling scope that provides
 * the totemsrp_log_printf callback and the totemsrp_subsys_id field. The
 * calling function name, file and line are passed along automatically.
 *
 * The expansion intentionally does not end in a semicolon: the semicolon
 * written by the caller terminates the do/while, so an invocation is exactly
 * one statement and can be used in an unbraced if/else.
 */
#define log_printf(level, format, args...)		\
do {							\
	instance->totemsrp_log_printf (			\
		level, instance->totemsrp_subsys_id,	\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0)
/*
 * Log a formatted message with ": <error text> (<err_num>)\n" appended.
 *
 * The error text for err_num is obtained with qb_strerror_r into a local
 * buffer of LOGSYS_MAX_PERROR_MSG_LEN bytes. Like log_printf, this requires
 * a variable named `instance` in the calling scope and logs through its
 * totemsrp_log_printf callback.
 */
698 #define LOGSYS_PERROR(err_num, level, fmt, args...) \
699 do { \
700  char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
701  const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
702  instance->totemsrp_log_printf ( \
703  level, instance->totemsrp_subsys_id, \
704  __FUNCTION__, __FILE__, __LINE__, \
705  fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
706  } while(0)
707 
708 static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
709 {
710  if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
711  return gather_state_from_desc[gsfrom];
712  }
713  else {
714  return "UNKNOWN";
715  }
716 }
717 
718 static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
719 {
720  memset (instance, 0, sizeof (struct totemsrp_instance));
721 
722  list_init (&instance->token_callback_received_listhead);
723 
724  list_init (&instance->token_callback_sent_listhead);
725 
726  instance->my_received_flg = 1;
727 
728  instance->my_token_seq = SEQNO_START_TOKEN - 1;
729 
731 
732  instance->set_aru = -1;
733 
734  instance->my_aru = SEQNO_START_MSG;
735 
737 
739 
740  instance->orf_token_discard = 0;
741 
742  instance->originated_orf_token = 0;
743 
744  instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
745 
746  instance->my_id.no_addrs = INTERFACE_MAX;
747 
748  instance->waiting_trans_ack = 1;
749 }
750 
751 static void main_token_seqid_get (
752  const void *msg,
753  unsigned int *seqid,
754  unsigned int *token_is)
755 {
756  const struct orf_token *token = msg;
757 
758  *seqid = 0;
759  *token_is = 0;
760  if (token->header.type == MESSAGE_TYPE_ORF_TOKEN) {
761  *seqid = token->token_seq;
762  *token_is = 1;
763  }
764 }
765 
/*
 * Placeholder: always reports 0.
 * TODO: not implemented yet.
 */
static unsigned int main_msgs_missing (void)
{
	return 0;
}
771 
772 static int pause_flush (struct totemsrp_instance *instance)
773 {
774  uint64_t now_msec;
775  uint64_t timestamp_msec;
776  int res = 0;
777 
778  now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
779  timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
780 
781  if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
783  "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
784  /*
785  * -1 indicates an error from recvmsg
786  */
787  do {
789  } while (res == -1);
790  }
791  return (res);
792 }
793 
794 static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
795 {
796  struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
797  uint32_t time_now;
798  unsigned long long nano_secs = qb_util_nano_current_get ();
799 
800  time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
801 
802  if (type == TOTEM_CALLBACK_TOKEN_RECEIVED) {
803  /* incr latest token the index */
804  if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
805  instance->stats.latest_token = 0;
806  else
807  instance->stats.latest_token++;
808 
809  if (instance->stats.earliest_token == instance->stats.latest_token) {
810  /* we have filled up the array, start overwriting */
811  if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
812  instance->stats.earliest_token = 0;
813  else
814  instance->stats.earliest_token++;
815 
816  instance->stats.token[instance->stats.earliest_token].rx = 0;
817  instance->stats.token[instance->stats.earliest_token].tx = 0;
818  instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
819  }
820 
821  instance->stats.token[instance->stats.latest_token].rx = time_now;
822  instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
823  } else {
824  instance->stats.token[instance->stats.latest_token].tx = time_now;
825  }
826  return 0;
827 }
828 
829 /*
830  * Exported interfaces
831  */
833  qb_loop_t *poll_handle,
834  void **srp_context,
835  struct totem_config *totem_config,
837 
838  void (*deliver_fn) (
839  unsigned int nodeid,
840  const void *msg,
841  unsigned int msg_len,
842  int endian_conversion_required),
843 
844  void (*confchg_fn) (
845  enum totem_configuration_type configuration_type,
846  const unsigned int *member_list, size_t member_list_entries,
847  const unsigned int *left_list, size_t left_list_entries,
848  const unsigned int *joined_list, size_t joined_list_entries,
849  const struct memb_ring_id *ring_id),
850  void (*waiting_trans_ack_cb_fn) (
851  int waiting_trans_ack))
852 {
853  struct totemsrp_instance *instance;
854 
855  instance = malloc (sizeof (struct totemsrp_instance));
856  if (instance == NULL) {
857  goto error_exit;
858  }
859 
860  totemsrp_instance_initialize (instance);
861 
862  instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
863  instance->totemsrp_waiting_trans_ack_cb_fn (1);
864 
865  stats->srp = &instance->stats;
866  instance->stats.latest_token = 0;
867  instance->stats.earliest_token = 0;
868 
869  instance->totem_config = totem_config;
870 
871  /*
872  * Configure logging
873  */
882 
883  /*
884  * Configure totem store and load functions
885  */
887  instance->memb_ring_id_store = totem_config->totem_memb_ring_id_store;
888 
889  /*
890  * Initialize local variables for totemsrp
891  */
892  totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
893 
894  /*
895  * Display totem configuration
896  */
898  "Token Timeout (%d ms) retransmit timeout (%d ms)",
899  totem_config->token_timeout, totem_config->token_retransmit_timeout);
901  "token hold (%d ms) retransmits before loss (%d retrans)",
902  totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
904  "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
905  totem_config->join_timeout,
906  totem_config->send_join_timeout,
907  totem_config->consensus_timeout,
908 
909  totem_config->merge_timeout);
911  "downcheck (%d ms) fail to recv const (%d msgs)",
912  totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
914  "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
915 
917  "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
918  totem_config->window_size, totem_config->max_messages);
919 
921  "missed count const (%d messages)",
922  totem_config->miss_count_const);
923 
925  "send threads (%d threads)", totem_config->threads);
927  "RRP token expired timeout (%d ms)",
928  totem_config->rrp_token_expired_timeout);
930  "RRP token problem counter (%d ms)",
931  totem_config->rrp_problem_count_timeout);
933  "RRP threshold (%d problem count)",
934  totem_config->rrp_problem_count_threshold);
936  "RRP multicast threshold (%d problem count)",
937  totem_config->rrp_problem_count_mcast_threshold);
939  "RRP automatic recovery check timeout (%d ms)",
940  totem_config->rrp_autorecovery_check_timeout);
942  "RRP mode set to %s.", instance->totem_config->rrp_mode);
943 
945  "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
947  "max_network_delay (%d ms)", totem_config->max_network_delay);
948 
949 
950  cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
951  sizeof (struct message_item), instance->threaded_mode_enabled);
952 
953  sq_init (&instance->regular_sort_queue,
954  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
955 
956  sq_init (&instance->recovery_sort_queue,
957  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
958 
959  instance->totemsrp_poll_handle = poll_handle;
960 
961  instance->totemsrp_deliver_fn = deliver_fn;
962 
963  instance->totemsrp_confchg_fn = confchg_fn;
964  instance->use_heartbeat = 1;
965 
966  timer_function_pause_timeout (instance);
967 
968  if ( totem_config->heartbeat_failures_allowed == 0 ) {
970  "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
971  instance->use_heartbeat = 0;
972  }
973 
974  if (instance->use_heartbeat) {
975  instance->heartbeat_timeout
976  = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
977  + totem_config->max_network_delay;
978 
979  if (instance->heartbeat_timeout >= totem_config->token_timeout) {
981  "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
982  instance->heartbeat_timeout,
983  totem_config->token_timeout);
985  "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
987  "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
988  instance->use_heartbeat = 0;
989  }
990  else {
992  "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
993  }
994  }
995 
997  poll_handle,
998  &instance->totemrrp_context,
999  totem_config,
1000  stats->srp,
1001  instance,
1002  main_deliver_fn,
1003  main_iface_change_fn,
1004  main_token_seqid_get,
1005  main_msgs_missing,
1006  target_set_completed);
1007 
1008  /*
1009  * Must have net_mtu adjusted by totemrrp_initialize first
1010  */
1011  cs_queue_init (&instance->new_message_queue,
1013  sizeof (struct message_item), instance->threaded_mode_enabled);
1014 
1015  cs_queue_init (&instance->new_message_queue_trans,
1017  sizeof (struct message_item), instance->threaded_mode_enabled);
1018 
1020  &instance->token_recv_event_handle,
1022  0,
1023  token_event_stats_collector,
1024  instance);
1026  &instance->token_sent_event_handle,
1028  0,
1029  token_event_stats_collector,
1030  instance);
1031  *srp_context = instance;
1032  return (0);
1033 
1034 error_exit:
1035  return (-1);
1036 }
1037 
1039  void *srp_context)
1040 {
1041  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1042 
1043 
1044  memb_leave_message_send (instance);
1045  totemrrp_finalize (instance->totemrrp_context);
1046  cs_queue_free (&instance->new_message_queue);
1047  cs_queue_free (&instance->new_message_queue_trans);
1048  cs_queue_free (&instance->retrans_message_queue);
1049  sq_free (&instance->regular_sort_queue);
1050  sq_free (&instance->recovery_sort_queue);
1051  free (instance);
1052 }
1053 
1054 /*
1055  * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1056  * with interfaces_size number of items. iface_count is final number of interfaces filled by this
1057  * function.
1058  *
1059  * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1060  * and if interface was not found, -1 is returned.
1061  */
1063  void *srp_context,
1064  unsigned int nodeid,
1065  struct totem_ip_address *interfaces,
1066  unsigned int interfaces_size,
1067  char ***status,
1068  unsigned int *iface_count)
1069 {
1070  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1071  int res = 0;
1072  unsigned int found = 0;
1073  unsigned int i;
1074 
1075  for (i = 0; i < instance->my_memb_entries; i++) {
1076  if (instance->my_memb_list[i].addr[0].nodeid == nodeid) {
1077  found = 1;
1078  break;
1079  }
1080  }
1081 
1082  if (found) {
1083  *iface_count = instance->totem_config->interface_count;
1084 
1085  if (interfaces_size >= *iface_count) {
1086  memcpy (interfaces, instance->my_memb_list[i].addr,
1087  sizeof (struct totem_ip_address) * *iface_count);
1088  } else {
1089  res = -2;
1090  }
1091 
1092  goto finish;
1093  }
1094 
1095  for (i = 0; i < instance->my_left_memb_entries; i++) {
1096  if (instance->my_left_memb_list[i].addr[0].nodeid == nodeid) {
1097  found = 1;
1098  break;
1099  }
1100  }
1101 
1102  if (found) {
1103  *iface_count = instance->totem_config->interface_count;
1104 
1105  if (interfaces_size >= *iface_count) {
1106  memcpy (interfaces, instance->my_left_memb_list[i].addr,
1107  sizeof (struct totem_ip_address) * *iface_count);
1108  } else {
1109  res = -2;
1110  }
1111  } else {
1112  res = -1;
1113  }
1114 
1115 finish:
1116  totemrrp_ifaces_get (instance->totemrrp_context, status, NULL);
1117  return (res);
1118 }
1119 
1121  void *srp_context,
1122  const char *cipher_type,
1123  const char *hash_type)
1124 {
1125  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1126  int res;
1127 
1128  res = totemrrp_crypto_set(instance->totemrrp_context, cipher_type, hash_type);
1129 
1130  return (res);
1131 }
1132 
1133 
1135  void *srp_context)
1136 {
1137  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1138  unsigned int res;
1139 
1140  res = instance->totem_config->interfaces[0].boundto.nodeid;
1141 
1142  return (res);
1143 }
1144 
1146  void *srp_context)
1147 {
1148  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1149  int res;
1150 
1151  res = instance->totem_config->interfaces[0].boundto.family;
1152 
1153  return (res);
1154 }
1155 
1156 
1158  void *srp_context)
1159 {
1160  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1161 
1163  instance->totem_config->interface_count);
1164 
1165  return (0);
1166 }
1167 
1168 
1169 /*
1170  * Set operations for use by the membership algorithm
1171  */
1172 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1173 {
1174  unsigned int i;
1175  unsigned int res;
1176 
1177  for (i = 0; i < 1; i++) {
1178  res = totemip_equal (&a->addr[i], &b->addr[i]);
1179  if (res == 0) {
1180  return (0);
1181  }
1182  }
1183  return (1);
1184 }
1185 
1186 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
1187 {
1188  unsigned int i;
1189 
1190  dest->no_addrs = src->no_addrs;
1191 
1192  for (i = 0; i < INTERFACE_MAX; i++) {
1193  totemip_copy (&dest->addr[i], &src->addr[i]);
1194  }
1195 }
1196 
1197 static void srp_addr_to_nodeid (
1198  unsigned int *nodeid_out,
1199  struct srp_addr *srp_addr_in,
1200  unsigned int entries)
1201 {
1202  unsigned int i;
1203 
1204  for (i = 0; i < entries; i++) {
1205  nodeid_out[i] = srp_addr_in[i].addr[0].nodeid;
1206  }
1207 }
1208 
1209 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
1210 {
1211  int i;
1212 
1213  for (i = 0; i < INTERFACE_MAX; i++) {
1214  totemip_copy_endian_convert (&out->addr[i], &in->addr[i]);
1215  }
1216 }
1217 
1218 static void memb_consensus_reset (struct totemsrp_instance *instance)
1219 {
1220  instance->consensus_list_entries = 0;
1221 }
1222 
1223 static void memb_set_subtract (
1224  struct srp_addr *out_list, int *out_list_entries,
1225  struct srp_addr *one_list, int one_list_entries,
1226  struct srp_addr *two_list, int two_list_entries)
1227 {
1228  int found = 0;
1229  int i;
1230  int j;
1231 
1232  *out_list_entries = 0;
1233 
1234  for (i = 0; i < one_list_entries; i++) {
1235  for (j = 0; j < two_list_entries; j++) {
1236  if (srp_addr_equal (&one_list[i], &two_list[j])) {
1237  found = 1;
1238  break;
1239  }
1240  }
1241  if (found == 0) {
1242  srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1243  *out_list_entries = *out_list_entries + 1;
1244  }
1245  found = 0;
1246  }
1247 }
1248 
1249 /*
1250  * Set consensus for a specific processor
1251  */
1252 static void memb_consensus_set (
1253  struct totemsrp_instance *instance,
1254  const struct srp_addr *addr)
1255 {
1256  int found = 0;
1257  int i;
1258 
1259  if (addr->addr[0].nodeid == LEAVE_DUMMY_NODEID)
1260  return;
1261 
1262  for (i = 0; i < instance->consensus_list_entries; i++) {
1263  if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1264  found = 1;
1265  break; /* found entry */
1266  }
1267  }
1268  srp_addr_copy (&instance->consensus_list[i].addr, addr);
1269  instance->consensus_list[i].set = 1;
1270  if (found == 0) {
1271  instance->consensus_list_entries++;
1272  }
1273  return;
1274 }
1275 
1276 /*
1277  * Is consensus set for a specific processor
1278  */
1279 static int memb_consensus_isset (
1280  struct totemsrp_instance *instance,
1281  const struct srp_addr *addr)
1282 {
1283  int i;
1284 
1285  for (i = 0; i < instance->consensus_list_entries; i++) {
1286  if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1287  return (instance->consensus_list[i].set);
1288  }
1289  }
1290  return (0);
1291 }
1292 
1293 /*
1294  * Is consensus agreed upon based upon consensus database
1295  */
1296 static int memb_consensus_agreed (
1297  struct totemsrp_instance *instance)
1298 {
1299  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1300  int token_memb_entries = 0;
1301  int agreed = 1;
1302  int i;
1303 
1304  memb_set_subtract (token_memb, &token_memb_entries,
1305  instance->my_proc_list, instance->my_proc_list_entries,
1306  instance->my_failed_list, instance->my_failed_list_entries);
1307 
1308  for (i = 0; i < token_memb_entries; i++) {
1309  if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1310  agreed = 0;
1311  break;
1312  }
1313  }
1314 
1315  if (agreed && instance->failed_to_recv == 1) {
1316  /*
1317  * Both nodes agreed on our failure. We don't care how many proc list items left because we
1318  * will create single ring anyway.
1319  */
1320 
1321  return (agreed);
1322  }
1323 
1324  assert (token_memb_entries >= 1);
1325 
1326  return (agreed);
1327 }
1328 
1329 static void memb_consensus_notset (
1330  struct totemsrp_instance *instance,
1331  struct srp_addr *no_consensus_list,
1332  int *no_consensus_list_entries,
1333  struct srp_addr *comparison_list,
1334  int comparison_list_entries)
1335 {
1336  int i;
1337 
1338  *no_consensus_list_entries = 0;
1339 
1340  for (i = 0; i < instance->my_proc_list_entries; i++) {
1341  if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1342  srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->my_proc_list[i]);
1343  *no_consensus_list_entries = *no_consensus_list_entries + 1;
1344  }
1345  }
1346 }
1347 
1348 /*
1349  * Is set1 equal to set2 Entries can be in different orders
1350  */
1351 static int memb_set_equal (
1352  struct srp_addr *set1, int set1_entries,
1353  struct srp_addr *set2, int set2_entries)
1354 {
1355  int i;
1356  int j;
1357 
1358  int found = 0;
1359 
1360  if (set1_entries != set2_entries) {
1361  return (0);
1362  }
1363  for (i = 0; i < set2_entries; i++) {
1364  for (j = 0; j < set1_entries; j++) {
1365  if (srp_addr_equal (&set1[j], &set2[i])) {
1366  found = 1;
1367  break;
1368  }
1369  }
1370  if (found == 0) {
1371  return (0);
1372  }
1373  found = 0;
1374  }
1375  return (1);
1376 }
1377 
1378 /*
1379  * Is subset fully contained in fullset
1380  */
1381 static int memb_set_subset (
1382  const struct srp_addr *subset, int subset_entries,
1383  const struct srp_addr *fullset, int fullset_entries)
1384 {
1385  int i;
1386  int j;
1387  int found = 0;
1388 
1389  if (subset_entries > fullset_entries) {
1390  return (0);
1391  }
1392  for (i = 0; i < subset_entries; i++) {
1393  for (j = 0; j < fullset_entries; j++) {
1394  if (srp_addr_equal (&subset[i], &fullset[j])) {
1395  found = 1;
1396  }
1397  }
1398  if (found == 0) {
1399  return (0);
1400  }
1401  found = 0;
1402  }
1403  return (1);
1404 }
1405 /*
1406  * merge subset into fullset taking care not to add duplicates
1407  */
1408 static void memb_set_merge (
1409  const struct srp_addr *subset, int subset_entries,
1410  struct srp_addr *fullset, int *fullset_entries)
1411 {
1412  int found = 0;
1413  int i;
1414  int j;
1415 
1416  for (i = 0; i < subset_entries; i++) {
1417  for (j = 0; j < *fullset_entries; j++) {
1418  if (srp_addr_equal (&fullset[j], &subset[i])) {
1419  found = 1;
1420  break;
1421  }
1422  }
1423  if (found == 0) {
1424  srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1425  *fullset_entries = *fullset_entries + 1;
1426  }
1427  found = 0;
1428  }
1429  return;
1430 }
1431 
1432 static void memb_set_and_with_ring_id (
1433  struct srp_addr *set1,
1434  struct memb_ring_id *set1_ring_ids,
1435  int set1_entries,
1436  struct srp_addr *set2,
1437  int set2_entries,
1438  struct memb_ring_id *old_ring_id,
1439  struct srp_addr *and,
1440  int *and_entries)
1441 {
1442  int i;
1443  int j;
1444  int found = 0;
1445 
1446  *and_entries = 0;
1447 
1448  for (i = 0; i < set2_entries; i++) {
1449  for (j = 0; j < set1_entries; j++) {
1450  if (srp_addr_equal (&set1[j], &set2[i])) {
1451  if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1452  found = 1;
1453  }
1454  break;
1455  }
1456  }
1457  if (found) {
1458  srp_addr_copy (&and[*and_entries], &set1[j]);
1459  *and_entries = *and_entries + 1;
1460  }
1461  found = 0;
1462  }
1463  return;
1464 }
1465 
#ifdef CODE_COVERAGE
/*
 * Dump an address list to stdout: a header naming the list, then for each
 * entry its index and ring count followed by the printable form and address
 * family of each of its no_addrs interface addresses.
 *
 * Compiled only when CODE_COVERAGE is defined.
 */
static void memb_set_print (
	char *string,
	struct srp_addr *list,
	int list_entries)
{
	int entry;
	int iface;

	printf ("List '%s' contains %d entries:\n", string, list_entries);

	for (entry = 0; entry < list_entries; entry++) {
		printf ("Address %d with %d rings\n", entry, list[entry].no_addrs);

		iface = 0;
		while (iface < list[entry].no_addrs) {
			printf ("\tiface %d %s\n", iface, totemip_print (&list[entry].addr[iface]));
			printf ("\tfamily %d\n", list[entry].addr[iface].family);
			iface++;
		}
	}
}
#endif
1485 static void my_leave_memb_clear(
1486  struct totemsrp_instance *instance)
1487 {
1488  memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1489  instance->my_leave_memb_entries = 0;
1490 }
1491 
1492 static unsigned int my_leave_memb_match(
1493  struct totemsrp_instance *instance,
1494  unsigned int nodeid)
1495 {
1496  int i;
1497  unsigned int ret = 0;
1498 
1499  for (i = 0; i < instance->my_leave_memb_entries; i++){
1500  if (instance->my_leave_memb_list[i] == nodeid){
1501  ret = nodeid;
1502  break;
1503  }
1504  }
1505  return ret;
1506 }
1507 
/*
 * Record a node id in my_leave_memb_list unless it is already there.
 *
 * The id is appended only while fewer than PROCESSOR_COUNT_MAX - 1 entries
 * are stored; otherwise the else branch runs instead.
 *
 * NOTE(review): the line that opens the call in the else branch is absent
 * from this listing; only its format-string argument line is visible.
 */
1508 static void my_leave_memb_set(
1509  struct totemsrp_instance *instance,
1510  unsigned int nodeid)
1511 {
1512  int i, found = 0;
1513  for (i = 0; i < instance->my_leave_memb_entries; i++){
1514  if (instance->my_leave_memb_list[i] == nodeid){
1515  found = 1;
1516  break;
1517  }
1518  }
     /* Already recorded: nothing to do */
1519  if (found == 1) {
1520  return;
1521  }
1522  if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1523  instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1524  instance->my_leave_memb_entries++;
1525  } else {
1527  "Cannot set LEAVE nodeid=%d", nodeid);
1528  }
1529 }
1530 
1531 
1532 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1533 {
1534  assert (instance != NULL);
1535  return totemrrp_buffer_alloc (instance->totemrrp_context);
1536 }
1537 
1538 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1539 {
1540  assert (instance != NULL);
1541  totemrrp_buffer_release (instance->totemrrp_context, ptr);
1542 }
1543 
/*
 * Delete a timer and add one again with a delay of
 * totem_config->token_retransmit_timeout milliseconds (converted to
 * nanoseconds), at QB_LOOP_MED priority, invoking
 * timer_function_token_retransmit_timeout with the instance as its data.
 * A non-zero result from qb_loop_timer_add is logged at error level.
 *
 * NOTE(review): the lines naming the timer handle passed to
 * qb_loop_timer_del and qb_loop_timer_add are absent from this listing.
 */
1544 static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1545 {
1546  int32_t res;
1547 
1548  qb_loop_timer_del (instance->totemsrp_poll_handle,
1550  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1551  QB_LOOP_MED,
1552  instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1553  (void *)instance,
1554  timer_function_token_retransmit_timeout,
1556  if (res != 0) {
1557  log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1558  }
1559 
1560 }
1561 
1562 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1563 {
1564  int32_t res;
1565 
1566  if (instance->my_merge_detect_timeout_outstanding == 0) {
1567  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1568  QB_LOOP_MED,
1569  instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1570  (void *)instance,
1571  timer_function_merge_detect_timeout,
1572  &instance->timer_merge_detect_timeout);
1573  if (res != 0) {
1574  log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1575  }
1576 
1577  instance->my_merge_detect_timeout_outstanding = 1;
1578  }
1579 }
1580 
/*
 * Delete the merge detect timer (timer_merge_detect_timeout) from the poll
 * loop.
 *
 * NOTE(review): one line of this body (listing line 1584) is absent from
 * this listing, so the function may do more than what is shown here.
 */
1581 static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1582 {
1583  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1585 }
1586 
1587 /*
1588  * ring_state_* is used to save and restore the sort queue
1589  * state when a recovery operation fails (and enters gather)
1590  */
/*
 * Save the current ring state once: only when old_ring_state_saved is 0.
 * The visible statements set the saved flag, copy my_ring_id into
 * my_old_ring_id and store my_aru in old_ring_state_aru.
 *
 * NOTE(review): listing lines 1598-1599 are absent, so at least one further
 * statement and the opener of the call that takes the "Saving state"
 * message are not visible here.
 */
1591 static void old_ring_state_save (struct totemsrp_instance *instance)
1592 {
1593  if (instance->old_ring_state_saved == 0) {
1594  instance->old_ring_state_saved = 1;
1595  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1596  sizeof (struct memb_ring_id));
1597  instance->old_ring_state_aru = instance->my_aru;
1600  "Saving state aru %x high seq received %x",
1601  instance->my_aru, instance->my_high_seq_received);
1602  }
1603 }
1604 
/*
 * Restore ring state saved earlier: the visible statement puts
 * old_ring_state_aru back into my_aru.
 *
 * NOTE(review): listing lines 1608-1609 are absent, so at least one further
 * statement and the opener of the call that takes the "Restoring" message
 * are not visible here.
 */
1605 static void old_ring_state_restore (struct totemsrp_instance *instance)
1606 {
1607  instance->my_aru = instance->old_ring_state_aru;
1610  "Restoring instance->my_aru %x my high seq received %x",
1611  instance->my_aru, instance->my_high_seq_received);
1612 }
1613 
/*
 * Clear the old_ring_state_saved flag.
 *
 * NOTE(review): the opener of the call that takes the "Resetting old ring
 * state" message (listing line 1616) is absent from this listing.
 */
1614 static void old_ring_state_reset (struct totemsrp_instance *instance)
1615 {
1617  "Resetting old ring state");
1618  instance->old_ring_state_saved = 0;
1619 }
1620 
1621 static void reset_pause_timeout (struct totemsrp_instance *instance)
1622 {
1623  int32_t res;
1624 
1625  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1626  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1627  QB_LOOP_MED,
1628  instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1629  (void *)instance,
1630  timer_function_pause_timeout,
1631  &instance->timer_pause_timeout);
1632  if (res != 0) {
1633  log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1634  }
1635 }
1636 
1637 static void reset_token_timeout (struct totemsrp_instance *instance) {
1638  int32_t res;
1639 
1640  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1641  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1642  QB_LOOP_MED,
1643  instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1644  (void *)instance,
1645  timer_function_orf_token_timeout,
1646  &instance->timer_orf_token_timeout);
1647  if (res != 0) {
1648  log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1649  }
1650 }
1651 
1652 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1653  int32_t res;
1654 
1655  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1656  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1657  QB_LOOP_MED,
1658  instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1659  (void *)instance,
1660  timer_function_heartbeat_timeout,
1661  &instance->timer_heartbeat_timeout);
1662  if (res != 0) {
1663  log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1664  }
1665 }
1666 
1667 
1668 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1669  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1670 }
1671 
1672 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1673  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1674 }
1675 
1676 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1677 {
1678  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1679 }
1680 
/*
 * Add a timer that fires timer_function_token_hold_retransmit_timeout after
 * totem_config->token_hold_timeout milliseconds (converted to nanoseconds),
 * at QB_LOOP_MED priority, with the instance as its data. Unlike the
 * reset_* helpers, no timer is deleted first. A non-zero result from
 * qb_loop_timer_add is logged at error level.
 *
 * NOTE(review): the line naming the timer handle passed as the last
 * argument (listing line 1690) is absent from this listing.
 */
1681 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1682 {
1683  int32_t res;
1684 
1685  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1686  QB_LOOP_MED,
1687  instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1688  (void *)instance,
1689  timer_function_token_hold_retransmit_timeout,
1691  if (res != 0) {
1692  log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1693  }
1694 }
1695 
/*
 * Delete a timer from the poll loop.
 *
 * NOTE(review): the line naming the timer handle (listing line 1699) is
 * absent from this listing; presumably it is the token hold retransmit
 * timer, to be confirmed against the full source.
 */
1696 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1697 {
1698  qb_loop_timer_del (instance->totemsrp_poll_handle,
1700 }
1701 
1702 static void memb_state_consensus_timeout_expired (
1703  struct totemsrp_instance *instance)
1704 {
1705  struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1706  int no_consensus_list_entries;
1707 
1708  instance->stats.consensus_timeouts++;
1709  if (memb_consensus_agreed (instance)) {
1710  memb_consensus_reset (instance);
1711 
1712  memb_consensus_set (instance, &instance->my_id);
1713 
1714  reset_token_timeout (instance); // REVIEWED
1715  } else {
1716  memb_consensus_notset (
1717  instance,
1718  no_consensus_list,
1719  &no_consensus_list_entries,
1720  instance->my_proc_list,
1721  instance->my_proc_list_entries);
1722 
1723  memb_set_merge (no_consensus_list, no_consensus_list_entries,
1724  instance->my_failed_list, &instance->my_failed_list_entries);
1725  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1726  }
1727 }
1728 
1729 static void memb_join_message_send (struct totemsrp_instance *instance);
1730 
1731 static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1732 
1733 /*
1734  * Timers used for various states of the membership algorithm
1735  */
1736 static void timer_function_pause_timeout (void *data)
1737 {
1738  struct totemsrp_instance *instance = data;
1739 
1740  instance->pause_timestamp = qb_util_nano_current_get ();
1741  reset_pause_timeout (instance);
1742 }
1743 
1744 static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1745 {
1746  old_ring_state_restore (instance);
1747  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1748  instance->stats.recovery_token_lost++;
1749 }
1750 
/*
 * Token loss timer callback. Dispatches on the current membership state;
 * each visible branch enters GATHER (directly or via a helper) and bumps
 * the matching *_token_lost statistic.
 *
 * NOTE(review): several lines are absent from this listing: the case label
 * for the first branch (its messages mention the OPERATIONAL state), every
 * call opener that takes the message strings below, and listing line 1761.
 */
1751 static void timer_function_orf_token_timeout (void *data)
1752 {
1753  struct totemsrp_instance *instance = data;
1754 
1755  switch (instance->memb_state) {
1758  "The token was lost in the OPERATIONAL state.");
1760  "A processor failed, forming new configuration.");
1762  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1763  instance->stats.operational_token_lost++;
1764  break;
1765 
1766  case MEMB_STATE_GATHER:
1768  "The consensus timeout expired.");
     /*
      * The consensus-timeout handler runs first; GATHER is then entered
      * unconditionally with the CONSENSUS_TIMEOUT_EXPIRED reason.
      */
1769  memb_state_consensus_timeout_expired (instance);
1770  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1771  instance->stats.gather_token_lost++;
1772  break;
1773 
1774  case MEMB_STATE_COMMIT:
1776  "The token was lost in the COMMIT state.");
1777  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1778  instance->stats.commit_token_lost++;
1779  break;
1780 
1781  case MEMB_STATE_RECOVERY:
1783  "The token was lost in the RECOVERY state.");
1784  memb_recovery_state_token_loss (instance);
1785  instance->orf_token_discard = 1;
1786  break;
1787  }
1788 }
1789 
/*
 * Heartbeat timer callback: delegates to timer_function_orf_token_timeout
 * with the same data pointer, i.e. heartbeat expiry is handled exactly like
 * token loss.
 *
 * NOTE(review): the opener of the call that takes the "HeartBeat Timer
 * expired" message (listing line 1793) is absent from this listing.
 */
1790 static void timer_function_heartbeat_timeout (void *data)
1791 {
1792  struct totemsrp_instance *instance = data;
1794  "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1795  timer_function_orf_token_timeout(data);
1796 }
1797 
/*
 * Join timer callback. In the GATHER and COMMIT states it sends a join
 * message and re-arms itself after totem_config->join_timeout milliseconds;
 * a failure to add the timer is logged at error level. Reaching the
 * assert (0) branch is treated as a programming error.
 *
 * NOTE(review): listing line 1804 is absent, so the assert (0) branch may
 * carry a further case label besides MEMB_STATE_RECOVERY.
 */
1798 static void memb_timer_function_state_gather (void *data)
1799 {
1800  struct totemsrp_instance *instance = data;
1801  int32_t res;
1802 
1803  switch (instance->memb_state) {
1805  case MEMB_STATE_RECOVERY:
1806  assert (0); /* this should never happen */
1807  break;
1808  case MEMB_STATE_GATHER:
1809  case MEMB_STATE_COMMIT:
1810  memb_join_message_send (instance);
1811 
1812  /*
1813  * Restart the join timeout
1814  */
1815  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1816 
1817  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1818  QB_LOOP_MED,
1819  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1820  (void *)instance,
1821  memb_timer_function_state_gather,
1822  &instance->memb_timer_state_gather_join_timeout);
1823 
1824  if (res != 0) {
1825  log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1826  }
1827  break;
1828  }
1829 }
1830 
/*
 * Consensus timer callback: forwards to
 * memb_state_consensus_timeout_expired() for the instance passed as data.
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	struct totemsrp_instance *instance = (struct totemsrp_instance *)data;

	memb_state_consensus_timeout_expired (instance);
}
1836 
/*
 * Move messages from the recovery sort queue into the regular sort queue.
 *
 * Sequence numbers SEQNO_START_MSG + 1 through my_aru are scanned. For each
 * recovery item that is present and encapsulated, the inner message (located
 * sizeof (struct mcast) bytes into the outer one) is added to the regular
 * sort queue, but only if its ring id equals my_old_ring_id and its
 * sequence number is not already in use there.
 * old_ring_state_high_seq_received is raised to the highest sequence number
 * added.
 *
 * NOTE(review): the openers of the calls that take the message strings
 * below (listing lines 1847, 1884, 1905) are absent from this listing.
 */
1837 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1838 {
1839  unsigned int i;
1840  struct sort_queue_item *recovery_message_item;
1841  struct sort_queue_item regular_message_item;
1842  unsigned int range = 0;
1843  int res;
1844  void *ptr;
1845  struct mcast *mcast;
1846 
1848  "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1849 
1850  range = instance->my_aru - SEQNO_START_MSG;
1851  /*
1852  * Move messages from recovery to regular sort queue
1853  */
1854 // todo should i be initialized to 0 or 1 ?
1855  for (i = 1; i <= range; i++) {
1856  res = sq_item_get (&instance->recovery_sort_queue,
1857  i + SEQNO_START_MSG, &ptr);
     /* Non-zero result: no item stored for this sequence number, skip it */
1858  if (res != 0) {
1859  continue;
1860  }
1861  recovery_message_item = ptr;
1862 
1863  /*
1864  * Convert recovery message into regular message
1865  */
1866  mcast = recovery_message_item->mcast;
1867  if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
1868  /*
1869  * Message is a recovery message encapsulated
1870  * in a new ring message
1871  */
1872  regular_message_item.mcast =
1873  (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1874  regular_message_item.msg_len =
1875  recovery_message_item->msg_len - sizeof (struct mcast);
1876  mcast = regular_message_item.mcast;
1877  } else {
1878  /*
1879  * TODO this case shouldn't happen
1880  */
1881  continue;
1882  }
1883 
1885  "comparing if ring id is for this processors old ring seqno %d",
1886  mcast->seq);
1887 
1888  /*
1889  * Only add this message to the regular sort
1890  * queue if it was originated with the same ring
1891  * id as the previous ring
1892  */
1893  if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1894  sizeof (struct memb_ring_id)) == 0) {
1895 
1896  res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1897  if (res == 0) {
1898  sq_item_add (&instance->regular_sort_queue,
1899  &regular_message_item, mcast->seq);
1900  if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1901  instance->old_ring_state_high_seq_received = mcast->seq;
1902  }
1903  }
1904  } else {
1906  "-not adding msg with seq no %x", mcast->seq);
1907  }
1908  }
1909 }
1910 
1911 /*
1912  * Change states in the state machine of the membership algorithm
1913  */
/*
 * Enter the OPERATIONAL state.
 *
 * In order, the visible code:
 *  - resets consensus and old-ring bookkeeping and moves old-ring messages
 *    from the recovery queue into the regular queue;
 *  - delivers old-ring messages to the application around a transitional
 *    configuration, with my_aru temporarily set to old_ring_state_aru;
 *  - computes the left and joined lists and installs my_new_memb_list as
 *    the current membership;
 *  - copies the recovery sort queue over the regular one, frees and
 *    releases already-delivered items;
 *  - builds the " joined:", " left:" and " failed:" log fragments;
 *  - sets memb_state, updates statistics, re-arms the pause timer and
 *    remembers my_ring_id as my_old_ring_id.
 *
 * NOTE(review): many call openers are absent from this listing (log calls
 * and the calls around listing lines 1970-1972, 1984, 2000-2002, 2014), so
 * the callees of those argument lists cannot be named from here.
 */
1914 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1915 {
1916  struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1917  int joined_list_entries = 0;
1918  unsigned int aru_save;
1919  unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1920  unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1921  unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1922  unsigned int left_list[PROCESSOR_COUNT_MAX];
1923  unsigned int i;
1924  unsigned int res;
1925  char left_node_msg[1024];
1926  char joined_node_msg[1024];
1927  char failed_node_msg[1024];
1928 
1929  instance->originated_orf_token = 0;
1930 
1931  memb_consensus_reset (instance);
1932 
1933  old_ring_state_reset (instance);
1934 
1935  deliver_messages_from_recovery_to_regular (instance);
1936 
1938  "Delivering to app %x to %x",
1939  instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1940 
     /* my_aru is swapped for the old ring's value here and put back below */
1941  aru_save = instance->my_aru;
1942  instance->my_aru = instance->old_ring_state_aru;
1943 
1944  messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1945 
1946  /*
1947  * Calculate joined and left list
1948  */
1949  memb_set_subtract (instance->my_left_memb_list,
1950  &instance->my_left_memb_entries,
1951  instance->my_memb_list, instance->my_memb_entries,
1952  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1953 
1954  memb_set_subtract (joined_list, &joined_list_entries,
1955  instance->my_new_memb_list, instance->my_new_memb_entries,
1956  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1957 
1958  /*
1959  * Install new membership
1960  */
1961  instance->my_memb_entries = instance->my_new_memb_entries;
1962  memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1963  sizeof (struct srp_addr) * instance->my_memb_entries);
1964  instance->last_released = 0;
1965  instance->my_set_retrans_flg = 0;
1966 
1967  /*
1968  * Inform RRP about transitional change
1969  */
1971  instance->totemrrp_context,
1973  instance->my_trans_memb_list, instance->my_trans_memb_entries,
1974  instance->my_left_memb_list, instance->my_left_memb_entries,
1975  NULL, 0,
1976  &instance->my_ring_id);
1977  /*
1978  * Deliver transitional configuration to application
1979  */
1980  srp_addr_to_nodeid (left_list, instance->my_left_memb_list,
1981  instance->my_left_memb_entries);
1982  srp_addr_to_nodeid (trans_memb_list_totemip,
1983  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1985  trans_memb_list_totemip, instance->my_trans_memb_entries,
1986  left_list, instance->my_left_memb_entries,
1987  0, 0, &instance->my_ring_id);
1988  instance->waiting_trans_ack = 1;
1989  instance->totemsrp_waiting_trans_ack_cb_fn (1);
1990 
1991 // TODO we need to filter to ensure we only deliver those
1992 // messages which are part of instance->my_deliver_memb
1993  messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1994 
1995  instance->my_aru = aru_save;
1996 
1997  /*
1998  * Inform RRP about regular membership change
1999  */
2001  instance->totemrrp_context,
2003  instance->my_new_memb_list, instance->my_new_memb_entries,
2004  NULL, 0,
2005  joined_list, joined_list_entries,
2006  &instance->my_ring_id);
2007  /*
2008  * Deliver regular configuration to application
2009  */
2010  srp_addr_to_nodeid (new_memb_list_totemip,
2011  instance->my_new_memb_list, instance->my_new_memb_entries);
2012  srp_addr_to_nodeid (joined_list_totemip, joined_list,
2013  joined_list_entries);
2015  new_memb_list_totemip, instance->my_new_memb_entries,
2016  0, 0,
2017  joined_list_totemip, joined_list_entries, &instance->my_ring_id);
2018 
2019  /*
2020  * The recovery sort queue now becomes the regular
2021  * sort queue. It is necessary to copy the state
2022  * into the regular sort queue.
2023  */
2024  sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2025  instance->my_last_aru = SEQNO_START_MSG;
2026 
2027  /* When making my_proc_list smaller, ensure that the
2028  * now non-used entries are zero-ed out. There are some suspect
2029  * assert's that assume that there is always 2 entries in the list.
2030  * These fail when my_proc_list is reduced to 1 entry (and the
2031  * valid [0] entry is the same as the 'unused' [1] entry).
2032  */
2033  memset(instance->my_proc_list, 0,
2034  sizeof (struct srp_addr) * instance->my_proc_list_entries);
2035 
     /*
      * The copy length uses my_memb_entries, which was set equal to
      * my_new_memb_entries when the new membership was installed above.
      */
2036  instance->my_proc_list_entries = instance->my_new_memb_entries;
2037  memcpy (instance->my_proc_list, instance->my_new_memb_list,
2038  sizeof (struct srp_addr) * instance->my_memb_entries);
2039 
2040  instance->my_failed_list_entries = 0;
2041  /*
2042  * TODO Not exactly to spec
2043  *
2044  * At the entry to this function all messages without a gap are
2045  * delivered.
2046  *
2047  * This code throws away messages from the last gap in the sort queue
2048  * to my_high_seq_received
2049  *
2050  * What should really happen is we should deliver all messages up to
2051  * a gap, then deliver the transitional configuration, then deliver
2052  * the messages between the first gap and my_high_seq_received, then
2053  * deliver a regular configuration, then deliver the regular
2054  * configuration
2055  *
2056  * Unfortunately totempg doesn't appear to like this operating mode
2057  * which needs more inspection
2058  */
     /*
      * Walk downwards from my_high_seq_received until sq_item_get returns 0
      * for a sequence number or i reaches 0; that i becomes
      * my_high_delivered.
      */
2059  i = instance->my_high_seq_received + 1;
2060  do {
2061  void *ptr;
2062 
2063  i -= 1;
2064  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2065  if (i == 0) {
2066  break;
2067  }
2068  } while (res);
2069 
2070  instance->my_high_delivered = i;
2071 
     /* Free the message of every item up to my_high_delivered, then release the items */
2072  for (i = 0; i <= instance->my_high_delivered; i++) {
2073  void *ptr;
2074 
2075  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2076  if (res == 0) {
2077  struct sort_queue_item *regular_message;
2078 
2079  regular_message = ptr;
2080  free (regular_message->mcast);
2081  }
2082  }
2083  sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2084  instance->last_released = instance->my_high_delivered;
2085 
     /*
      * NOTE(review): in the three message builders below the running offset
      * accumulates snprintf return values. snprintf returns the length that
      * would have been written, so if output were ever truncated the offset
      * could exceed the buffer size and "sizeof (...) - offset" would wrap.
      * Whether 1024 bytes can be exceeded depends on PROCESSOR_COUNT_MAX,
      * which is not visible here -- TODO confirm.
      */
2086  if (joined_list_entries) {
2087  int sptr = 0;
2088  sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2089  for (i=0; i< joined_list_entries; i++) {
2090  sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " %u", joined_list_totemip[i]);
2091  }
2092  }
2093  else {
2094  joined_node_msg[0] = '\0';
2095  }
2096 
2097  if (instance->my_left_memb_entries) {
2098  int sptr = 0;
2099  int sptr2 = 0;
2100  sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2101  for (i=0; i< instance->my_left_memb_entries; i++) {
2102  sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " %u", left_list[i]);
2103  }
     /* Nodes that left without a recorded LEAVE are reported as failed */
2104  for (i=0; i< instance->my_left_memb_entries; i++) {
2105  if (my_leave_memb_match(instance, left_list[i]) == 0) {
2106  if (sptr2 == 0) {
2107  sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2108  }
     /*
      * NOTE(review): the bound below uses sizeof(left_node_msg) although
      * the destination is failed_node_msg; both arrays are declared with
      * 1024 bytes, so the value is the same.
      */
2109  sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " %u", left_list[i]);
2110  }
2111  }
2112  if (sptr2 == 0) {
2113  failed_node_msg[0] = '\0';
2114  }
2115  }
2116  else {
2117  left_node_msg[0] = '\0';
2118  failed_node_msg[0] = '\0';
2119  }
2120 
2121  my_leave_memb_clear(instance);
2122 
2124  "entering OPERATIONAL state.");
2126  "A new membership (%s:%lld) was formed. Members%s%s",
2127  totemip_print (&instance->my_ring_id.rep),
2128  instance->my_ring_id.seq,
2129  joined_node_msg,
2130  left_node_msg);
2131 
2132  if (strlen(failed_node_msg)) {
2134  "Failed to receive the leave message.%s",
2135  failed_node_msg);
2136  }
2137 
2138  instance->memb_state = MEMB_STATE_OPERATIONAL;
2139 
2140  instance->stats.operational_entered++;
2141  instance->stats.continuous_gather = 0;
2142 
2143  instance->my_received_flg = 1;
2144 
2145  reset_pause_timeout (instance);
2146 
2147  /*
2148  * Save ring id information from this configuration to determine
2149  * which processors are transitioning from old regular configuration
2150  * in to new regular configuration on the next configuration change
2151  */
2152  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2153  sizeof (struct memb_ring_id));
2154 
2155  return;
2156 }
2157 
2158 static void memb_state_gather_enter (
2159  struct totemsrp_instance *instance,
2160  enum gather_state_from gather_from)
2161 {
2162  int32_t res;
2163 
2164  instance->orf_token_discard = 1;
2165 
2166  instance->originated_orf_token = 0;
2167 
2168  memb_set_merge (
2169  &instance->my_id, 1,
2170  instance->my_proc_list, &instance->my_proc_list_entries);
2171 
2172  memb_join_message_send (instance);
2173 
2174  /*
2175  * Restart the join timeout
2176  */
2177  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2178 
2179  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2180  QB_LOOP_MED,
2181  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2182  (void *)instance,
2183  memb_timer_function_state_gather,
2184  &instance->memb_timer_state_gather_join_timeout);
2185  if (res != 0) {
2186  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2187  }
2188 
2189  /*
2190  * Restart the consensus timeout
2191  */
2192  qb_loop_timer_del (instance->totemsrp_poll_handle,
2193  instance->memb_timer_state_gather_consensus_timeout);
2194 
2195  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2196  QB_LOOP_MED,
2197  instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2198  (void *)instance,
2199  memb_timer_function_gather_consensus_timeout,
2200  &instance->memb_timer_state_gather_consensus_timeout);
2201  if (res != 0) {
2202  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2203  }
2204 
2205  /*
2206  * Cancel the token loss and token retransmission timeouts
2207  */
2208  cancel_token_retransmit_timeout (instance); // REVIEWED
2209  cancel_token_timeout (instance); // REVIEWED
2210  cancel_merge_detect_timeout (instance);
2211 
2212  memb_consensus_reset (instance);
2213 
2214  memb_consensus_set (instance, &instance->my_id);
2215 
2216  log_printf (instance->totemsrp_log_level_debug,
2217  "entering GATHER state from %d(%s).",
2218  gather_from, gsfrom_to_msg(gather_from));
2219 
2220  instance->memb_state = MEMB_STATE_GATHER;
2221  instance->stats.gather_entered++;
2222 
2223  if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) {
2224  /*
2225  * State 3 means gather, so we are continuously gathering.
2226  */
2227  instance->stats.continuous_gather++;
2228  }
2229 
2230  return;
2231 }
2232 
/*
 * Forward declaration of the token retransmit timer callback; the
 * definition appears further down in this file.
 */
2233 static void timer_function_token_retransmit_timeout (void *data);
2234 
/*
 * Callback that receives the totemsrp instance as an opaque context
 * pointer and sends the instance's commit token by calling
 * memb_state_commit_token_send().
 */
static void target_set_completed (
	void *context)
{
	struct totemsrp_instance *srp = context;

	(void)memb_state_commit_token_send (srp);
}
2243 
/*
 * Move the membership state machine into MEMB_STATE_COMMIT.
 *
 * In order: the old ring state is saved, this node's entry in the commit
 * token is filled in, the token target is pointed at the next member, the
 * two gather-state timers (join and consensus) are deleted, the ring id
 * from the commit token is adopted and stored, the token timers are
 * restarted, statistics are updated and the flow control counters are
 * cleared.  The commit token itself is not sent here (see the closing
 * comment of the function).
 *
 * NOTE(review): listing lines 2255, 2259 and 2266 are absent from this
 * copy of the file (the embedded numbering skips them); confirm against
 * the upstream source what belongs there.
 */
2244 static void memb_state_commit_enter (
2245  struct totemsrp_instance *instance)
2246 {
2247  old_ring_state_save (instance);
2248 
2249  memb_state_commit_token_update (instance);
2250 
2251  memb_state_commit_token_target_set (instance);
2252 
2253  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2254 
/* NOTE(review): listing line 2255 is missing here. */
2256 
2257  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2258 
/* NOTE(review): listing line 2259 is missing here. */
2260 
2261  memb_ring_id_set (instance, &instance->commit_token->ring_id);
2262  instance->memb_ring_id_store (&instance->my_ring_id, &instance->my_id.addr[0]);
2263 
2264  instance->token_ring_id_seq = instance->my_ring_id.seq;
2265 
/*
 * NOTE(review): listing line 2266 is missing; the string below is the
 * tail of a call whose opening line is not shown (presumably a log call).
 */
2267  "entering COMMIT state.");
2268 
2269  instance->memb_state = MEMB_STATE_COMMIT;
2270  reset_token_retransmit_timeout (instance); // REVIEWED
2271  reset_token_timeout (instance); // REVIEWED
2272 
2273  instance->stats.commit_entered++;
2274  instance->stats.continuous_gather = 0;
2275 
2276  /*
2277  * reset all flow control variables since we are starting a new ring
2278  */
2279  instance->my_trc = 0;
2280  instance->my_pbl = 0;
2281  instance->my_cbl = 0;
2282  /*
2283  * commit token sent after callback that token target has been set
2284  */
2285 }
2286 
/*
 * Move the membership state machine into MEMB_STATE_RECOVERY using the
 * supplied commit token.
 *
 * The token's trailing storage is read as addr_entries srp_addr records
 * followed by the same number of memb_commit_token_memb_entry records.
 *
 * Steps visible in this listing:
 *  - reset orf_token_discard and my_high_ring_delivered, reinitialise the
 *    recovery sort queue and the retransmit message queue;
 *  - forward the commit token (memb_state_commit_token_send_recovery);
 *  - build the transitional membership with memb_set_and_with_ring_id;
 *  - if some new member that is also in the transitional membership has
 *    received_flg == 0, copy the transitional list to the deliver list and
 *    originate old ring messages; otherwise skip to no_originate;
 *  - when originating, compute low_ring_aru / my_high_ring_delivered over
 *    deliver-list members whose ring id equals my_old_ring_id, then wrap
 *    each message found in regular_sort_queue in the range
 *    (low_ring_aru, old_ring_state_high_seq_received] in a fresh mcast
 *    buffer and queue it on retrans_message_queue;
 *  - reset sequence bookkeeping, restart the token timers and set the
 *    state.
 *
 * NOTE(review): listing lines 2303, 2322, 2345, 2349, 2351, 2356, 2417,
 * 2422, 2440, 2452, 2457 and 2464 are absent from this copy of the file
 * (the embedded numbering skips them).  Several visible lines are
 * therefore only the tail of a call whose opening line is not shown, and
 * the declaration of sort_queue_item is not visible.  Confirm against the
 * upstream source.
 */
2287 static void memb_state_recovery_enter (
2288  struct totemsrp_instance *instance,
2289  struct memb_commit_token *commit_token)
2290 {
2291  int i;
2292  int local_received_flg = 1;
2293  unsigned int low_ring_aru;
2294  unsigned int range = 0;
2295  unsigned int messages_originated = 0;
2296  const struct srp_addr *addr;
2297  struct memb_commit_token_memb_entry *memb_list;
2298  struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2299 
/* Member addresses come first, the per-member entries follow them. */
2300  addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2301  memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2302 
2304  "entering RECOVERY state.");
2305 
2306  instance->orf_token_discard = 0;
2307 
2308  instance->my_high_ring_delivered = 0;
2309 
2310  sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2311  cs_queue_reinit (&instance->retrans_message_queue);
2312 
2313  low_ring_aru = instance->old_ring_state_high_seq_received;
2314 
2315  memb_state_commit_token_send_recovery (instance, commit_token);
2316 
2317  instance->my_token_seq = SEQNO_START_TOKEN - 1;
2318 
2319  /*
2320  * Build regular configuration
2321  */
/* NOTE(review): the opening line of this call (listing line 2322) is missing. */
2323  instance->totemrrp_context,
2324  commit_token->addr_entries);
2325 
2326  /*
2327  * Build transitional configuration
2328  */
2329  for (i = 0; i < instance->my_new_memb_entries; i++) {
2330  memcpy (&my_new_memb_ring_id_list[i],
2331  &memb_list[i].ring_id,
2332  sizeof (struct memb_ring_id));
2333  }
2334  memb_set_and_with_ring_id (
2335  instance->my_new_memb_list,
2336  my_new_memb_ring_id_list,
2337  instance->my_new_memb_entries,
2338  instance->my_memb_list,
2339  instance->my_memb_entries,
2340  &instance->my_old_ring_id,
2341  instance->my_trans_memb_list,
2342  &instance->my_trans_memb_entries);
2343 
2344  for (i = 0; i < instance->my_trans_memb_entries; i++) {
2346  "TRANS [%d] member %s:", i, totemip_print (&instance->my_trans_memb_list[i].addr[0]));
2347  }
2348  for (i = 0; i < instance->my_new_memb_entries; i++) {
2350  "position [%d] member %s:", i, totemip_print (&addr[i].addr[0]));
2352  "previous ring seq %llx rep %s",
2353  memb_list[i].ring_id.seq,
2354  totemip_print (&memb_list[i].ring_id.rep));
2355 
2357  "aru %x high delivered %x received flag %d",
2358  memb_list[i].aru,
2359  memb_list[i].high_delivered,
2360  memb_list[i].received_flg);
2361 
2362  // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2363  }
2364  /*
2365  * Determine if any received flag is false
2366  */
/*
 * NOTE(review): this loop is bounded by commit_token->addr_entries but
 * indexes instance->my_new_memb_list; presumably the two counts agree at
 * this point - confirm against the caller.
 */
2367  for (i = 0; i < commit_token->addr_entries; i++) {
2368  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2369  instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2370 
2371  memb_list[i].received_flg == 0) {
2372  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2373  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2374  sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2375  local_received_flg = 0;
2376  break;
2377  }
2378  }
2379  if (local_received_flg == 1) {
2380  goto no_originate;
2381  } /* Else originate messages if we should */
2382 
2383  /*
2384  * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2385  */
2386  for (i = 0; i < commit_token->addr_entries; i++) {
2387  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2388  instance->my_deliver_memb_list,
2389  instance->my_deliver_memb_entries) &&
2390 
2391  memcmp (&instance->my_old_ring_id,
2392  &memb_list[i].ring_id,
2393  sizeof (struct memb_ring_id)) == 0) {
2394 
2395  if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2396 
2397  low_ring_aru = memb_list[i].aru;
2398  }
2399  if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2400  instance->my_high_ring_delivered = memb_list[i].high_delivered;
2401  }
2402  }
2403  }
2404 
2405  /*
2406  * Copy all old ring messages to instance->retrans_message_queue
2407  */
2408  range = instance->old_ring_state_high_seq_received - low_ring_aru;
2409  if (range == 0) {
2410  /*
2411  * No messages to copy
2412  */
2413  goto no_originate;
2414  }
2415  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2416 
2418  "copying all old ring messages from %x-%x.",
2419  low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2420 
/* Note: i is a signed int while range is unsigned in the comparison below. */
2421  for (i = 1; i <= range; i++) {
2423  struct message_item message_item;
2424  void *ptr;
2425  int res;
2426 
2427  res = sq_item_get (&instance->regular_sort_queue,
2428  low_ring_aru + i, &ptr);
/* Sequence numbers with no stored item are skipped. */
2429  if (res != 0) {
2430  continue;
2431  }
2432  sort_queue_item = ptr;
2433  messages_originated++;
2434  memset (&message_item, 0, sizeof (struct message_item));
2435  // TODO LEAK
2436  message_item.mcast = totemsrp_buffer_alloc (instance);
2437  assert (message_item.mcast);
2438  message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2439  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2441  message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2442  assert (message_item.mcast->header.nodeid);
2443  message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
2444  memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2445  sizeof (struct memb_ring_id));
/*
 * The old message is placed directly after the new mcast header.
 * NOTE(review): no check of msg_len against the allocated buffer size is
 * visible here - confirm the buffer from totemsrp_buffer_alloc is large
 * enough.
 */
2446  message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2447  memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2448  sort_queue_item->mcast,
2449  sort_queue_item->msg_len);
2450  cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2451  }
2453  "Originated %d messages in RECOVERY.", messages_originated);
2454  goto originated;
2455 
2456 no_originate:
2458  "Did not need to originate any messages in recovery.");
2459 
2460 originated:
2461  instance->my_aru = SEQNO_START_MSG;
2462  instance->my_aru_count = 0;
2463  instance->my_seq_unchanged = 0;
2465  instance->my_install_seq = SEQNO_START_MSG;
2466  instance->last_released = SEQNO_START_MSG;
2467 
2468  reset_token_timeout (instance); // REVIEWED
2469  reset_token_retransmit_timeout (instance); // REVIEWED
2470 
2471  instance->memb_state = MEMB_STATE_RECOVERY;
2472  instance->stats.recovery_entered++;
2473  instance->stats.continuous_gather = 0;
2474 
2475  return;
2476 }
2477 
2478 void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2479 {
2480  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2481 
2482  token_hold_cancel_send (instance);
2483 
2484  return;
2485 }
2486 
/*
 * Queue a message, described by an iovec array, for later multicast.
 *
 * srp_context - totemsrp instance as an opaque pointer
 * iovec       - array of iov_len buffers whose contents are copied, in
 *               order, directly after a freshly initialised mcast header
 * guarantee   - stored unchanged in the mcast header
 *
 * The message goes to new_message_queue_trans while waiting_trans_ack is
 * set and to new_message_queue otherwise.
 *
 * Returns 0 when the message was queued, -1 when the chosen queue is full
 * or no buffer could be allocated.
 *
 * NOTE(review): the line carrying this function's return type and name
 * (listing line 2487) and listing line 2527 are absent from this copy of
 * the file; confirm against the upstream source.
 */
2488  void *srp_context,
2489  struct iovec *iovec,
2490  unsigned int iov_len,
2491  int guarantee)
2492 {
2493  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2494  int i;
2495  struct message_item message_item;
2496  char *addr;
2497  unsigned int addr_idx;
2498  struct cs_queue *queue_use;
2499 
2500  if (instance->waiting_trans_ack) {
2501  queue_use = &instance->new_message_queue_trans;
2502  } else {
2503  queue_use = &instance->new_message_queue;
2504  }
2505 
2506  if (cs_queue_is_full (queue_use)) {
2507  log_printf (instance->totemsrp_log_level_debug, "queue full");
2508  return (-1);
2509  }
2510 
2511  memset (&message_item, 0, sizeof (struct message_item));
2512 
2513  /*
2514  * Allocate pending item
2515  */
2516  message_item.mcast = totemsrp_buffer_alloc (instance);
2517  if (message_item.mcast == 0) {
2518  goto error_mcast;
2519  }
2520 
2521  /*
2522  * Set mcast header
2523  */
2524  memset(message_item.mcast, 0, sizeof (struct mcast));
2525  message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2526  message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
2528  message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2529  assert (message_item.mcast->header.nodeid);
2530 
2531  message_item.mcast->guarantee = guarantee;
2532  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2533 
/*
 * Payload is appended right after the header.
 * NOTE(review): no check of the accumulated length against the size of
 * the allocated buffer is visible here - presumably callers bound the
 * total iovec length; confirm.
 */
2534  addr = (char *)message_item.mcast;
2535  addr_idx = sizeof (struct mcast);
2536  for (i = 0; i < iov_len; i++) {
2537  memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2538  addr_idx += iovec[i].iov_len;
2539  }
2540 
/* msg_len covers the mcast header plus all copied payload bytes. */
2541  message_item.msg_len = addr_idx;
2542 
2543  log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2544  instance->stats.mcast_tx++;
2545  cs_queue_item_add (queue_use, &message_item);
2546 
2547  return (0);
2548 
2549 error_mcast:
2550  return (-1);
2551 }
2552 
2553 /*
2554  * Determine if there is room to queue a new message
2555  */
2556 int totemsrp_avail (void *srp_context)
2557 {
2558  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2559  int avail;
2560  struct cs_queue *queue_use;
2561 
2562  if (instance->waiting_trans_ack) {
2563  queue_use = &instance->new_message_queue_trans;
2564  } else {
2565  queue_use = &instance->new_message_queue;
2566  }
2567  cs_queue_avail (queue_use, &avail);
2568 
2569  return (avail);
2570 }
2571 
2572 /*
2573  * ORF Token Management
2574  */
2575 /*
2576  * Recast message to mcast group if it is available
2577  */
/*
 * Re-send the message stored under sequence number seq.
 *
 * The lookup uses recovery_sort_queue while in MEMB_STATE_RECOVERY and
 * regular_sort_queue in every other state.
 *
 * Returns -1 when sq_in_range() rejects seq or sq_item_get() finds no
 * item for it; returns 0 after the stored message and its length have
 * been handed to the send call.
 *
 * NOTE(review): listing lines 2582 and 2610 are absent from this copy of
 * the file.  The declaration of sort_queue_item and the opening line of
 * the send call are therefore not visible; confirm against the upstream
 * source.
 */
2578 static int orf_token_remcast (
2579  struct totemsrp_instance *instance,
2580  int seq)
2581 {
2583  int res;
2584  void *ptr;
2585 
2586  struct sq *sort_queue;
2587 
2588  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2589  sort_queue = &instance->recovery_sort_queue;
2590  } else {
2591  sort_queue = &instance->regular_sort_queue;
2592  }
2593 
2594  res = sq_in_range (sort_queue, seq);
2595  if (res == 0) {
2596  log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2597  return (-1);
2598  }
2599 
2600  /*
2601  * Get RTR item at seq, if not available, return
2602  */
2603  res = sq_item_get (sort_queue, seq, &ptr);
2604  if (res != 0) {
2605  return -1;
2606  }
2607 
2608  sort_queue_item = ptr;
2609 
/* NOTE(review): the opening line of this call (listing line 2610) is missing. */
2611  instance->totemrrp_context,
2612  sort_queue_item->mcast,
2613  sort_queue_item->msg_len);
2614 
2615  return (0);
2616 }
2617 
2618 
2619 /*
2620  * Free all freeable messages from ring
2621  */
/*
 * Release regular_sort_queue entries up to a computed release point.
 *
 * The release point starts at token_aru and is replaced by my_last_aru
 * and then by my_high_delivered whenever sq_lt_compare() reports the
 * candidate as lower (presumably "less than" in sequence-number space).
 * If the result lies before last_released nothing is done.
 *
 * For every sequence number in (last_released, release_to] the message
 * buffer is handed to totemsrp_buffer_release() when an item exists, and
 * sq_items_release() is called for that number; last_released is then
 * advanced by the size of the range.
 *
 * NOTE(review): listing line 2671 is absent from this copy of the file,
 * so the opening line of the final logging call is not visible.
 */
2622 static void messages_free (
2623  struct totemsrp_instance *instance,
2624  unsigned int token_aru)
2625 {
2626  struct sort_queue_item *regular_message;
2627  unsigned int i;
2628  int res;
2629  int log_release = 0;
2630  unsigned int release_to;
2631  unsigned int range = 0;
2632 
2633  release_to = token_aru;
2634  if (sq_lt_compare (instance->my_last_aru, release_to)) {
2635  release_to = instance->my_last_aru;
2636  }
2637  if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2638  release_to = instance->my_high_delivered;
2639  }
2640 
2641  /*
2642  * Ensure we dont try release before an already released point
2643  */
2644  if (sq_lt_compare (release_to, instance->last_released)) {
2645  return;
2646  }
2647 
2648  range = release_to - instance->last_released;
2649  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2650 
2651  /*
2652  * Release retransmit list items if group aru indicates they are transmitted
2653  */
2654  for (i = 1; i <= range; i++) {
2655  void *ptr;
2656 
2657  res = sq_item_get (&instance->regular_sort_queue,
2658  instance->last_released + i, &ptr);
/* The buffer is only returned when an item is actually stored. */
2659  if (res == 0) {
2660  regular_message = ptr;
2661  totemsrp_buffer_release (instance, regular_message->mcast);
2662  }
/* The queue slot is released whether or not an item was found. */
2663  sq_items_release (&instance->regular_sort_queue,
2664  instance->last_released + i);
2665 
2666  log_release = 1;
2667  }
2668  instance->last_released += range;
2669 
2670  if (log_release) {
2672  "releasing messages up to and including %x", release_to);
2673  }
2674 }
2675 
2676 static void update_aru (
2677  struct totemsrp_instance *instance)
2678 {
2679  unsigned int i;
2680  int res;
2681  struct sq *sort_queue;
2682  unsigned int range;
2683  unsigned int my_aru_saved = 0;
2684 
2685  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2686  sort_queue = &instance->recovery_sort_queue;
2687  } else {
2688  sort_queue = &instance->regular_sort_queue;
2689  }
2690 
2691  range = instance->my_high_seq_received - instance->my_aru;
2692 
2693  my_aru_saved = instance->my_aru;
2694  for (i = 1; i <= range; i++) {
2695 
2696  void *ptr;
2697 
2698  res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2699  /*
2700  * If hole, stop updating aru
2701  */
2702  if (res != 0) {
2703  break;
2704  }
2705  }
2706  instance->my_aru += i - 1;
2707 }
2708 
2709 /*
2710  * Multicasts pending messages onto the ring (requires orf_token possession)
2711  */
/*
 * Send pending messages while the token is held.
 *
 * Source queue and sort queue are chosen by state: in MEMB_STATE_RECOVERY
 * messages come from retrans_message_queue and are recorded in
 * recovery_sort_queue (the token retransmit timeout is also restarted);
 * otherwise they come from new_message_queue_trans or new_message_queue,
 * depending on waiting_trans_ack, and are recorded in regular_sort_queue.
 *
 * Up to fcc_mcasts_allowed messages are processed.  Each one receives the
 * next token sequence number (token->seq is incremented), the next
 * global_seqno and the current ring id, is added to the sort queue, is
 * handed to the send call and is then removed from the source queue.
 * my_high_seq_received follows token->seq, and update_aru() runs once the
 * loop ends.
 *
 * Returns the number of messages processed.
 *
 * NOTE(review): listing line 2763 is absent from this copy of the file,
 * so the opening line of the send call is not visible.
 */
2712 static int orf_token_mcast (
2713  struct totemsrp_instance *instance,
2714  struct orf_token *token,
2715  int fcc_mcasts_allowed)
2716 {
2717  struct message_item *message_item = 0;
2718  struct cs_queue *mcast_queue;
2719  struct sq *sort_queue;
2720  struct sort_queue_item sort_queue_item;
2721  struct mcast *mcast;
2722  unsigned int fcc_mcast_current;
2723 
2724  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2725  mcast_queue = &instance->retrans_message_queue;
2726  sort_queue = &instance->recovery_sort_queue;
2727  reset_token_retransmit_timeout (instance); // REVIEWED
2728  } else {
2729  if (instance->waiting_trans_ack) {
2730  mcast_queue = &instance->new_message_queue_trans;
2731  } else {
2732  mcast_queue = &instance->new_message_queue;
2733  }
2734 
2735  sort_queue = &instance->regular_sort_queue;
2736  }
2737 
/* Note: fcc_mcast_current is unsigned while fcc_mcasts_allowed is a signed int. */
2738  for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2739  if (cs_queue_is_empty (mcast_queue)) {
2740  break;
2741  }
2742  message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2743 
2744  message_item->mcast->seq = ++token->seq;
2745  message_item->mcast->this_seqno = instance->global_seqno++;
2746 
2747  /*
2748  * Build IO vector
2749  */
2750  memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2751  sort_queue_item.mcast = message_item->mcast;
2752  sort_queue_item.msg_len = message_item->msg_len;
2753 
2754  mcast = sort_queue_item.mcast;
2755 
2756  memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2757 
2758  /*
2759  * Add message to retransmit queue
2760  */
2761  sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2762 
/* NOTE(review): the opening line of this call (listing line 2763) is missing. */
2764  instance->totemrrp_context,
2765  message_item->mcast,
2766  message_item->msg_len);
2767 
2768  /*
2769  * Delete item from pending queue
2770  */
2771  cs_queue_item_remove (mcast_queue);
2772 
2773  /*
2774  * If messages mcasted, deliver any new messages to totempg
2775  */
2776  instance->my_high_seq_received = token->seq;
2777  }
2778 
2779  update_aru (instance);
2780 
2781  /*
2782  * Return 1 if more messages are available for single node clusters
2783  */
/*
 * NOTE(review): the value actually returned is the count of messages
 * processed by the loop above, which does not match the comment above.
 */
2784  return (fcc_mcast_current);
2785 }
2786 
2787 /*
2788  * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2789  * Modifies orf_token's rtr to include retransmits required by this process
2790  */
/*
 * Service and extend the retransmit request list carried by the token.
 *
 * instance    - totemsrp instance
 * orf_token   - token whose rtr_list / rtr_list_entries are read and
 *               modified in place
 * fcc_allowed - in: how many retransmissions may be sent; out: reduced by
 *               the number actually sent
 *
 * Phase 1: for every request that belongs to the current ring
 * (ring_id equals my_ring_id), try orf_token_remcast(); on success the
 * entry is removed from the list by shifting the remaining entries down
 * and the retransmit counters are incremented.
 *
 * Phase 2: for each sequence number in (my_aru, orf_token->seq] that the
 * active sort queue does not hold, and whose miss count has reached
 * miss_count_const, append a request to the list unless one is already
 * present or the list has RETRANSMIT_ENTRIES_MAX entries.
 *
 * Returns fcc_remcast_current, the number of messages retransmitted.
 *
 * NOTE(review): listing lines 2815 and 2822 are absent from this copy of
 * the file, so the opening lines of two logging calls are not visible.
 */
2791 static int orf_token_rtr (
2792  struct totemsrp_instance *instance,
2793  struct orf_token *orf_token,
2794  unsigned int *fcc_allowed)
2795 {
2796  unsigned int res;
2797  unsigned int i, j;
2798  unsigned int found;
2799  struct sq *sort_queue;
2800  struct rtr_item *rtr_list;
2801  unsigned int range = 0;
2802  char retransmit_msg[1024];
2803  char value[64];
2804 
2805  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2806  sort_queue = &instance->recovery_sort_queue;
2807  } else {
2808  sort_queue = &instance->regular_sort_queue;
2809  }
2810 
2811  rtr_list = &orf_token->rtr_list[0];
2812 
/*
 * Build a human readable list of the requested sequence numbers.
 * NOTE(review): strcpy/sprintf/strcat below are unbounded; the 1024 byte
 * buffer is only safe if rtr_list_entries is limited elsewhere
 * (presumably by RETRANSMIT_ENTRIES_MAX) - confirm.
 */
2813  strcpy (retransmit_msg, "Retransmit List: ");
2814  if (orf_token->rtr_list_entries) {
2816  "Retransmit List %d", orf_token->rtr_list_entries);
2817  for (i = 0; i < orf_token->rtr_list_entries; i++) {
2818  sprintf (value, "%x ", rtr_list[i].seq);
2819  strcat (retransmit_msg, value);
2820  }
2821  strcat (retransmit_msg, "");
2823  "%s", retransmit_msg);
2824  }
2825 
2826  /*
2827  * Retransmit messages on orf_token's RTR list from RTR queue
2828  */
/* i only advances when the current entry is kept in the list. */
2829  for (instance->fcc_remcast_current = 0, i = 0;
2830  instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2831 
2832  /*
2833  * If this retransmit request isn't from this configuration,
2834  * try next rtr entry
2835  */
2836  if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2837  sizeof (struct memb_ring_id)) != 0) {
2838 
2839  i += 1;
2840  continue;
2841  }
2842 
2843  res = orf_token_remcast (instance, rtr_list[i].seq);
2844  if (res == 0) {
2845  /*
2846  * Multicasted message, so no need to copy to new retransmit list
2847  */
2848  orf_token->rtr_list_entries -= 1;
2849  assert (orf_token->rtr_list_entries >= 0);
/* Close the gap left by the serviced entry. */
2850  memmove (&rtr_list[i], &rtr_list[i + 1],
2851  sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2852 
2853  instance->stats.mcast_retx++;
2854  instance->fcc_remcast_current++;
2855  } else {
2856  i += 1;
2857  }
2858  }
2859  *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2860 
2861  /*
2862  * Add messages to retransmit to RTR list
2863  * but only retry if there is room in the retransmit list
2864  */
2865 
2866  range = orf_token->seq - instance->my_aru;
2867  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2868 
2869  for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
2870  (i <= range); i++) {
2871 
2872  /*
2873  * Ensure message is within the sort queue range
2874  */
2875  res = sq_in_range (sort_queue, instance->my_aru + i);
2876  if (res == 0) {
2877  break;
2878  }
2879 
2880  /*
2881  * Find if a message is missing from this processor
2882  */
2883  res = sq_item_inuse (sort_queue, instance->my_aru + i);
2884  if (res == 0) {
2885  /*
2886  * Determine how many times we have missed receiving
2887  * this sequence number. sq_item_miss_count increments
2888  * a counter for the sequence number. The miss count
2889  * will be returned and compared. This allows time for
2890  * delayed multicast messages to be received before
2891  * declaring the message is missing and requesting a
2892  * retransmit.
2893  */
2894  res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2895  if (res < instance->totem_config->miss_count_const) {
2896  continue;
2897  }
2898 
2899  /*
2900  * Determine if missing message is already in retransmit list
2901  */
/* Only the sequence number is compared here, not the ring id. */
2902  found = 0;
2903  for (j = 0; j < orf_token->rtr_list_entries; j++) {
2904  if (instance->my_aru + i == rtr_list[j].seq) {
2905  found = 1;
2906  }
2907  }
2908  if (found == 0) {
2909  /*
2910  * Missing message not found in current retransmit list so add it
2911  */
2912  memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
2913  &instance->my_ring_id, sizeof (struct memb_ring_id));
2914  rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2915  orf_token->rtr_list_entries++;
2916  }
2917  }
2918  }
2919  return (instance->fcc_remcast_current);
2920 }
2921 
/*
 * Re-send the saved copy of the last token: the orf_token_retransmit
 * buffer and its recorded size are passed to the send call.
 *
 * NOTE(review): listing line 2924 is absent from this copy of the file,
 * so the opening line of that call is not visible.
 */
2922 static void token_retransmit (struct totemsrp_instance *instance)
2923 {
2925  instance->orf_token_retransmit,
2926  instance->orf_token_retransmit_size);
2927 }
2928 
2929 /*
2930  * If no mcast or token has been received within the
2931  * token retransmit period, retransmit the token to
2932  * the next processor
2933  */
/*
 * Timer callback for the token retransmit timeout; data is the totemsrp
 * instance.
 *
 * As visible here: nothing happens in MEMB_STATE_GATHER, while
 * MEMB_STATE_COMMIT falls through to MEMB_STATE_RECOVERY, where the saved
 * token is re-sent and the retransmit timeout is restarted.
 *
 * NOTE(review): listing line 2942, which sits between the COMMIT and
 * RECOVERY case labels, is absent from this copy of the file; confirm
 * against the upstream source.
 */
2934 static void timer_function_token_retransmit_timeout (void *data)
2935 {
2936  struct totemsrp_instance *instance = data;
2937 
2938  switch (instance->memb_state) {
2939  case MEMB_STATE_GATHER:
2940  break;
2941  case MEMB_STATE_COMMIT:
2943  case MEMB_STATE_RECOVERY:
2944  token_retransmit (instance);
2945  reset_token_retransmit_timeout (instance); // REVIEWED
2946  break;
2947  }
2948 }
2949 
/*
 * Timer callback for the token hold retransmit timeout; data is the
 * totemsrp instance.
 *
 * As visible here: nothing happens in MEMB_STATE_GATHER or
 * MEMB_STATE_COMMIT; in MEMB_STATE_RECOVERY the saved token is re-sent.
 * Unlike timer_function_token_retransmit_timeout(), no timer is restarted
 * in this function.
 *
 * NOTE(review): listing line 2959, which directly precedes the RECOVERY
 * case label, is absent from this copy of the file; confirm against the
 * upstream source.
 */
2950 static void timer_function_token_hold_retransmit_timeout (void *data)
2951 {
2952  struct totemsrp_instance *instance = data;
2953 
2954  switch (instance->memb_state) {
2955  case MEMB_STATE_GATHER:
2956  break;
2957  case MEMB_STATE_COMMIT:
2958  break;
2960  case MEMB_STATE_RECOVERY:
2961  token_retransmit (instance);
2962  break;
2963  }
2964 }
2965 
/*
 * Timer callback for the merge detect timeout; data is the totemsrp
 * instance.
 *
 * The visible code sends a merge detect message
 * (memb_merge_detect_transmit) only when the ring representative address
 * equals this node's first address, and does nothing in the GATHER,
 * COMMIT and RECOVERY states.
 *
 * NOTE(review): listing lines 2970 and 2973 are absent from this copy of
 * the file.  Line 2973 is presumably the case label that guards the
 * representative check; confirm against the upstream source.
 */
2966 static void timer_function_merge_detect_timeout(void *data)
2967 {
2968  struct totemsrp_instance *instance = data;
2969 
2971 
2972  switch (instance->memb_state) {
2974  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
2975  memb_merge_detect_transmit (instance);
2976  }
2977  break;
2978  case MEMB_STATE_GATHER:
2979  case MEMB_STATE_COMMIT:
2980  case MEMB_STATE_RECOVERY:
2981  break;
2982  }
2983 }
2984 
2985 /*
2986  * Send orf_token to next member (requires orf_token)
2987  */
/*
 * Record the token for later retransmission and optionally send it.
 *
 * instance      - totemsrp instance
 * orf_token     - token to send; its header.nodeid is overwritten with
 *                 this node's id
 * forward_token - when 0 the token is only saved, not sent
 *
 * The token size is the fixed struct plus rtr_list_entries rtr_item
 * records.  That many bytes are copied into orf_token_retransmit and the
 * size is remembered in orf_token_retransmit_size.
 *
 * Returns 0 when forward_token is 0, otherwise res.
 *
 * NOTE(review): listing line 3008 is absent from this copy of the file,
 * so the opening line of the send call is not visible.  In the visible
 * text res is never assigned after its initialisation to 0; whether the
 * missing line assigns it cannot be told from here.
 */
2988 static int token_send (
2989  struct totemsrp_instance *instance,
2990  struct orf_token *orf_token,
2991  int forward_token)
2992 {
2993  int res = 0;
2994  unsigned int orf_token_size;
2995 
2996  orf_token_size = sizeof (struct orf_token) +
2997  (orf_token->rtr_list_entries * sizeof (struct rtr_item));
2998 
2999  orf_token->header.nodeid = instance->my_id.addr[0].nodeid;
/* Keep a copy so the token can be re-sent by token_retransmit(). */
3000  memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
3001  instance->orf_token_retransmit_size = orf_token_size;
3002  assert (orf_token->header.nodeid);
3003 
3004  if (forward_token == 0) {
3005  return (0);
3006  }
3007 
3009  orf_token,
3010  orf_token_size);
3011 
3012  return (res);
3013 }
3014 
3015 static int token_hold_cancel_send (struct totemsrp_instance *instance)
3016 {
3017  struct token_hold_cancel token_hold_cancel;
3018 
3019  /*
3020  * Only cancel if the token is currently held
3021  */
3022  if (instance->my_token_held == 0) {
3023  return (0);
3024  }
3025  instance->my_token_held = 0;
3026 
3027  /*
3028  * Build message
3029  */
3030  token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
3031  token_hold_cancel.header.endian_detector = ENDIAN_LOCAL;
3032  token_hold_cancel.header.encapsulated = 0;
3033  token_hold_cancel.header.nodeid = instance->my_id.addr[0].nodeid;
3034  memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3035  sizeof (struct memb_ring_id));
3036  assert (token_hold_cancel.header.nodeid);
3037 
3038  instance->stats.token_hold_cancel_tx++;
3039 
3040  totemrrp_mcast_flush_send (instance->totemrrp_context, &token_hold_cancel,
3041  sizeof (struct token_hold_cancel));
3042 
3043  return (0);
3044 }
3045 
3046 static int orf_token_send_initial (struct totemsrp_instance *instance)
3047 {
3048  struct orf_token orf_token;
3049  int res;
3050 
3051  orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
3052  orf_token.header.endian_detector = ENDIAN_LOCAL;
3053  orf_token.header.encapsulated = 0;
3054  orf_token.header.nodeid = instance->my_id.addr[0].nodeid;
3055  assert (orf_token.header.nodeid);
3056  orf_token.seq = SEQNO_START_MSG;
3057  orf_token.token_seq = SEQNO_START_TOKEN;
3058  orf_token.retrans_flg = 1;
3059  instance->my_set_retrans_flg = 1;
3060  instance->stats.orf_token_tx++;
3061 
3062  if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3063  orf_token.retrans_flg = 0;
3064  instance->my_set_retrans_flg = 0;
3065  } else {
3066  orf_token.retrans_flg = 1;
3067  instance->my_set_retrans_flg = 1;
3068  }
3069 
3070  orf_token.aru = 0;
3071  orf_token.aru = SEQNO_START_MSG - 1;
3072  orf_token.aru_addr = instance->my_id.addr[0].nodeid;
3073 
3074  memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3075  orf_token.fcc = 0;
3076  orf_token.backlog = 0;
3077 
3078  orf_token.rtr_list_entries = 0;
3079 
3080  res = token_send (instance, &orf_token, 1);
3081 
3082  return (res);
3083 }
3084 
3085 static void memb_state_commit_token_update (
3086  struct totemsrp_instance *instance)
3087 {
3088  struct srp_addr *addr;
3089  struct memb_commit_token_memb_entry *memb_list;
3090  unsigned int high_aru;
3091  unsigned int i;
3092 
3093  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3094  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3095 
3096  memcpy (instance->my_new_memb_list, addr,
3097  sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3098 
3099  instance->my_new_memb_entries = instance->commit_token->addr_entries;
3100 
3101  memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3102  &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3103 
3104  memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3105  /*
3106  * TODO high delivered is really instance->my_aru, but with safe this
3107  * could change?
3108  */
3109  instance->my_received_flg =
3110  (instance->my_aru == instance->my_high_seq_received);
3111 
3112  memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3113 
3114  memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3115  /*
3116  * find high aru up to current memb_index for all matching ring ids
3117  * if any ring id matching memb_index has aru less then high aru set
3118  * received flag for that entry to false
3119  */
3120  high_aru = memb_list[instance->commit_token->memb_index].aru;
3121  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3122  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3123  &memb_list[i].ring_id,
3124  sizeof (struct memb_ring_id)) == 0) {
3125 
3126  if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3127  high_aru = memb_list[i].aru;
3128  }
3129  }
3130  }
3131 
3132  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3133  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3134  &memb_list[i].ring_id,
3135  sizeof (struct memb_ring_id)) == 0) {
3136 
3137  if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3138  memb_list[i].received_flg = 0;
3139  if (i == instance->commit_token->memb_index) {
3140  instance->my_received_flg = 0;
3141  }
3142  }
3143  }
3144  }
3145 
3146  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3147  instance->commit_token->memb_index += 1;
3148  assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3149  assert (instance->commit_token->header.nodeid);
3150 }
3151 
/*
 * Point the token target at the member at position
 * memb_index % addr_entries of the commit token's address list.
 *
 * For each of the interface_count configured interfaces, that member's
 * address for interface i is passed, together with i, to a call that
 * also receives totemrrp_context.
 *
 * NOTE(review): listing line 3161 is absent from this copy of the file,
 * so the opening line of that call is not visible.
 */
3152 static void memb_state_commit_token_target_set (
3153  struct totemsrp_instance *instance)
3154 {
3155  struct srp_addr *addr;
3156  unsigned int i;
3157 
3158  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3159 
3160  for (i = 0; i < instance->totem_config->interface_count; i++) {
3162  instance->totemrrp_context,
/* The modulo wraps the index back to the first member after the last one. */
3163  &addr[instance->commit_token->memb_index %
3164  instance->commit_token->addr_entries].addr[i],
3165  i);
3166  }
3167 }
3168 
/*
 * Send the supplied commit token and keep a copy for retransmission.
 *
 * The token's token_seq is incremented and its header is stamped with
 * this node's id.  Its size is the fixed struct plus one srp_addr and one
 * memb_commit_token_memb_entry per address entry; that many bytes are
 * copied into orf_token_retransmit.  The memb_commit_token_tx statistic
 * is incremented and the token retransmit timeout is restarted.
 *
 * Always returns 0.
 *
 * NOTE(review): listing line 3188 is absent from this copy of the file,
 * so the opening line of the send call is not visible.
 */
3169 static int memb_state_commit_token_send_recovery (
3170  struct totemsrp_instance *instance,
3171  struct memb_commit_token *commit_token)
3172 {
3173  unsigned int commit_token_size;
3174 
3175  commit_token->token_seq++;
3176  commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3177  commit_token_size = sizeof (struct memb_commit_token) +
3178  ((sizeof (struct srp_addr) +
3179  sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3180  /*
3181  * Make a copy for retransmission if necessary
3182  */
3183  memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3184  instance->orf_token_retransmit_size = commit_token_size;
3185 
3186  instance->stats.memb_commit_token_tx++;
3187 
3189  commit_token,
3190  commit_token_size);
3191 
3192  /*
3193  * Request retransmission of the commit token in case it is lost
3194  */
3195  reset_token_retransmit_timeout (instance);
3196  return (0);
3197 }
3198 
/*
 * Send the instance's own commit token (instance->commit_token) and keep
 * a copy for retransmission.
 *
 * The steps mirror memb_state_commit_token_send_recovery(), except that
 * the token is taken from the instance rather than from a parameter:
 * token_seq is incremented, the header is stamped with this node's id,
 * the token is copied into orf_token_retransmit, the
 * memb_commit_token_tx statistic is incremented and the token retransmit
 * timeout is restarted.
 *
 * Always returns 0.
 *
 * NOTE(review): listing line 3217 is absent from this copy of the file,
 * so the opening line of the send call is not visible.
 */
3199 static int memb_state_commit_token_send (
3200  struct totemsrp_instance *instance)
3201 {
3202  unsigned int commit_token_size;
3203 
3204  instance->commit_token->token_seq++;
3205  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3206  commit_token_size = sizeof (struct memb_commit_token) +
3207  ((sizeof (struct srp_addr) +
3208  sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3209  /*
3210  * Make a copy for retransmission if necessary
3211  */
3212  memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3213  instance->orf_token_retransmit_size = commit_token_size;
3214 
3215  instance->stats.memb_commit_token_tx++;
3216 
3218  instance->commit_token,
3219  commit_token_size);
3220 
3221  /*
3222  * Request retransmission of the commit token in case it is lost
3223  */
3224  reset_token_retransmit_timeout (instance);
3225  return (0);
3226 }
3227 
3228 
3229 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3230 {
3231  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3232  int token_memb_entries = 0;
3233  int i;
3234  struct totem_ip_address *lowest_addr;
3235 
3236  memb_set_subtract (token_memb, &token_memb_entries,
3237  instance->my_proc_list, instance->my_proc_list_entries,
3238  instance->my_failed_list, instance->my_failed_list_entries);
3239 
3240  /*
3241  * find representative by searching for smallest identifier
3242  */
3243 
3244  lowest_addr = &token_memb[0].addr[0];
3245  for (i = 1; i < token_memb_entries; i++) {
3246  if (totemip_compare(lowest_addr, &token_memb[i].addr[0]) > 0) {
3247  totemip_copy (lowest_addr, &token_memb[i].addr[0]);
3248  }
3249  }
3250  return (totemip_compare (lowest_addr, &instance->my_id.addr[0]) == 0);
3251 }
3252 
3253 static int srp_addr_compare (const void *a, const void *b)
3254 {
3255  const struct srp_addr *srp_a = (const struct srp_addr *)a;
3256  const struct srp_addr *srp_b = (const struct srp_addr *)b;
3257 
3258  return (totemip_compare (&srp_a->addr[0], &srp_b->addr[0]));
3259 }
3260 
/*
 * Initialise instance->commit_token for a new ring.
 *
 * The member set is my_proc_list minus my_failed_list.  The token header
 * is zeroed and stamped with this node's id, the ring id gets this node's
 * first address as representative and token_ring_id_seq + 4 as sequence
 * number, the members are sorted with srp_addr_compare and copied into
 * the token's trailing storage, and the per-member entries that follow
 * them are zeroed.  memb_index starts at 0.
 *
 * NOTE(review): listing lines 3269, 3277 and 3278 are absent from this
 * copy of the file.  The opening line of the logging call and two
 * statements after the memset are therefore not visible; confirm against
 * the upstream source.
 */
3261 static void memb_state_commit_token_create (
3262  struct totemsrp_instance *instance)
3263 {
3264  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3265  struct srp_addr *addr;
3266  struct memb_commit_token_memb_entry *memb_list;
3267  int token_memb_entries = 0;
3268 
3270  "Creating commit token because I am the rep.");
3271 
3272  memb_set_subtract (token_memb, &token_memb_entries,
3273  instance->my_proc_list, instance->my_proc_list_entries,
3274  instance->my_failed_list, instance->my_failed_list_entries);
3275 
3276  memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3279  instance->commit_token->header.encapsulated = 0;
3280  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3281  assert (instance->commit_token->header.nodeid);
3282 
3283  totemip_copy(&instance->commit_token->ring_id.rep, &instance->my_id.addr[0]);
3284 
/* NOTE(review): the reason for the increment of 4 is not visible here - confirm. */
3285  instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3286 
3287  /*
3288  * This qsort is necessary to ensure the commit token traverses
3289  * the ring in the proper order
3290  */
3291  qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3292  srp_addr_compare);
3293 
3294  instance->commit_token->memb_index = 0;
3295  instance->commit_token->addr_entries = token_memb_entries;
3296 
/* Addresses are stored first, the per-member entries directly after them. */
3297  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3298  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3299 
3300  memcpy (addr, token_memb,
3301  token_memb_entries * sizeof (struct srp_addr));
3302  memset (memb_list, 0,
3303  sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3304 }
3305 
3306 static void memb_join_message_send (struct totemsrp_instance *instance)
3307 {
3308  char memb_join_data[40000];
3309  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3310  char *addr;
3311  unsigned int addr_idx;
3312 
3313  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3314  memb_join->header.endian_detector = ENDIAN_LOCAL;
3315  memb_join->header.encapsulated = 0;
3316  memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
3317  assert (memb_join->header.nodeid);
3318 
3319  memb_join->ring_seq = instance->my_ring_id.seq;
3320  memb_join->proc_list_entries = instance->my_proc_list_entries;
3321  memb_join->failed_list_entries = instance->my_failed_list_entries;
3322  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3323 
3324  /*
3325  * This mess adds the joined and failed processor lists into the join
3326  * message
3327  */
3328  addr = (char *)memb_join;
3329  addr_idx = sizeof (struct memb_join);
3330  memcpy (&addr[addr_idx],
3331  instance->my_proc_list,
3332  instance->my_proc_list_entries *
3333  sizeof (struct srp_addr));
3334  addr_idx +=
3335  instance->my_proc_list_entries *
3336  sizeof (struct srp_addr);
3337  memcpy (&addr[addr_idx],
3338  instance->my_failed_list,
3339  instance->my_failed_list_entries *
3340  sizeof (struct srp_addr));
3341  addr_idx +=
3342  instance->my_failed_list_entries *
3343  sizeof (struct srp_addr);
3344 
3345 
3346  if (instance->totem_config->send_join_timeout) {
3347  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3348  }
3349 
3350  instance->stats.memb_join_tx++;
3351 
3353  instance->totemrrp_context,
3354  memb_join,
3355  addr_idx);
3356 }
3357 
3358 static void memb_leave_message_send (struct totemsrp_instance *instance)
3359 {
3360  char memb_join_data[40000];
3361  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3362  char *addr;
3363  unsigned int addr_idx;
3364  int active_memb_entries;
3365  struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3366 
3368  "sending join/leave message");
3369 
3370  /*
3371  * add us to the failed list, and remove us from
3372  * the members list
3373  */
3374  memb_set_merge(
3375  &instance->my_id, 1,
3376  instance->my_failed_list, &instance->my_failed_list_entries);
3377 
3378  memb_set_subtract (active_memb, &active_memb_entries,
3379  instance->my_proc_list, instance->my_proc_list_entries,
3380  &instance->my_id, 1);
3381 
3382 
3383  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3384  memb_join->header.endian_detector = ENDIAN_LOCAL;
3385  memb_join->header.encapsulated = 0;
3386  memb_join->header.nodeid = LEAVE_DUMMY_NODEID;
3387 
3388  memb_join->ring_seq = instance->my_ring_id.seq;
3389  memb_join->proc_list_entries = active_memb_entries;
3390  memb_join->failed_list_entries = instance->my_failed_list_entries;
3391  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3392  memb_join->system_from.addr[0].nodeid = LEAVE_DUMMY_NODEID;
3393 
3394  // TODO: CC Maybe use the actual join send routine.
3395  /*
3396  * This mess adds the joined and failed processor lists into the join
3397  * message
3398  */
3399  addr = (char *)memb_join;
3400  addr_idx = sizeof (struct memb_join);
3401  memcpy (&addr[addr_idx],
3402  active_memb,
3403  active_memb_entries *
3404  sizeof (struct srp_addr));
3405  addr_idx +=
3406  active_memb_entries *
3407  sizeof (struct srp_addr);
3408  memcpy (&addr[addr_idx],
3409  instance->my_failed_list,
3410  instance->my_failed_list_entries *
3411  sizeof (struct srp_addr));
3412  addr_idx +=
3413  instance->my_failed_list_entries *
3414  sizeof (struct srp_addr);
3415 
3416 
3417  if (instance->totem_config->send_join_timeout) {
3418  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3419  }
3420  instance->stats.memb_join_tx++;
3421 
3423  instance->totemrrp_context,
3424  memb_join,
3425  addr_idx);
3426 }
3427 
3428 static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3429 {
3430  struct memb_merge_detect memb_merge_detect;
3431 
3432  memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
3433  memb_merge_detect.header.endian_detector = ENDIAN_LOCAL;
3434  memb_merge_detect.header.encapsulated = 0;
3435  memb_merge_detect.header.nodeid = instance->my_id.addr[0].nodeid;
3436  srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
3437  memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3438  sizeof (struct memb_ring_id));
3439  assert (memb_merge_detect.header.nodeid);
3440 
3441  instance->stats.memb_merge_detect_tx++;
3443  &memb_merge_detect,
3444  sizeof (struct memb_merge_detect));
3445 }
3446 
3447 static void memb_ring_id_set (
3448  struct totemsrp_instance *instance,
3449  const struct memb_ring_id *ring_id)
3450 {
3451 
3452  memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3453 }
3454 
3456  void *srp_context,
3457  void **handle_out,
3458  enum totem_callback_token_type type,
3459  int delete,
3460  int (*callback_fn) (enum totem_callback_token_type type, const void *),
3461  const void *data)
3462 {
3463  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3464  struct token_callback_instance *callback_handle;
3465 
3466  token_hold_cancel_send (instance);
3467 
3468  callback_handle = malloc (sizeof (struct token_callback_instance));
3469  if (callback_handle == 0) {
3470  return (-1);
3471  }
3472  *handle_out = (void *)callback_handle;
3473  list_init (&callback_handle->list);
3474  callback_handle->callback_fn = callback_fn;
3475  callback_handle->data = (void *) data;
3476  callback_handle->callback_type = type;
3477  callback_handle->delete = delete;
3478  switch (type) {
3480  list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3481  break;
3483  list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3484  break;
3485  }
3486 
3487  return (0);
3488 }
3489 
3490 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3491 {
3492  struct token_callback_instance *h;
3493 
3494  if (*handle_out) {
3495  h = (struct token_callback_instance *)*handle_out;
3496  list_del (&h->list);
3497  free (h);
3498  h = NULL;
3499  *handle_out = 0;
3500  }
3501 }
3502 
3503 static void token_callbacks_execute (
3504  struct totemsrp_instance *instance,
3505  enum totem_callback_token_type type)
3506 {
3507  struct list_head *list;
3508  struct list_head *list_next;
3509  struct list_head *callback_listhead = 0;
3511  int res;
3512  int del;
3513 
3514  switch (type) {
3516  callback_listhead = &instance->token_callback_received_listhead;
3517  break;
3519  callback_listhead = &instance->token_callback_sent_listhead;
3520  break;
3521  default:
3522  assert (0);
3523  }
3524 
3525  for (list = callback_listhead->next; list != callback_listhead;
3526  list = list_next) {
3527 
3528  token_callback_instance = list_entry (list, struct token_callback_instance, list);
3529 
3530  list_next = list->next;
3531  del = token_callback_instance->delete;
3532  if (del == 1) {
3533  list_del (list);
3534  }
3535 
3536  res = token_callback_instance->callback_fn (
3537  token_callback_instance->callback_type,
3538  token_callback_instance->data);
3539  /*
3540  * This callback failed to execute, try it again on the next token
3541  */
3542  if (res == -1 && del == 1) {
3543  list_add (list, callback_listhead);
3544  } else if (del) {
3545  free (token_callback_instance);
3546  }
3547  }
3548 }
3549 
3550 /*
3551  * Flow control functions
3552  */
3553 static unsigned int backlog_get (struct totemsrp_instance *instance)
3554 {
3555  unsigned int backlog = 0;
3556  struct cs_queue *queue_use = NULL;
3557 
3558  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3559  if (instance->waiting_trans_ack) {
3560  queue_use = &instance->new_message_queue_trans;
3561  } else {
3562  queue_use = &instance->new_message_queue;
3563  }
3564  } else
3565  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3566  queue_use = &instance->retrans_message_queue;
3567  }
3568 
3569  if (queue_use != NULL) {
3570  backlog = cs_queue_used (queue_use);
3571  }
3572 
3573  instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3574  return (backlog);
3575 }
3576 
3577 static int fcc_calculate (
3578  struct totemsrp_instance *instance,
3579  struct orf_token *token)
3580 {
3581  unsigned int transmits_allowed;
3582  unsigned int backlog_calc;
3583 
3584  transmits_allowed = instance->totem_config->max_messages;
3585 
3586  if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3587  transmits_allowed = instance->totem_config->window_size - token->fcc;
3588  }
3589 
3590  instance->my_cbl = backlog_get (instance);
3591 
3592  /*
3593  * Only do backlog calculation if there is a backlog otherwise
3594  * we would result in div by zero
3595  */
3596  if (token->backlog + instance->my_cbl - instance->my_pbl) {
3597  backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3598  (token->backlog + instance->my_cbl - instance->my_pbl);
3599  if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3600  transmits_allowed = backlog_calc;
3601  }
3602  }
3603 
3604  return (transmits_allowed);
3605 }
3606 
3607 /*
3608  * don't overflow the RTR sort queue
3609  */
3610 static void fcc_rtr_limit (
3611  struct totemsrp_instance *instance,
3612  struct orf_token *token,
3613  unsigned int *transmits_allowed)
3614 {
3615  int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3616  check -= (*transmits_allowed + instance->totem_config->window_size);
3617  assert (check >= 0);
3618  if (sq_lt_compare (instance->last_released +
3619  QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3620  instance->totem_config->window_size,
3621 
3622  token->seq)) {
3623 
3624  *transmits_allowed = 0;
3625  }
3626 }
3627 
3628 static void fcc_token_update (
3629  struct totemsrp_instance *instance,
3630  struct orf_token *token,
3631  unsigned int msgs_transmitted)
3632 {
3633  token->fcc += msgs_transmitted - instance->my_trc;
3634  token->backlog += instance->my_cbl - instance->my_pbl;
3635  instance->my_trc = msgs_transmitted;
3636  instance->my_pbl = instance->my_cbl;
3637 }
3638 
3639 /*
3640  * Message Handlers
3641  */
3642 
3643 unsigned long long int tv_old;
3644 /*
3645  * message handler called when TOKEN message type received
3646  */
3647 static int message_handler_orf_token (
3648  struct totemsrp_instance *instance,
3649  const void *msg,
3650  size_t msg_len,
3651  int endian_conversion_needed)
3652 {
3653  char token_storage[1500];
3654  char token_convert[1500];
3655  struct orf_token *token = NULL;
3656  int forward_token;
3657  unsigned int transmits_allowed;
3658  unsigned int mcasted_retransmit;
3659  unsigned int mcasted_regular;
3660  unsigned int last_aru;
3661 
3662 #ifdef GIVEINFO
3663  unsigned long long tv_current;
3664  unsigned long long tv_diff;
3665 
3666  tv_current = qb_util_nano_current_get ();
3667  tv_diff = tv_current - tv_old;
3668  tv_old = tv_current;
3669 
3671  "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3672 #endif
3673 
3674  if (instance->orf_token_discard) {
3675  return (0);
3676  }
3677 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3678  if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3679  return (0);
3680  }
3681 #endif
3682 
3683  if (endian_conversion_needed) {
3684  orf_token_endian_convert ((struct orf_token *)msg,
3685  (struct orf_token *)token_convert);
3686  msg = (struct orf_token *)token_convert;
3687  }
3688 
3689  /*
3690  * Make copy of token and retransmit list in case we have
3691  * to flush incoming messages from the kernel queue
3692  */
3693  token = (struct orf_token *)token_storage;
3694  memcpy (token, msg, sizeof (struct orf_token));
3695  memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3696  sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3697 
3698 
3699  /*
3700  * Handle merge detection timeout
3701  */
3702  if (token->seq == instance->my_last_seq) {
3703  start_merge_detect_timeout (instance);
3704  instance->my_seq_unchanged += 1;
3705  } else {
3706  cancel_merge_detect_timeout (instance);
3707  cancel_token_hold_retransmit_timeout (instance);
3708  instance->my_seq_unchanged = 0;
3709  }
3710 
3711  instance->my_last_seq = token->seq;
3712 
3713 #ifdef TEST_RECOVERY_MSG_COUNT
3714  if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3715  return (0);
3716  }
3717 #endif
3718  instance->flushing = 1;
3720  instance->flushing = 0;
3721 
3722  /*
3723  * Determine if we should hold (in reality drop) the token
3724  */
3725  instance->my_token_held = 0;
3726  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
3727  instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3728  instance->my_token_held = 1;
3729  } else
3730  if (!totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
3731  instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3732  instance->my_token_held = 1;
3733  }
3734 
3735  /*
3736  * Hold onto token when there is no activity on ring and
3737  * this processor is the ring rep
3738  */
3739  forward_token = 1;
3740  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
3741  if (instance->my_token_held) {
3742  forward_token = 0;
3743  }
3744  }
3745 
3746  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
3747 
3748  switch (instance->memb_state) {
3749  case MEMB_STATE_COMMIT:
3750  /* Discard token */
3751  break;
3752 
3754  messages_free (instance, token->aru);
3755  /*
3756  * Do NOT add break, this case should also execute code in gather case.
3757  */
3758 
3759  case MEMB_STATE_GATHER:
3760  /*
3761  * DO NOT add break, we use different free mechanism in recovery state
3762  */
3763 
3764  case MEMB_STATE_RECOVERY:
3765  /*
3766  * Discard tokens from another configuration
3767  */
3768  if (memcmp (&token->ring_id, &instance->my_ring_id,
3769  sizeof (struct memb_ring_id)) != 0) {
3770 
3771  if ((forward_token)
3772  && instance->use_heartbeat) {
3773  reset_heartbeat_timeout(instance);
3774  }
3775  else {
3776  cancel_heartbeat_timeout(instance);
3777  }
3778 
3779  return (0); /* discard token */
3780  }
3781 
3782  /*
3783  * Discard retransmitted tokens
3784  */
3785  if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
3786  return (0); /* discard token */
3787  }
3788  last_aru = instance->my_last_aru;
3789  instance->my_last_aru = token->aru;
3790 
3791  transmits_allowed = fcc_calculate (instance, token);
3792  mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3793 
3794  if (instance->my_token_held == 1 &&
3795  (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
3796  instance->my_token_held = 0;
3797  forward_token = 1;
3798  }
3799 
3800  fcc_rtr_limit (instance, token, &transmits_allowed);
3801  mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3802 /*
3803 if (mcasted_regular) {
3804 printf ("mcasted regular %d\n", mcasted_regular);
3805 printf ("token seq %d\n", token->seq);
3806 }
3807 */
3808  fcc_token_update (instance, token, mcasted_retransmit +
3809  mcasted_regular);
3810 
3811  if (sq_lt_compare (instance->my_aru, token->aru) ||
3812  instance->my_id.addr[0].nodeid == token->aru_addr ||
3813  token->aru_addr == 0) {
3814 
3815  token->aru = instance->my_aru;
3816  if (token->aru == token->seq) {
3817  token->aru_addr = 0;
3818  } else {
3819  token->aru_addr = instance->my_id.addr[0].nodeid;
3820  }
3821  }
3822  if (token->aru == last_aru && token->aru_addr != 0) {
3823  instance->my_aru_count += 1;
3824  } else {
3825  instance->my_aru_count = 0;
3826  }
3827 
3828  /*
3829  * We really don't follow specification there. In specification, OTHER nodes
3830  * detect failure of one node (based on aru_count) and my_id IS NEVER added
3831  * to failed list (so node never mark itself as failed)
3832  */
3833  if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
3834  token->aru_addr == instance->my_id.addr[0].nodeid) {
3835 
3837  "FAILED TO RECEIVE");
3838 
3839  instance->failed_to_recv = 1;
3840 
3841  memb_set_merge (&instance->my_id, 1,
3842  instance->my_failed_list,
3843  &instance->my_failed_list_entries);
3844 
3845  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
3846  } else {
3847  instance->my_token_seq = token->token_seq;
3848  token->token_seq += 1;
3849 
3850  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3851  /*
3852  * instance->my_aru == instance->my_high_seq_received means this processor
3853  * has recovered all messages it can recover
3854  * (ie: its retrans queue is empty)
3855  */
3856  if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
3857 
3858  if (token->retrans_flg == 0) {
3859  token->retrans_flg = 1;
3860  instance->my_set_retrans_flg = 1;
3861  }
3862  } else
3863  if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
3864  token->retrans_flg = 0;
3865  instance->my_set_retrans_flg = 0;
3866  }
3868  "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
3869  token->retrans_flg, instance->my_set_retrans_flg,
3870  cs_queue_is_empty (&instance->retrans_message_queue),
3871  instance->my_retrans_flg_count, token->aru);
3872  if (token->retrans_flg == 0) {
3873  instance->my_retrans_flg_count += 1;
3874  } else {
3875  instance->my_retrans_flg_count = 0;
3876  }
3877  if (instance->my_retrans_flg_count == 2) {
3878  instance->my_install_seq = token->seq;
3879  }
3881  "install seq %x aru %x high seq received %x",
3882  instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
3883  if (instance->my_retrans_flg_count >= 2 &&
3884  instance->my_received_flg == 0 &&
3885  sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
3886  instance->my_received_flg = 1;
3887  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
3888  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
3889  sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
3890  }
3891  if (instance->my_retrans_flg_count >= 3 &&
3892  sq_lte_compare (instance->my_install_seq, token->aru)) {
3893  instance->my_rotation_counter += 1;
3894  } else {
3895  instance->my_rotation_counter = 0;
3896  }
3897  if (instance->my_rotation_counter == 2) {
3899  "retrans flag count %x token aru %x install seq %x aru %x %x",
3900  instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
3901  instance->my_aru, token->seq);
3902 
3903  memb_state_operational_enter (instance);
3904  instance->my_rotation_counter = 0;
3905  instance->my_retrans_flg_count = 0;
3906  }
3907  }
3908 
3910  token_send (instance, token, forward_token);
3911 
3912 #ifdef GIVEINFO
3913  tv_current = qb_util_nano_current_get ();
3914  tv_diff = tv_current - tv_old;
3915  tv_old = tv_current;
3917  "I held %0.4f ms",
3918  ((float)tv_diff) / 1000000.0);
3919 #endif
3920  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3921  messages_deliver_to_app (instance, 0,
3922  instance->my_high_seq_received);
3923  }
3924 
3925  /*
3926  * Deliver messages after token has been transmitted
3927  * to improve performance
3928  */
3929  reset_token_timeout (instance); // REVIEWED
3930  reset_token_retransmit_timeout (instance); // REVIEWED
3931  if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) &&
3932  instance->my_token_held == 1) {
3933 
3934  start_token_hold_retransmit_timeout (instance);
3935  }
3936 
3937  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
3938  }
3939  break;
3940  }
3941 
3942  if ((forward_token)
3943  && instance->use_heartbeat) {
3944  reset_heartbeat_timeout(instance);
3945  }
3946  else {
3947  cancel_heartbeat_timeout(instance);
3948  }
3949 
3950  return (0);
3951 }
3952 
3953 static void messages_deliver_to_app (
3954  struct totemsrp_instance *instance,
3955  int skip,
3956  unsigned int end_point)
3957 {
3958  struct sort_queue_item *sort_queue_item_p;
3959  unsigned int i;
3960  int res;
3961  struct mcast *mcast_in;
3962  struct mcast mcast_header;
3963  unsigned int range = 0;
3964  int endian_conversion_required;
3965  unsigned int my_high_delivered_stored = 0;
3966 
3967 
3968  range = end_point - instance->my_high_delivered;
3969 
3970  if (range) {
3972  "Delivering %x to %x", instance->my_high_delivered,
3973  end_point);
3974  }
3975  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
3976  my_high_delivered_stored = instance->my_high_delivered;
3977 
3978  /*
3979  * Deliver messages in order from rtr queue to pending delivery queue
3980  */
3981  for (i = 1; i <= range; i++) {
3982 
3983  void *ptr = 0;
3984 
3985  /*
3986  * If out of range of sort queue, stop assembly
3987  */
3988  res = sq_in_range (&instance->regular_sort_queue,
3989  my_high_delivered_stored + i);
3990  if (res == 0) {
3991  break;
3992  }
3993 
3994  res = sq_item_get (&instance->regular_sort_queue,
3995  my_high_delivered_stored + i, &ptr);
3996  /*
3997  * If hole, stop assembly
3998  */
3999  if (res != 0 && skip == 0) {
4000  break;
4001  }
4002 
4003  instance->my_high_delivered = my_high_delivered_stored + i;
4004 
4005  if (res != 0) {
4006  continue;
4007 
4008  }
4009 
4010  sort_queue_item_p = ptr;
4011 
4012  mcast_in = sort_queue_item_p->mcast;
4013  assert (mcast_in != (struct mcast *)0xdeadbeef);
4014 
4015  endian_conversion_required = 0;
4016  if (mcast_in->header.endian_detector != ENDIAN_LOCAL) {
4017  endian_conversion_required = 1;
4018  mcast_endian_convert (mcast_in, &mcast_header);
4019  } else {
4020  memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4021  }
4022 
4023  /*
4024  * Skip messages not originated in instance->my_deliver_memb
4025  */
4026  if (skip &&
4027  memb_set_subset (&mcast_header.system_from,
4028  1,
4029  instance->my_deliver_memb_list,
4030  instance->my_deliver_memb_entries) == 0) {
4031 
4032  instance->my_high_delivered = my_high_delivered_stored + i;
4033 
4034  continue;
4035  }
4036 
4037  /*
4038  * Message found
4039  */
4041  "Delivering MCAST message with seq %x to pending delivery queue",
4042  mcast_header.seq);
4043 
4044  /*
4045  * Message is locally originated multicast
4046  */
4047  instance->totemsrp_deliver_fn (
4048  mcast_header.header.nodeid,
4049  ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4050  sort_queue_item_p->msg_len - sizeof (struct mcast),
4051  endian_conversion_required);
4052  }
4053 }
4054 
4055 /*
4056  * recv message handler called when MCAST message type received
4057  */
4058 static int message_handler_mcast (
4059  struct totemsrp_instance *instance,
4060  const void *msg,
4061  size_t msg_len,
4062  int endian_conversion_needed)
4063 {
4064  struct sort_queue_item sort_queue_item;
4065  struct sq *sort_queue;
4066  struct mcast mcast_header;
4067 
4068 
4069  if (endian_conversion_needed) {
4070  mcast_endian_convert (msg, &mcast_header);
4071  } else {
4072  memcpy (&mcast_header, msg, sizeof (struct mcast));
4073  }
4074 
4075  if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4076  sort_queue = &instance->recovery_sort_queue;
4077  } else {
4078  sort_queue = &instance->regular_sort_queue;
4079  }
4080 
4081  assert (msg_len <= FRAME_SIZE_MAX);
4082 
4083 #ifdef TEST_DROP_MCAST_PERCENTAGE
4084  if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4085  return (0);
4086  }
4087 #endif
4088 
4089  /*
4090  * If the message is foreign execute the switch below
4091  */
4092  if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4093  sizeof (struct memb_ring_id)) != 0) {
4094 
4095  switch (instance->memb_state) {
4097  memb_set_merge (
4098  &mcast_header.system_from, 1,
4099  instance->my_proc_list, &instance->my_proc_list_entries);
4100  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4101  break;
4102 
4103  case MEMB_STATE_GATHER:
4104  if (!memb_set_subset (
4105  &mcast_header.system_from,
4106  1,
4107  instance->my_proc_list,
4108  instance->my_proc_list_entries)) {
4109 
4110  memb_set_merge (&mcast_header.system_from, 1,
4111  instance->my_proc_list, &instance->my_proc_list_entries);
4112  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4113  return (0);
4114  }
4115  break;
4116 
4117  case MEMB_STATE_COMMIT:
4118  /* discard message */
4119  instance->stats.rx_msg_dropped++;
4120  break;
4121 
4122  case MEMB_STATE_RECOVERY:
4123  /* discard message */
4124  instance->stats.rx_msg_dropped++;
4125  break;
4126  }
4127  return (0);
4128  }
4129 
4131  "Received ringid(%s:%lld) seq %x",
4132  totemip_print (&mcast_header.ring_id.rep),
4133  mcast_header.ring_id.seq,
4134  mcast_header.seq);
4135 
4136  /*
4137  * Add mcast message to rtr queue if not already in rtr queue
4138  * otherwise free io vectors
4139  */
4140  if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4141  sq_in_range (sort_queue, mcast_header.seq) &&
4142  sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4143 
4144  /*
4145  * Allocate new multicast memory block
4146  */
4147 // TODO LEAK
4148  sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4149  if (sort_queue_item.mcast == NULL) {
4150  return (-1); /* error here is corrected by the algorithm */
4151  }
4152  memcpy (sort_queue_item.mcast, msg, msg_len);
4153  sort_queue_item.msg_len = msg_len;
4154 
4155  if (sq_lt_compare (instance->my_high_seq_received,
4156  mcast_header.seq)) {
4157  instance->my_high_seq_received = mcast_header.seq;
4158  }
4159 
4160  sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4161  }
4162 
4163  update_aru (instance);
4164  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4165  messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4166  }
4167 
4168 /* TODO remove from retrans message queue for old ring in recovery state */
4169  return (0);
4170 }
4171 
4172 static int message_handler_memb_merge_detect (
4173  struct totemsrp_instance *instance,
4174  const void *msg,
4175  size_t msg_len,
4176  int endian_conversion_needed)
4177 {
4178  struct memb_merge_detect memb_merge_detect;
4179 
4180 
4181  if (endian_conversion_needed) {
4182  memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4183  } else {
4184  memcpy (&memb_merge_detect, msg,
4185  sizeof (struct memb_merge_detect));
4186  }
4187 
4188  /*
4189  * do nothing if this is a merge detect from this configuration
4190  */
4191  if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4192  sizeof (struct memb_ring_id)) == 0) {
4193 
4194  return (0);
4195  }
4196 
4197  /*
4198  * Execute merge operation
4199  */
4200  switch (instance->memb_state) {
4202  memb_set_merge (&memb_merge_detect.system_from, 1,
4203  instance->my_proc_list, &instance->my_proc_list_entries);
4204  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4205  break;
4206 
4207  case MEMB_STATE_GATHER:
4208  if (!memb_set_subset (
4209  &memb_merge_detect.system_from,
4210  1,
4211  instance->my_proc_list,
4212  instance->my_proc_list_entries)) {
4213 
4214  memb_set_merge (&memb_merge_detect.system_from, 1,
4215  instance->my_proc_list, &instance->my_proc_list_entries);
4216  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4217  return (0);
4218  }
4219  break;
4220 
4221  case MEMB_STATE_COMMIT:
4222  /* do nothing in commit */
4223  break;
4224 
4225  case MEMB_STATE_RECOVERY:
4226  /* do nothing in recovery */
4227  break;
4228  }
4229  return (0);
4230 }
4231 
4232 static void memb_join_process (
4233  struct totemsrp_instance *instance,
4234  const struct memb_join *memb_join)
4235 {
4236  struct srp_addr *proc_list;
4237  struct srp_addr *failed_list;
4238  int gather_entered = 0;
4239  int fail_minus_memb_entries = 0;
4240  struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4241 
4242  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4243  failed_list = proc_list + memb_join->proc_list_entries;
4244 
4245 /*
4246  memb_set_print ("proclist", proc_list, memb_join->proc_list_entries);
4247  memb_set_print ("faillist", failed_list, memb_join->failed_list_entries);
4248  memb_set_print ("my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4249  memb_set_print ("my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4250 -*/
4251 
4252  if (memb_join->header.type == MESSAGE_TYPE_MEMB_JOIN) {
4253  if (instance->flushing) {
4254  if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
4256  "Discarding LEAVE message during flush, nodeid=%u",
4257  memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid : LEAVE_DUMMY_NODEID);
4258  if (memb_join->failed_list_entries > 0) {
4259  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid);
4260  }
4261  } else {
4263  "Discarding JOIN message during flush, nodeid=%d", memb_join->header.nodeid);
4264  }
4265  return;
4266  } else {
4267  if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
4269  "Received LEAVE message from %u", memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid : LEAVE_DUMMY_NODEID);
4270  if (memb_join->failed_list_entries > 0) {
4271  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid);
4272  }
4273  }
4274  }
4275 
4276  }
4277 
4278  if (memb_set_equal (proc_list,
4279  memb_join->proc_list_entries,
4280  instance->my_proc_list,
4281  instance->my_proc_list_entries) &&
4282 
4283  memb_set_equal (failed_list,
4284  memb_join->failed_list_entries,
4285  instance->my_failed_list,
4286  instance->my_failed_list_entries)) {
4287 
4288  memb_consensus_set (instance, &memb_join->system_from);
4289 
4290  if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4291  instance->failed_to_recv = 0;
4292  srp_addr_copy (&instance->my_proc_list[0],
4293  &instance->my_id);
4294  instance->my_proc_list_entries = 1;
4295  instance->my_failed_list_entries = 0;
4296 
4297  memb_state_commit_token_create (instance);
4298 
4299  memb_state_commit_enter (instance);
4300  return;
4301  }
4302  if (memb_consensus_agreed (instance) &&
4303  memb_lowest_in_config (instance)) {
4304 
4305  memb_state_commit_token_create (instance);
4306 
4307  memb_state_commit_enter (instance);
4308  } else {
4309  goto out;
4310  }
4311  } else
4312  if (memb_set_subset (proc_list,
4313  memb_join->proc_list_entries,
4314  instance->my_proc_list,
4315  instance->my_proc_list_entries) &&
4316 
4317  memb_set_subset (failed_list,
4318  memb_join->failed_list_entries,
4319  instance->my_failed_list,
4320  instance->my_failed_list_entries)) {
4321 
4322  goto out;
4323  } else
4324  if (memb_set_subset (&memb_join->system_from, 1,
4325  instance->my_failed_list, instance->my_failed_list_entries)) {
4326 
4327  goto out;
4328  } else {
4329  memb_set_merge (proc_list,
4330  memb_join->proc_list_entries,
4331  instance->my_proc_list, &instance->my_proc_list_entries);
4332 
4333  if (memb_set_subset (
4334  &instance->my_id, 1,
4335  failed_list, memb_join->failed_list_entries)) {
4336 
4337  memb_set_merge (
4338  &memb_join->system_from, 1,
4339  instance->my_failed_list, &instance->my_failed_list_entries);
4340  } else {
4341  if (memb_set_subset (
4342  &memb_join->system_from, 1,
4343  instance->my_memb_list,
4344  instance->my_memb_entries)) {
4345 
4346  if (memb_set_subset (
4347  &memb_join->system_from, 1,
4348  instance->my_failed_list,
4349  instance->my_failed_list_entries) == 0) {
4350 
4351  memb_set_merge (failed_list,
4352  memb_join->failed_list_entries,
4353  instance->my_failed_list, &instance->my_failed_list_entries);
4354  } else {
4355  memb_set_subtract (fail_minus_memb,
4356  &fail_minus_memb_entries,
4357  failed_list,
4358  memb_join->failed_list_entries,
4359  instance->my_memb_list,
4360  instance->my_memb_entries);
4361 
4362  memb_set_merge (fail_minus_memb,
4363  fail_minus_memb_entries,
4364  instance->my_failed_list,
4365  &instance->my_failed_list_entries);
4366  }
4367  }
4368  }
4369  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4370  gather_entered = 1;
4371  }
4372 
4373 out:
4374  if (gather_entered == 0 &&
4375  instance->memb_state == MEMB_STATE_OPERATIONAL) {
4376 
4377  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4378  }
4379 }
4380 
/*
 * Convert a memb_join message from foreign byte order into local byte
 * order.  'in' is only read; the converted message is written to 'out'.
 *
 * The header type is copied unchanged, nodeid is swapped with swab32,
 * ring_seq with swab64, and system_from plus every proc-list and
 * failed-list entry go through srp_addr_copy_endian_convert().
 *
 * NOTE(review): the list offsets and loop bounds below are read from
 * out->proc_list_entries / out->failed_list_entries, but no assignment to
 * those fields (nor to header.endian_detector) is visible in this listing;
 * the embedded numbering jumps 4389 -> 4391 and 4392 -> 4395, so the
 * listing presumably dropped those lines - confirm against the real file.
 */
4381 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4382 {
4383  int i;
4384  struct srp_addr *in_proc_list;
4385  struct srp_addr *in_failed_list;
4386  struct srp_addr *out_proc_list;
4387  struct srp_addr *out_failed_list;
4388 
4389  out->header.type = in->header.type;
4391  out->header.nodeid = swab32 (in->header.nodeid);
4392  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4395  out->ring_seq = swab64 (in->ring_seq);
4396 
      /*
       * Both address lists live in the trailing end_of_memb_join area:
       * the proc list comes first and the failed list starts right after
       * it.  The offset uses the count stored in 'out', for both 'in' and
       * 'out' pointers.
       */
4397  in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4398  in_failed_list = in_proc_list + out->proc_list_entries;
4399  out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4400  out_failed_list = out_proc_list + out->proc_list_entries;
4401 
4402  for (i = 0; i < out->proc_list_entries; i++) {
4403  srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4404  }
4405  for (i = 0; i < out->failed_list_entries; i++) {
4406  srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4407  }
4408 }
4409 
/*
 * Convert a commit token from foreign byte order into local byte order.
 * 'in' is only read; the converted token is written to 'out'.
 *
 * The fixed part (nodeid, token_seq, ring_id.seq, retrans_flg, memb_index,
 * addr_entries) is byte-swapped field by field; the header type is copied
 * unchanged.  The trailing end_of_commit_token area holds addr_entries
 * srp_addr records followed by the same number of
 * memb_commit_token_memb_entry records.
 *
 * NOTE(review): embedded lines 4419 and 4422 are absent from this listing,
 * so the handling of header.endian_detector and of the top-level
 * ring_id.rep cannot be seen here - confirm against the real file.
 */
4410 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4411 {
4412  int i;
4413  struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4414  struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4415  struct memb_commit_token_memb_entry *in_memb_list;
4416  struct memb_commit_token_memb_entry *out_memb_list;
4417 
4418  out->header.type = in->header.type;
4420  out->header.nodeid = swab32 (in->header.nodeid);
4421  out->token_seq = swab32 (in->token_seq);
4423  out->ring_id.seq = swab64 (in->ring_id.seq);
4424  out->retrans_flg = swab32 (in->retrans_flg);
4425  out->memb_index = swab32 (in->memb_index);
4426  out->addr_entries = swab32 (in->addr_entries);
4427 
      /*
       * The memb entry array begins immediately after the address array;
       * the already converted out->addr_entries is used to locate it in
       * both the input and the output buffer.
       */
4428  in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4429  out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4430  for (i = 0; i < out->addr_entries; i++) {
4431  srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4432 
4433  /*
4434  * Only convert the memb entry if it has been set
4435  */
      /* "Set" is detected by a non-zero address family in the entry's ring_id.rep. */
4436  if (in_memb_list[i].ring_id.rep.family != 0) {
4437  totemip_copy_endian_convert (&out_memb_list[i].ring_id.rep,
4438  &in_memb_list[i].ring_id.rep);
4439 
4440  out_memb_list[i].ring_id.seq =
4441  swab64 (in_memb_list[i].ring_id.seq);
4442  out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4443  out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4444  out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4445  }
4446  }
4447 }
4448 
/*
 * Convert an ORF token from foreign byte order into local byte order.
 * 'in' is only read; the converted token is written to 'out'.
 *
 * The header type is copied unchanged; nodeid, seq, token_seq, aru,
 * aru_addr, fcc, backlog and retrans_flg are swapped with swab32 and
 * ring_id.seq with swab64.  Each retransmit-list entry has its ring_id.seq
 * and seq swapped.
 *
 * NOTE(review): the loop bound out->rtr_list_entries is not assigned in
 * the lines visible here (embedded 4465 is missing, as are 4454, 4459 and
 * 4467), so the conversion of rtr_list_entries, header.endian_detector and
 * the ring_id.rep fields cannot be confirmed from this listing.
 */
4449 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4450 {
4451  int i;
4452 
4453  out->header.type = in->header.type;
4455  out->header.nodeid = swab32 (in->header.nodeid);
4456  out->seq = swab32 (in->seq);
4457  out->token_seq = swab32 (in->token_seq);
4458  out->aru = swab32 (in->aru);
4460  out->aru_addr = swab32(in->aru_addr);
4461  out->ring_id.seq = swab64 (in->ring_id.seq);
4462  out->fcc = swab32 (in->fcc);
4463  out->backlog = swab32 (in->backlog);
4464  out->retrans_flg = swab32 (in->retrans_flg);
4466  for (i = 0; i < out->rtr_list_entries; i++) {
4468  out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4469  out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4470  }
4471 }
4472 
/*
 * Convert the mcast message header from foreign byte order into local
 * byte order.  'in' is only read; the result is written to 'out'.
 *
 * The header type is copied unchanged; nodeid, seq, this_seqno, node_id
 * and guarantee are swapped with swab32, ring_id.seq with swab64, and
 * system_from goes through srp_addr_copy_endian_convert().  Nothing beyond
 * these struct mcast fields is touched by the visible code.
 *
 * NOTE(review): embedded lines 4476, 4478 and 4482 are missing from this
 * listing; header.endian_detector and ring_id.rep are presumably handled
 * there - confirm against the real file.
 */
4473 static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4474 {
4475  out->header.type = in->header.type;
4477  out->header.nodeid = swab32 (in->header.nodeid);
4479 
4480  out->seq = swab32 (in->seq);
4481  out->this_seqno = swab32 (in->this_seqno);
4483  out->ring_id.seq = swab64 (in->ring_id.seq);
4484  out->node_id = swab32 (in->node_id);
4485  out->guarantee = swab32 (in->guarantee);
4486  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4487 }
4488 
/*
 * Convert a memb_merge_detect message from foreign byte order into local
 * byte order.  'in' is only read; the result is written to 'out'.
 *
 * The header type is copied unchanged, nodeid is swapped with swab32,
 * ring_id.seq with swab64, and system_from goes through
 * srp_addr_copy_endian_convert().
 *
 * NOTE(review): embedded lines 4494 and 4496 are missing from this
 * listing; header.endian_detector and ring_id.rep are presumably handled
 * there - confirm against the real file.
 */
4489 static void memb_merge_detect_endian_convert (
4490  const struct memb_merge_detect *in,
4491  struct memb_merge_detect *out)
4492 {
4493  out->header.type = in->header.type;
4495  out->header.nodeid = swab32 (in->header.nodeid);
4497  out->ring_id.seq = swab64 (in->ring_id.seq);
4498  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4499 }
4500 
/*
 * Decide whether a join message should be ignored.  The visible caller,
 * message_handler_memb_join(), only passes the message on to
 * memb_join_process() when this function returns 0.
 *
 * Returns 1 (ignore) when either
 *  - memb_set_subset() reports this node's my_id within the failed list
 *    carried by the join message, or
 *  - memb_set_subset() reports the sender (system_from) within
 *    my_memb_list and the message's ring_seq is lower than
 *    my_ring_id.seq.
 * Returns 0 otherwise.
 *
 * memb_join is expected to be in local byte order already (its fields are
 * read directly); the proc and failed lists are located in the trailing
 * end_of_memb_join area, failed list after the proc list.
 * (memb_set_subset is defined elsewhere; presumably it tests whether the
 * first set is contained in the second - verify.)
 */
4501 static int ignore_join_under_operational (
4502  struct totemsrp_instance *instance,
4503  const struct memb_join *memb_join)
4504 {
4505  struct srp_addr *proc_list;
4506  struct srp_addr *failed_list;
4507  unsigned long long ring_seq;
4508 
4509  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4510  failed_list = proc_list + memb_join->proc_list_entries;
4511  ring_seq = memb_join->ring_seq;
4512 
      /* The sender lists this node as failed. */
4513  if (memb_set_subset (&instance->my_id, 1,
4514  failed_list, memb_join->failed_list_entries)) {
4515  return (1);
4516  }
4517 
4518  /*
4519  * In operational state, my_proc_list is exactly the same as
4520  * my_memb_list.
4521  */
      /* Join from a current member that carries a ring sequence older than ours. */
4522  if ((memb_set_subset (&memb_join->system_from, 1,
4523  instance->my_memb_list, instance->my_memb_entries)) &&
4524  (ring_seq < instance->my_ring_id.seq)) {
4525  return (1);
4526  }
4527 
4528  return (0);
4529 }
4530 
/*
 * Handler for a received memb_join message.
 *
 * instance                  protocol instance the message arrived on
 * msg, msg_len              raw message and its length in bytes
 * endian_conversion_needed  non-zero when the message must be converted
 *                           with memb_join_endian_convert() first
 *
 * Always returns 0.
 *
 * After the optional conversion the message is dropped when pause_flush()
 * returns non-zero.  Otherwise token_ring_id_seq is raised to the
 * message's ring_seq if that is larger, and the message is processed
 * according to the current memb_state (see the switch below).
 *
 * NOTE(review): the msg_len-sized alloca() buffer is reserved even when
 * no conversion is needed, and msg_len is not validated in this function
 * before use - confirm the caller bounds it.
 */
4531 static int message_handler_memb_join (
4532  struct totemsrp_instance *instance,
4533  const void *msg,
4534  size_t msg_len,
4535  int endian_conversion_needed)
4536 {
4537  const struct memb_join *memb_join;
4538  struct memb_join *memb_join_convert = alloca (msg_len);
4539 
4540  if (endian_conversion_needed) {
4541  memb_join = memb_join_convert;
4542  memb_join_endian_convert (msg, memb_join_convert);
4543 
4544  } else {
4545  memb_join = msg;
4546  }
4547  /*
4548  * If the process paused because it wasn't scheduled in a timely
4549  * fashion, flush the join messages because they may be queued
4550  * entries
4551  */
4552  if (pause_flush (instance)) {
4553  return (0);
4554  }
4555 
      /* Remember the highest ring sequence number seen in any join message. */
4556  if (instance->token_ring_id_seq < memb_join->ring_seq) {
4557  instance->token_ring_id_seq = memb_join->ring_seq;
4558  }
4559  switch (instance->memb_state) {
      /*
       * NOTE(review): the case label for this first branch (embedded line
       * 4560) is missing from the listing; given the helper's name it is
       * presumably the operational state - confirm.
       */
4561  if (!ignore_join_under_operational (instance, memb_join)) {
4562  memb_join_process (instance, memb_join);
4563  }
4564  break;
4565 
4566  case MEMB_STATE_GATHER:
4567  memb_join_process (instance, memb_join);
4568  break;
4569 
4570  case MEMB_STATE_COMMIT:
      /*
       * Only joins from a sender found in my_new_memb_list with a ring
       * sequence not older than ours are processed; gather state is then
       * entered.
       */
4571  if (memb_set_subset (&memb_join->system_from,
4572  1,
4573  instance->my_new_memb_list,
4574  instance->my_new_memb_entries) &&
4575 
4576  memb_join->ring_seq >= instance->my_ring_id.seq) {
4577 
4578  memb_join_process (instance, memb_join);
4579  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4580  }
4581  break;
4582 
4583  case MEMB_STATE_RECOVERY:
      /*
       * Same filter as in the commit state; additionally
       * memb_recovery_state_token_loss() is called before entering gather.
       */
4584  if (memb_set_subset (&memb_join->system_from,
4585  1,
4586  instance->my_new_memb_list,
4587  instance->my_new_memb_entries) &&
4588 
4589  memb_join->ring_seq >= instance->my_ring_id.seq) {
4590 
4591  memb_join_process (instance, memb_join);
4592  memb_recovery_state_token_loss (instance);
4593  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4594  }
4595  break;
4596  }
4597  return (0);
4598 }
4599 
/*
 * Handler for a received membership commit token.
 *
 * instance                  protocol instance the token arrived on
 * msg, msg_len              raw token and its length in bytes
 * endian_conversion_needed  non-zero when the token must be converted
 *                           with memb_commit_token_endian_convert()
 *
 * Always returns 0.
 *
 * The token is always placed into a msg_len-sized alloca() buffer first,
 * either converted or copied verbatim with memcpy, and is then handled
 * according to the current memb_state.
 *
 * NOTE(review): msg_len is not validated in this function before the
 * alloca() and before the memcpy into instance->commit_token - confirm
 * the destination is large enough for any msg_len the caller can pass.
 * NOTE(review): the log_printf call heads (embedded 4613, 4674) and the
 * first case label (embedded 4630) are missing from this listing.
 */
4600 static int message_handler_memb_commit_token (
4601  struct totemsrp_instance *instance,
4602  const void *msg,
4603  size_t msg_len,
4604  int endian_conversion_needed)
4605 {
4606  struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4607  struct memb_commit_token *memb_commit_token;
4608  struct srp_addr sub[PROCESSOR_COUNT_MAX];
4609  int sub_entries;
4610 
4611  struct srp_addr *addr;
4612 
4614  "got commit token");
4615 
4616  if (endian_conversion_needed) {
4617  memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4618  } else {
4619  memcpy (memb_commit_token_convert, msg, msg_len);
4620  }
4621  memb_commit_token = memb_commit_token_convert;
      /* The token's address list starts at the trailing end_of_commit_token area. */
4622  addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
4623 
4624 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
      /* Test hook: drop the configured percentage of commit tokens at random. */
4625  if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4626  return (0);
4627  }
4628 #endif
4629  switch (instance->memb_state) {
4631  /* discard token */
4632  break;
4633 
4634  case MEMB_STATE_GATHER:
      /*
       * sub is built by memb_set_subtract() from my_proc_list and
       * my_failed_list (presumably proc list minus failed list - verify).
       * The token is accepted only if its address list equals sub and its
       * ring sequence is newer than ours; it is then stored in
       * instance->commit_token and commit state is entered.
       */
4635  memb_set_subtract (sub, &sub_entries,
4636  instance->my_proc_list, instance->my_proc_list_entries,
4637  instance->my_failed_list, instance->my_failed_list_entries);
4638 
4639  if (memb_set_equal (addr,
4640  memb_commit_token->addr_entries,
4641  sub,
4642  sub_entries) &&
4643 
4644  memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
4645  memcpy (instance->commit_token, memb_commit_token, msg_len);
4646  memb_state_commit_enter (instance);
4647  }
4648  break;
4649 
4650  case MEMB_STATE_COMMIT:
4651  /*
4652  * If retransmitted commit tokens are sent on this ring
4653  * filter them out and only enter recovery once the
4654  * commit token has traversed the array. This is
4655  * determined by :
4656  * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4657  */
4658  if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4659  memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4660  memb_state_recovery_enter (instance, memb_commit_token);
4661  }
4662  break;
4663 
4664  case MEMB_STATE_RECOVERY:
      /*
       * Only the node whose first address equals the ring representative
       * (my_ring_id.rep) sends the initial ORF token, and only once:
       * originated_orf_token guards against repeated commit tokens.
       */
4665  if (totemip_equal (&instance->my_id.addr[0], &instance->my_ring_id.rep)) {
4666 
4667  /* Filter out duplicated tokens */
4668  if (instance->originated_orf_token) {
4669  break;
4670  }
4671 
4672  instance->originated_orf_token = 1;
4673 
4675  "Sending initial ORF token");
4676 
4677  // TODO convert instead of initiate
4678  orf_token_send_initial (instance);
4679  reset_token_timeout (instance); // REVIEWED
4680  reset_token_retransmit_timeout (instance); // REVIEWED
4681  }
4682  break;
4683  }
4684  return (0);
4685 }
4686 
/*
 * Handler for a received token_hold_cancel message.  Always returns 0.
 *
 * The message is acted on only when its ring_id is byte-for-byte equal
 * (memcmp over sizeof (struct memb_ring_id)) to this instance's
 * my_ring_id.  In that case my_seq_unchanged is reset to 0 and, if this
 * node's first address equals the ring representative,
 * timer_function_token_retransmit_timeout() is invoked directly.
 *
 * NOTE(review): msg_len and endian_conversion_needed are not used here;
 * the ring_id is compared exactly as received and the message length is
 * not checked against sizeof (struct token_hold_cancel) - confirm this is
 * intended.
 */
4687 static int message_handler_token_hold_cancel (
4688  struct totemsrp_instance *instance,
4689  const void *msg,
4690  size_t msg_len,
4691  int endian_conversion_needed)
4692 {
4693  const struct token_hold_cancel *token_hold_cancel = msg;
4694 
4695  if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4696  sizeof (struct memb_ring_id)) == 0) {
4697 
4698  instance->my_seq_unchanged = 0;
4699  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
4700  timer_function_token_retransmit_timeout (instance);
4701  }
4702  }
4703  return (0);
4704 }
4705 
/*
 * Receive entry point: main_deliver_fn (void *context, const void *msg,
 * unsigned int msg_len).  The line carrying the function name (embedded
 * 4706) is missing from this listing, as are most case labels and the
 * log_printf call head for the short-message path.
 *
 * context is the totemsrp_instance; msg/msg_len is the received message.
 * Messages shorter than a message_header are ignored.  Otherwise the
 * matching per-type rx counter in instance->stats is incremented and the
 * message is dispatched through
 * totemsrp_message_handlers.handler_functions, indexed by the header
 * type.  Unknown types are logged, counted in stats.rx_msg_dropped and
 * dropped.
 */
4707  void *context,
4708  const void *msg,
4709  unsigned int msg_len)
4710 {
4711  struct totemsrp_instance *instance = context;
4712  const struct message_header *message_header = msg;
4713 
4714  if (msg_len < sizeof (struct message_header)) {
4716  "Received message is too short... ignoring %u.",
4717  (unsigned int)msg_len);
4718  return;
4719  }
4720 
4721 
4722  switch (message_header->type) {
4724  instance->stats.orf_token_rx++;
4725  break;
4726  case MESSAGE_TYPE_MCAST:
4727  instance->stats.mcast_rx++;
4728  break;
4730  instance->stats.memb_merge_detect_rx++;
4731  break;
4733  instance->stats.memb_join_rx++;
4734  break;
4736  instance->stats.memb_commit_token_rx++;
4737  break;
4739  instance->stats.token_hold_cancel_rx++;
4740  break;
4741  default:
4742  log_printf (instance->totemsrp_log_level_security, "Type of received message is wrong... ignoring %d.\n", (int)message_header->type);
      /* NOTE(review): bare printf to stdout next to log_printf looks like leftover debug output - confirm. */
4743 printf ("wrong message type\n");
4744  instance->stats.rx_msg_dropped++;
4745  return;
4746  }
4747  /*
4748  * Handle incoming message
4749  */
      /* The last argument tells the handler whether the sender's byte order differs from ENDIAN_LOCAL. */
4750  totemsrp_message_handlers.handler_functions[(int)message_header->type] (
4751  instance,
4752  msg,
4753  msg_len,
4754  message_header->endian_detector != ENDIAN_LOCAL);
4755 }
4756 
/*
 * Interface-change callback: main_iface_change_fn (void *context,
 * const struct totem_ip_address *iface_addr, unsigned int iface_no).
 * The line carrying the function name (embedded 4757) is missing from
 * this listing.
 *
 * Records iface_addr as this node's address for interface iface_no, both
 * in my_id and in my_memb_list[0].  On the first invocation only
 * (iface_changes was 0) the ring id is created or loaded, token_ring_id_seq
 * is seeded from it, and the service-ready callback is called if one is
 * registered.  Every configured member of the interface is then added via
 * totemsrp_member_add(), and once iface_changes has reached the configured
 * interface_count the gather state is entered.
 *
 * NOTE(review): iface_no is used as an array index without a range check
 * in this function - presumably guaranteed by the caller; confirm.
 */
4758  void *context,
4759  const struct totem_ip_address *iface_addr,
4760  unsigned int iface_no)
4761 {
4762  struct totemsrp_instance *instance = context;
4763  int i;
4764 
4765  totemip_copy (&instance->my_id.addr[iface_no], iface_addr);
      /* A zero nodeid on the bound address aborts via assert. */
4766  assert (instance->my_id.addr[iface_no].nodeid);
4767 
4768  totemip_copy (&instance->my_memb_list[0].addr[iface_no], iface_addr);
4769 
      /* Post-increment: the body runs only for the very first interface change. */
4770  if (instance->iface_changes++ == 0) {
4771  instance->memb_ring_id_create_or_load (&instance->my_ring_id,
4772  &instance->my_id.addr[0]);
4773  instance->token_ring_id_seq = instance->my_ring_id.seq;
4774  log_printf (
4775  instance->totemsrp_log_level_debug,
4776  "Created or loaded sequence id %llx.%s for this ring.",
4777  instance->my_ring_id.seq,
4778  totemip_print (&instance->my_ring_id.rep));
4779 
4780  if (instance->totemsrp_service_ready_fn) {
4781  instance->totemsrp_service_ready_fn ();
4782  }
4783 
4784  }
4785 
4786  for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
4787  totemsrp_member_add (instance,
4788  &instance->totem_config->interfaces[iface_no].member_list[i],
4789  iface_no);
4790  }
4791 
4792  if (instance->iface_changes >= instance->totem_config->interface_count) {
4793  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
4794  }
4795 }
4796 
/*
 * Reduce totem_config->net_mtu by sizeof (struct mcast), modifying the
 * configuration in place.
 * NOTE(review): no underflow check is made; net_mtu is presumably always
 * larger than the mcast header - confirm.
 */
4797 void totemsrp_net_mtu_adjust (struct totem_config *totem_config) {
4798  totem_config->net_mtu -= sizeof (struct mcast);
4799 }
4800 
/*
 * totemsrp_service_ready_register (void *context,
 * void (*totem_service_ready)(void)); the line carrying the function name
 * (embedded 4801) is missing from this listing.
 *
 * Stores totem_service_ready in the instance as totemsrp_service_ready_fn.
 * The stored pointer is invoked from main_iface_change_fn() on the first
 * interface change, if it is non-NULL.
 */
4802  void *context,
4803  void (*totem_service_ready) (void))
4804 {
4805  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4806 
4807  instance->totemsrp_service_ready_fn = totem_service_ready;
4808 }
4809 
/*
 * totemsrp_member_add (void *context, const struct totem_ip_address
 * *member, int ring_no); the line carrying the function name (embedded
 * 4810) is missing from this listing.
 *
 * Forwards the request to totemrrp_member_add() using the instance's
 * totemrrp_context and returns that call's result unchanged.
 */
4811  void *context,
4812  const struct totem_ip_address *member,
4813  int ring_no)
4814 {
4815  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4816  int res;
4817 
4818  res = totemrrp_member_add (instance->totemrrp_context, member, ring_no);
4819 
4820  return (res);
4821 }
4822 
/*
 * Member removal wrapper; the line carrying the function name (embedded
 * 4823) is missing from this listing - presumably
 * totemsrp_member_remove, by analogy with the call it makes; confirm.
 *
 * Forwards the request to totemrrp_member_remove() using the instance's
 * totemrrp_context and returns that call's result unchanged.
 */
4824  void *context,
4825  const struct totem_ip_address *member,
4826  int ring_no)
4827 {
4828  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4829  int res;
4830 
4831  res = totemrrp_member_remove (instance->totemrrp_context, member, ring_no);
4832 
4833  return (res);
4834 }
4835 
/*
 * Set the instance's threaded_mode_enabled flag to 1.  context is the
 * totemsrp_instance.  No code that clears the flag is visible in this
 * part of the file.
 */
4836 void totemsrp_threaded_mode_enable (void *context)
4837 {
4838  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4839 
4840  instance->threaded_mode_enabled = 1;
4841 }
4842 
/*
 * Clear the instance's waiting_trans_ack flag and report the new value
 * (0) through the totemsrp_waiting_trans_ack_cb_fn callback.  context is
 * the totemsrp_instance.
 *
 * NOTE(review): the callback pointer is called without a NULL check -
 * presumably it is always set when the instance is initialized; confirm.
 */
4843 void totemsrp_trans_ack (void *context)
4844 {
4845  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4846 
4847  instance->waiting_trans_ack = 0;
4848  instance->totemsrp_waiting_trans_ack_cb_fn (0);
4849 }
void(* totemsrp_service_ready_fn)(void)
Definition: totemsrp.c:462
unsigned int backlog
Definition: totemsrp.c:208
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition: totemsrp.c:449
void(*) enum memb_stat memb_state)
Definition: totemsrp.c:441
uint8_t no_addrs
Definition: totemrrp.h:59
unsigned short family
Definition: coroapi.h:113
char encapsulated
Definition: totemsrp.c:61
gather_state_from
Definition: totemsrp.c:537
int totemrrp_iface_check(void *rrp_context)
Definition: totemrrp.c:2216
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition: totemsrp.c:4757
void totemip_copy_endian_convert(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:101
struct srp_addr system_from
Definition: totemsrp.c:218
#define ENDIAN_LOCAL
Definition: totemsrp.c:137
struct memb_ring_id ring_id
Definition: totemsrp.c:196
struct list_head list
Definition: totemsrp.c:163
uint32_t waiting_trans_ack
Definition: totemsrp.c:519
struct srp_addr system_from
Definition: totemsrp.c:186
struct memb_ring_id ring_id
Definition: totemsrp.c:255
int totemsrp_log_level_debug
Definition: totemsrp.c:427
struct memb_ring_id my_ring_id
Definition: totemsrp.c:337
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
Definition: totem.h:255
int my_leave_memb_entries
Definition: totemsrp.c:335
struct message_header header
Definition: totemsrp.c:185
unsigned int old_ring_state_high_seq_received
Definition: totemsrp.c:489
unsigned int proc_list_entries
Definition: totemsrp.c:219
uint32_t value
struct totem_interface * interfaces
Definition: totem.h:114
unsigned int interface_count
Definition: totem.h:115
int totemsrp_my_family_get(void *srp_context)
Definition: totemsrp.c:1145
struct list_head * next
Definition: list.h:47
uint64_t memb_join_tx
Definition: totem.h:249
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totemsrp.c:455
The totem_ip_address struct.
Definition: coroapi.h:111
unsigned int seq
Definition: totemsrp.c:62
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition: totem.h:274
const char * totemip_print(const struct totem_ip_address *addr)
Definition: totemip.c:214
int totemsrp_log_level_error
Definition: totemsrp.c:421
int old_ring_state_aru
Definition: totemsrp.c:487
#define LEAVE_DUMMY_NODEID
Definition: totemsrp.c:102
unsigned int seq
Definition: totemsrp.c:203
struct memb_ring_id ring_id
Definition: totemsrp.c:245
int fcc_remcast_current
Definition: totemsrp.c:297
qb_loop_timer_handle timer_heartbeat_timeout
Definition: totemsrp.c:414
unsigned int failed_list_entries
Definition: totemsrp.c:220
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:65
uint64_t mcast_rx
Definition: totem.h:253
unsigned long long int tv_old
Definition: totemsrp.c:3643
#define SEQNO_START_TOKEN
Definition: totemsrp.c:115
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
Definition: totemsrp.c:471
unsigned int token_hold_timeout
Definition: totem.h:133
int member_count
Definition: totem.h:70
unsigned int msg_len
Definition: totemsrp.c:270
struct memb_ring_id ring_id
Definition: totemsrp.c:207
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition: totem.h:71
int totemip_compare(const void *a, const void *b)
Definition: totemip.c:130
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int ring_no)
Definition: totemsrp.c:4810
void * token_sent_event_handle
Definition: totemsrp.c:524
int retrans_flg
Definition: totemsrp.c:210
struct srp_addr system_from
Definition: totemsrp.c:233
int my_new_memb_entries
Definition: totemsrp.c:325
totem_configuration_type
The totem_configuration_type enum.
Definition: coroapi.h:132
int totemsrp_log_level_notice
Definition: totemsrp.c:425
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition: totemsrp.c:1134
unsigned int my_pbl
Definition: totemsrp.c:503
char rrp_mode[TOTEM_RRP_MODE_BYTES]
Definition: totem.h:161
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition: totemsrp.c:4797
int totemsrp_log_level_warning
Definition: totemsrp.c:423
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition: totemsrp.c:1120
void totemrrp_membership_changed(void *rrp_context, enum totem_configuration_type configuration_type, const struct srp_addr *member_list, size_t member_list_entries, const struct srp_addr *left_list, size_t left_list_entries, const struct srp_addr *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totemrrp.c:2319
unsigned int my_aru
Definition: totemsrp.c:381
struct message_header header
Definition: totemsrp.c:60
uint64_t memb_merge_detect_rx
Definition: totem.h:248
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition: totemsrp.c:1062
int guarantee
Definition: totemsrp.c:66
struct cs_queue new_message_queue_trans
Definition: totemsrp.c:370
struct message_header header
Definition: totemsrp.c:232
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:259
unsigned int seq
Definition: totemsrp.c:187
char commit_token_storage[40000]
Definition: totemsrp.c:525
unsigned int rrp_problem_count_timeout
Definition: totem.h:153
struct list_head token_callback_sent_listhead
Definition: totemsrp.c:387
The sq struct.
Definition: sq.h:43
unsigned int set_aru
Definition: totemsrp.c:483
struct cs_queue new_message_queue
Definition: totemsrp.c:368
int my_rotation_counter
Definition: totemsrp.c:355
int earliest_token
Definition: totem.h:271
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:315
uint64_t orf_token_tx
Definition: totem.h:245
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition: totemsrp.c:3490
uint64_t gather_token_lost
Definition: totem.h:261
int totemsrp_log_level_trace
Definition: totemsrp.c:429
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:95
int totemrrp_ifaces_get(void *rrp_context, char ***status, unsigned int *iface_count)
Definition: totemrrp.c:2225
struct memb_ring_id my_old_ring_id
Definition: totemsrp.c:339
memb_state
Definition: totemsrp.c:278
void * totemrrp_buffer_alloc(void *rrp_context)
Definition: totemrrp.c:2118
unsigned int downcheck_timeout
Definition: totem.h:145
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:309
#define TOKEN_SIZE_MAX
Definition: totemsrp.c:101
uint64_t memb_commit_token_tx
Definition: totem.h:254
Definition: list.h:46
int my_deliver_memb_entries
Definition: totemsrp.c:331
unsigned int max_network_delay
Definition: totem.h:171
unsigned int heartbeat_failures_allowed
Definition: totem.h:169
#define TOTEM_TOKEN_STATS_MAX
Definition: totem.h:273
unsigned int my_last_seq
Definition: totemsrp.c:491
int my_left_memb_entries
Definition: totemsrp.c:333
#define swab64(x)
The swab64 macro.
Definition: swab.h:65
struct message_item __attribute__
unsigned long long token_ring_id_seq
Definition: totemsrp.c:479
struct totem_ip_address mcast_address
Definition: totemsrp.c:447
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition: totemsrp.c:3455
unsigned int send_join_timeout
Definition: totem.h:139
unsigned int window_size
Definition: totem.h:173
int guarantee
Definition: totemsrp.c:191
unsigned int seq
Definition: totemsrp.c:197
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition: totemsrp.c:4801
unsigned int rrp_problem_count_threshold
Definition: totem.h:155
struct mcast * mcast
Definition: totemsrp.c:274
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:313
uint64_t operational_entered
Definition: totem.h:258
void(*) in log_level_security)
Definition: totem.h:82
unsigned long long ring_seq
Definition: totemsrp.c:221
#define INTERFACE_MAX
Definition: coroapi.h:88
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition: totemsrp.c:2487
message_type
Definition: totemsrp.c:139
int latest_token
Definition: totem.h:272
uint64_t operational_token_lost
Definition: totem.h:259
unsigned int received_flg
Definition: totemsrp.c:63
uint64_t consensus_timeouts
Definition: totem.h:266
unsigned int aru_addr
Definition: totemsrp.c:206
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
Definition: totemsrp.c:383
struct message_handlers totemsrp_message_handlers
Definition: totemsrp.c:679
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition: totemsrp.c:410
uint64_t recovery_token_lost
Definition: totem.h:265
unsigned int backlog
Definition: totemsrp.c:66
int this_seqno
Definition: totemsrp.c:188
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:222
unsigned int token_retransmits_before_loss_const
Definition: totem.h:135
struct message_header header
Definition: totemsrp.c:239
int totemrrp_finalize(void *rrp_context)
Definition: totemrrp.c:1974
struct list_head token_callback_received_listhead
Definition: totemsrp.c:385
int totemrrp_member_remove(void *rrp_context, const struct totem_ip_address *member, int iface_no)
Definition: totemrrp.c:2306
struct rtr_item rtr_list[0]
Definition: totemsrp.c:70
unsigned int retrans_flg
Definition: totemsrp.c:256
int totemsrp_ring_reenable(void *srp_context)
Definition: totemsrp.c:1157
struct memb_ring_id ring_id
Definition: totemsrp.c:189
unsigned int seqno_unchanged_const
Definition: totem.h:149
uint64_t commit_token_lost
Definition: totem.h:263
unsigned int miss_count_const
Definition: totem.h:187
int totemrrp_crypto_set(void *rrp_context, const char *cipher_type, const char *hash_type)
Definition: totemrrp.c:2240
uint64_t token_hold_cancel_rx
Definition: totem.h:257
unsigned int join_timeout
Definition: totem.h:137
unsigned int aru
Definition: totemsrp.c:246
uint32_t originated_orf_token
Definition: totemsrp.c:515
unsigned int nodeid
Definition: coroapi.h:112
int totemrrp_send_flush(void *rrp_context)
Definition: totemrrp.c:2163
uint64_t pause_timestamp
Definition: totemsrp.c:507
int my_set_retrans_flg
Definition: totemsrp.c:357
struct message_header header
Definition: totemsrp.c:202
struct totem_ip_address mcast_addr
Definition: totem.h:67
char encapsulated
Definition: totemrrp.c:554
#define MESSAGE_QUEUE_MAX
Definition: coroapi.h:98
int totemrrp_member_add(void *rrp_context, const struct totem_ip_address *member, int iface_no)
Definition: totemrrp.c:2293
Linked list API.
unsigned int received_flg
Definition: totemsrp.c:248
unsigned int my_cbl
Definition: totemsrp.c:505
struct totem_ip_address rep
Definition: coroapi.h:123
unsigned int last_released
Definition: totemsrp.c:481
int orf_token_retransmit_size
Definition: totemsrp.c:391
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition: totemsrp.c:2556
unsigned int rrp_autorecovery_check_timeout
Definition: totem.h:159
uint64_t mcast_retx
Definition: totem.h:252
unsigned int msg_len
Definition: totemsrp.c:275
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition: totemsrp.c:97
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
Definition: totemsrp.c:467
unsigned int fail_to_recv_const
Definition: totem.h:147
unsigned int token_seq
Definition: totemsrp.c:204
struct mcast * mcast
Definition: totemsrp.c:269
void * token_recv_event_handle
Definition: totemsrp.c:523
struct totem_ip_address boundto
Definition: totem.h:66
unsigned int my_high_seq_received
Definition: totemsrp.c:351
int totemrrp_initialize(qb_loop_t *poll_handle, void **rrp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_addr, unsigned int iface_no), void(*token_seqid_get)(const void *msg, unsigned int *seqid, unsigned int *token_is), unsigned int(*msgs_missing)(void), void(*target_set_completed)(void *context))
Create an instance.
Definition: totemrrp.c:2003
qb_loop_t * totemsrp_poll_handle
Definition: totemsrp.c:445
totem_event_type
Definition: totem.h:212
qb_loop_timer_handle timer_pause_timeout
Definition: totemsrp.c:398
qb_loop_timer_handle timer_merge_detect_timeout
Definition: totemsrp.c:406
int old_ring_state_saved
Definition: totemsrp.c:485
int my_merge_detect_timeout_outstanding
Definition: totemsrp.c:343
uint64_t rx_msg_dropped
Definition: totem.h:267
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition: totem.h:75
int totemsrp_log_level_security
Definition: totemsrp.c:419
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition: totemsrp.c:402
struct totem_config * totem_config
Definition: totemsrp.c:497
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition: totemsrp.c:164
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
qb_loop_timer_handle timer_orf_token_timeout
Definition: totemsrp.c:400
uint32_t continuous_gather
Definition: totem.h:268
void totemsrp_threaded_mode_enable(void *context)
Definition: totemsrp.c:4836
unsigned int aru
Definition: totemsrp.c:63
encapsulation_type
Definition: totemsrp.c:148
unsigned int net_mtu
Definition: totem.h:165
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totemmrp_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition: totemsrp.c:832
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition: totemsrp.c:2478
unsigned int node_id
Definition: totemsrp.c:190
int totemrrp_recv_flush(void *rrp_context)
Definition: totemrrp.c:2154
uint32_t orf_token_discard
Definition: totemsrp.c:513
int my_failed_list_entries
Definition: totemsrp.c:323
struct srp_addr my_id
Definition: totemsrp.c:303
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:317
uint64_t token_hold_cancel_tx
Definition: totem.h:256
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
Definition: totem.h:191
unsigned int token_timeout
Definition: totem.h:129
Definition: totemsrp.c:244
unsigned int high_delivered
Definition: totemsrp.c:247
unsigned int consensus_timeout
Definition: totem.h:141
totemsrp_stats_t stats
Definition: totemsrp.c:511
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len)
Definition: totemsrp.c:4706
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
unsigned short endian_detector
Definition: totemsrp.c:62
uint64_t mcast_tx
Definition: totem.h:251
void totemrrp_buffer_release(void *rrp_context, void *ptr)
Definition: totemrrp.c:2125
void * totemrrp_context
Definition: totemsrp.c:495
Totem Network interface - also does encryption/decryption.
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition: totemsrp.c:389
struct message_header header
Definition: totemsrp.c:217
struct sq regular_sort_queue
Definition: totemsrp.c:374
int my_retrans_flg_count
Definition: totemsrp.c:359
unsigned int nodeid
Definition: totemsrp.c:63
The memb_ring_id struct.
Definition: coroapi.h:122
#define SEQNO_START_MSG
Definition: totemsrp.c:114
void totemsrp_finalize(void *srp_context)
Definition: totemsrp.c:1038
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition: totemsrp.c:96
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:307
unsigned short family
Definition: coroapi.h:76
struct cs_queue retrans_message_queue
Definition: totemsrp.c:372
unsigned int aru
Definition: totemsrp.c:205
const char * gather_state_from_desc[]
Definition: totemsrp.c:557
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition: totemsrp.c:408
int my_trans_memb_entries
Definition: totemsrp.c:327
unsigned int my_trc
Definition: totemsrp.c:501
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition: totemsrp.c:464
uint64_t memb_merge_detect_tx
Definition: totem.h:247
unsigned int high_delivered
Definition: totemsrp.c:62
struct rtr_item rtr_list[0]
Definition: totemsrp.c:212
totemsrp_stats_t * srp
Definition: totem.h:283
int consensus_list_entries
Definition: totemsrp.c:301
unsigned int rrp_problem_count_mcast_threshold
Definition: totem.h:157
int totemrrp_processor_count_set(void *rrp_context, unsigned int processor_count)
Definition: totemrrp.c:2132
char type
Definition: totemsrp.c:60
uint64_t memb_join_rx
Definition: totem.h:250
int totemrrp_mcast_noflush_send(void *rrp_context, const void *msg, unsigned int msg_len)
Definition: totemrrp.c:2196
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:66
#define FRAME_SIZE_MAX
Definition: totem.h:50
int rtr_list_entries
Definition: totemsrp.c:211
uint32_t threaded_mode_enabled
Definition: totemsrp.c:517
enum totem_callback_token_type callback_type
Definition: totemsrp.c:165
int totemrrp_mcast_recv_empty(void *rrp_context)
Definition: totemrrp.c:2282
int my_proc_list_entries
Definition: totemsrp.c:321
#define list_entry(ptr, type, member)
Definition: list.h:84
unsigned long long ring_seq
Definition: totemsrp.c:64
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:163
unsigned short endian_detector
Definition: totemrrp.c:555
int totemrrp_mcast_flush_send(void *rrp_context, const void *msg, unsigned int msg_len)
Definition: totemrrp.c:2182
struct memb_ring_id ring_id
Definition: totemsrp.c:240
#define log_printf(level, format, args...)
Definition: totemsrp.c:691
unsigned long long seq
Definition: coroapi.h:124
void totemsrp_trans_ack(void *context)
Definition: totemsrp.c:4843
unsigned int max_messages
Definition: totem.h:175
uint64_t recovery_entered
Definition: totem.h:264
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition: totemsrp.c:412
struct memb_commit_token * commit_token
Definition: totemsrp.c:509
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:299
struct srp_addr system_from
Definition: totemsrp.c:61
struct srp_addr addr
Definition: totemsrp.c:157
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:305
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition: totemsrp.c:530
int totemsrp_subsys_id
Definition: totemsrp.c:431
unsigned int merge_timeout
Definition: totem.h:143
unsigned int use_heartbeat
Definition: totemsrp.c:499
struct message_header header
Definition: totemsrp.c:253
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int ring_no)
Definition: totemsrp.c:4823
unsigned int token_retransmit_timeout
Definition: totem.h:131
int rtr_list_entries
Definition: totemsrp.c:69
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:311
#define RETRANSMIT_ENTRIES_MAX
Definition: totemsrp.c:100
void(* totemsrp_log_printf)(int level, int sybsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition: totemsrp.c:433
unsigned int token_seq
Definition: totemsrp.c:254
int totemip_equal(const struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:71
unsigned int my_token_seq
Definition: totemsrp.c:393
struct memb_ring_id ring_id
Definition: totemsrp.c:64
unsigned int my_last_aru
Definition: totemsrp.c:345
int totemrrp_ring_reenable(void *rrp_context, unsigned int iface_no)
Definition: totemrrp.c:2259
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:319
uint64_t commit_entered
Definition: totem.h:262
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition: totemsrp.c:404
struct totem_ip_address addr[INTERFACE_MAX]
Definition: totemrrp.h:60
unsigned int rrp_token_expired_timeout
Definition: totem.h:151
struct memb_ring_id ring_id
Definition: totemsrp.c:234
unsigned int my_install_seq
Definition: totemsrp.c:353
uint64_t orf_token_rx
Definition: totem.h:246
unsigned int nodeid
Definition: totemsrp.c:180
int totemrrp_token_send(void *rrp_context, const void *msg, unsigned int msg_len)
Definition: totemrrp.c:2171
unsigned int threads
Definition: totem.h:167
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
Definition: totem.h:195
struct sq recovery_sort_queue
Definition: totemsrp.c:376
int totemrrp_token_target_set(void *rrp_context, struct totem_ip_address *addr, unsigned int iface_no)
Definition: totemrrp.c:2144
totem_callback_token_type
The totem_callback_token_type enum.
Definition: coroapi.h:142
unsigned int my_high_ring_delivered
Definition: totemsrp.c:361
unsigned int fcc
Definition: totemsrp.c:209