corosync  3.0.2
totemsrp.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2003-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2018 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /*
37  * The first version of this code was based upon Yair Amir's PhD thesis:
38  * http://www.cs.jhu.edu/~yairamir/phd.ps (ch4,5).
39  *
40  * The current version of totemsrp implements the Totem protocol specified in:
41  * http://citeseer.ist.psu.edu/amir95totem.html
42  *
43  * The deviations from the above published protocols are:
44  * - token hold mode where token doesn't rotate on unused ring - reduces cpu
45  * usage on 1.6ghz xeon from 35% to less than 0.1% as measured by top
46  */
47 
48 #include <config.h>
49 
50 #include <assert.h>
51 #ifdef HAVE_ALLOCA_H
52 #include <alloca.h>
53 #endif
54 #include <sys/mman.h>
55 #include <sys/types.h>
56 #include <sys/stat.h>
57 #include <sys/socket.h>
58 #include <netdb.h>
59 #include <sys/un.h>
60 #include <sys/ioctl.h>
61 #include <sys/param.h>
62 #include <netinet/in.h>
63 #include <arpa/inet.h>
64 #include <unistd.h>
65 #include <fcntl.h>
66 #include <stdlib.h>
67 #include <stdio.h>
68 #include <errno.h>
69 #include <sched.h>
70 #include <time.h>
71 #include <sys/time.h>
72 #include <sys/poll.h>
73 #include <sys/uio.h>
74 #include <limits.h>
75 
76 #include <qb/qblist.h>
77 #include <qb/qbdefs.h>
78 #include <qb/qbutil.h>
79 #include <qb/qbloop.h>
80 
81 #include <corosync/swab.h>
82 #include <corosync/sq.h>
83 
84 #define LOGSYS_UTILS_ONLY 1
85 #include <corosync/logsys.h>
86 
87 #include "totemsrp.h"
88 #include "totemnet.h"
89 
90 #include "cs_queue.h"
91 
/*
 * Compile-time limits used throughout this file.
 */
#define LOCALHOST_IP inet_addr("127.0.0.1")
#define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
#define MAXIOVS 5
#define RETRANSMIT_ENTRIES_MAX 30
#define TOKEN_SIZE_MAX 64000 /* bytes */
#define LEAVE_DUMMY_NODEID 0
100 
/*
 * SRP address.
 *
 * Identifies a processor by its node id only; the helpers in this file
 * (srp_addr_equal, srp_addr_copy) compare and copy nothing but nodeid.
 */
struct srp_addr {
 unsigned int nodeid; /* node id of the processor */
};
107 
108 /*
109  * Rollover handling:
110  * SEQNO_START_MSG is the starting sequence number after a new configuration
111  * This should remain zero, unless testing overflow in which case
112  * 0x7ffff000 and 0xfffff000 are good starting values.
113  *
114  * SEQNO_START_TOKEN is the starting sequence number after a new configuration
115  * for a token. This should remain zero, unless testing overflow in which
116  * case 0x7fffff00 or 0xffffff00 are good starting values.
117  */
118 #define SEQNO_START_MSG 0x0
119 #define SEQNO_START_TOKEN 0x0
120 
121 /*
122  * These can be used to test different rollover points
123  * #define SEQNO_START_MSG 0xfffffe00
124  * #define SEQNO_START_TOKEN 0xfffffe00
125  */
126 
127 /*
128  * These can be used to test the error recovery algorithms
129  * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
130  * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
131  * #define TEST_DROP_MCAST_PERCENTAGE 50
132  * #define TEST_RECOVERY_MSG_COUNT 300
133  */
134 
135 /*
136  * we compare incoming messages to determine if their endian is
137  * different - if so convert them
138  *
139  * do not change
140  */
141 #define ENDIAN_LOCAL 0xff22
142 
144  MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
145  MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
146  MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
147  MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
148  MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
149  MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
150 };
151 
155 };
156 
157 /*
158  * New membership algorithm local variables
159  */
161  struct srp_addr addr;
162  int set;
163 };
164 
165 
167  struct qb_list_head list;
168  int (*callback_fn) (enum totem_callback_token_type type, const void *);
169  enum totem_callback_token_type callback_type;
170  int delete;
171  void *data;
172 };
173 
174 
176  int mcast;
177  int token;
178 };
179 
180 struct mcast {
183  unsigned int seq;
186  unsigned int node_id;
188 } __attribute__((packed));
189 
190 
191 struct rtr_item {
193  unsigned int seq;
194 }__attribute__((packed));
195 
196 
197 struct orf_token {
199  unsigned int seq;
200  unsigned int token_seq;
201  unsigned int aru;
202  unsigned int aru_addr;
204  unsigned int backlog;
205  unsigned int fcc;
208  struct rtr_item rtr_list[0];
209 }__attribute__((packed));
210 
211 
212 struct memb_join {
215  unsigned int proc_list_entries;
216  unsigned int failed_list_entries;
217  unsigned long long ring_seq;
218  unsigned char end_of_memb_join[0];
219 /*
220  * These parts of the data structure are dynamic:
221  * struct srp_addr proc_list[];
222  * struct srp_addr failed_list[];
223  */
224 } __attribute__((packed));
225 
226 
231 } __attribute__((packed));
232 
233 
237 } __attribute__((packed));
238 
239 
242  unsigned int aru;
243  unsigned int high_delivered;
244  unsigned int received_flg;
245 }__attribute__((packed));
246 
247 
250  unsigned int token_seq;
252  unsigned int retrans_flg;
255  unsigned char end_of_commit_token[0];
256 /*
257  * These parts of the data structure are dynamic:
258  *
259  * struct srp_addr addr[PROCESSOR_COUNT_MAX];
260  * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
261  */
262 }__attribute__((packed));
/*
 * Element stored in the message queues that are created with
 * sizeof (struct message_item) (new_message_queue, new_message_queue_trans
 * and retrans_message_queue).
 */
struct message_item {
 struct mcast *mcast; /* the queued multicast message */
 unsigned int msg_len; /* length of the message - presumably in bytes, TODO confirm */
};
270  struct mcast *mcast;
271  unsigned int msg_len;
272 };
273 
279 };
280 
283 
285 
286  /*
287  * Flow control mcasts and remcasts on last and current orf_token
288  */
290 
292 
294 
295  struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX];
296 
298 
300 
301  struct srp_addr my_id;
302 
303  struct totem_ip_address my_addrs[INTERFACE_MAX];
304 
305  struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX];
306 
307  struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX];
308 
309  struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX];
310 
311  struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX];
312 
313  struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX];
314 
315  struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX];
316 
317  struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX];
318 
319  unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX];
320 
322 
324 
326 
328 
330 
332 
334 
336 
337  struct memb_ring_id my_ring_id;
338 
339  struct memb_ring_id my_old_ring_id;
340 
342 
344 
345  unsigned int my_last_aru;
346 
348 
350 
351  unsigned int my_high_seq_received;
352 
353  unsigned int my_install_seq;
354 
356 
358 
360 
362 
364 
365  /*
366  * Queues used to order, deliver, and recover messages
367  */
368  struct cs_queue new_message_queue;
369 
370  struct cs_queue new_message_queue_trans;
371 
372  struct cs_queue retrans_message_queue;
373 
374  struct sq regular_sort_queue;
375 
376  struct sq recovery_sort_queue;
377 
378  /*
379  * Received up to and including
380  */
381  unsigned int my_aru;
382 
383  unsigned int my_high_delivered;
384 
385  struct qb_list_head token_callback_received_listhead;
386 
387  struct qb_list_head token_callback_sent_listhead;
388 
389  char orf_token_retransmit[TOKEN_SIZE_MAX];
390 
392 
393  unsigned int my_token_seq;
394 
395  /*
396  * Timers
397  */
398  qb_loop_timer_handle timer_pause_timeout;
399 
400  qb_loop_timer_handle timer_orf_token_timeout;
401 
402  qb_loop_timer_handle timer_orf_token_warning;
403 
405 
407 
408  qb_loop_timer_handle timer_merge_detect_timeout;
409 
411 
413 
414  qb_loop_timer_handle memb_timer_state_commit_timeout;
415 
416  qb_loop_timer_handle timer_heartbeat_timeout;
417 
418  /*
419  * Function and data used to log messages
420  */
422 
424 
426 
428 
430 
432 
434 
435  void (*totemsrp_log_printf) (
436  int level,
437  int subsys,
438  const char *function,
439  const char *file,
440  int line,
441  const char *format, ...)__attribute__((format(printf, 6, 7)));;
442 
444 
445 //TODO struct srp_addr next_memb;
446 
448 
449  struct totem_ip_address mcast_address;
450 
451  void (*totemsrp_deliver_fn) (
452  unsigned int nodeid,
453  const void *msg,
454  unsigned int msg_len,
455  int endian_conversion_required);
456 
457  void (*totemsrp_confchg_fn) (
458  enum totem_configuration_type configuration_type,
459  const unsigned int *member_list, size_t member_list_entries,
460  const unsigned int *left_list, size_t left_list_entries,
461  const unsigned int *joined_list, size_t joined_list_entries,
462  const struct memb_ring_id *ring_id);
463 
464  void (*totemsrp_service_ready_fn) (void);
465 
466  void (*totemsrp_waiting_trans_ack_cb_fn) (
467  int waiting_trans_ack);
468 
469  void (*memb_ring_id_create_or_load) (
470  struct memb_ring_id *memb_ring_id,
471  unsigned int nodeid);
472 
473  void (*memb_ring_id_store) (
474  const struct memb_ring_id *memb_ring_id,
475  unsigned int nodeid);
476 
478 
480 
481  unsigned long long token_ring_id_seq;
482 
483  unsigned int last_released;
484 
485  unsigned int set_aru;
486 
488 
490 
492 
493  unsigned int my_last_seq;
494 
495  struct timeval tv_old;
496 
498 
500 
501  unsigned int use_heartbeat;
502 
503  unsigned int my_trc;
504 
505  unsigned int my_pbl;
506 
507  unsigned int my_cbl;
508 
509  uint64_t pause_timestamp;
510 
512 
514 
516 
518 
520 
522 
523  int flushing;
524 
527  char commit_token_storage[40000];
528 };
529 
531  int count;
532  int (*handler_functions[6]) (
533  struct totemsrp_instance *instance,
534  const void *msg,
535  size_t msg_len,
536  int endian_conversion_needed);
537 };
538 
557 };
558 
/*
 * Human readable description for each enum gather_state_from value,
 * indexed by the enum value itself. Looked up by gsfrom_to_msg().
 */
const char* gather_state_from_desc [] = {
 [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
 [TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING",
 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
 [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
 [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
 [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
 [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
 [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
 [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
 [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
 [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
 [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
};
577 
578 /*
579  * forward decls
580  */
581 static int message_handler_orf_token (
582  struct totemsrp_instance *instance,
583  const void *msg,
584  size_t msg_len,
585  int endian_conversion_needed);
586 
587 static int message_handler_mcast (
588  struct totemsrp_instance *instance,
589  const void *msg,
590  size_t msg_len,
591  int endian_conversion_needed);
592 
593 static int message_handler_memb_merge_detect (
594  struct totemsrp_instance *instance,
595  const void *msg,
596  size_t msg_len,
597  int endian_conversion_needed);
598 
599 static int message_handler_memb_join (
600  struct totemsrp_instance *instance,
601  const void *msg,
602  size_t msg_len,
603  int endian_conversion_needed);
604 
605 static int message_handler_memb_commit_token (
606  struct totemsrp_instance *instance,
607  const void *msg,
608  size_t msg_len,
609  int endian_conversion_needed);
610 
611 static int message_handler_token_hold_cancel (
612  struct totemsrp_instance *instance,
613  const void *msg,
614  size_t msg_len,
615  int endian_conversion_needed);
616 
617 static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
618 
619 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
620 
621 static void srp_addr_to_nodeid (
622  struct totemsrp_instance *instance,
623  unsigned int *nodeid_out,
624  struct srp_addr *srp_addr_in,
625  unsigned int entries);
626 
627 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
628 
629 static void memb_leave_message_send (struct totemsrp_instance *instance);
630 
631 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
632 static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
633 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
634 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
635  int fcc_mcasts_allowed);
636 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
637 
638 static void memb_ring_id_set (struct totemsrp_instance *instance,
639  const struct memb_ring_id *ring_id);
640 static void target_set_completed (void *context);
641 static void memb_state_commit_token_update (struct totemsrp_instance *instance);
642 static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
643 static int memb_state_commit_token_send (struct totemsrp_instance *instance);
644 static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
645 static void memb_state_commit_token_create (struct totemsrp_instance *instance);
646 static int token_hold_cancel_send (struct totemsrp_instance *instance);
647 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
648 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
649 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
650 static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
651 static void memb_merge_detect_endian_convert (
652  const struct memb_merge_detect *in,
653  struct memb_merge_detect *out);
654 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
655 static void timer_function_orf_token_timeout (void *data);
656 static void timer_function_orf_token_warning (void *data);
657 static void timer_function_pause_timeout (void *data);
658 static void timer_function_heartbeat_timeout (void *data);
659 static void timer_function_token_retransmit_timeout (void *data);
660 static void timer_function_token_hold_retransmit_timeout (void *data);
661 static void timer_function_merge_detect_timeout (void *data);
662 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
663 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
664 static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
665 
666 void main_deliver_fn (
667  void *context,
668  const void *msg,
669  unsigned int msg_len,
670  const struct sockaddr_storage *system_from);
671 
673  void *context,
674  const struct totem_ip_address *iface_address,
675  unsigned int iface_no);
676 
678  6,
679  {
680  message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
681  message_handler_mcast, /* MESSAGE_TYPE_MCAST */
682  message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
683  message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
684  message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
685  message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
686  }
687 };
688 
/*
 * Log a formatted message through the instance's logging callback.
 *
 * Requires a variable named `instance` in scope whose totemsrp_log_printf
 * and totemsrp_subsys_id members are set. The call site's function, file
 * and line are passed along automatically.
 *
 * The expansion deliberately does NOT end with a semicolon: the caller's
 * own `;` terminates the statement, so the macro is safe to use as the
 * sole body of an if/else without braces.
 */
#define log_printf(level, format, args...)		\
do {							\
	instance->totemsrp_log_printf (			\
		level, instance->totemsrp_subsys_id,	\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0)
/*
 * Log a formatted message followed by the textual description of err_num.
 *
 * The description is obtained with qb_strerror_r() into a local buffer of
 * LOGSYS_MAX_PERROR_MSG_LEN bytes, and ": <description> (<err_num>)\n" is
 * appended to the caller's format. Requires a variable named `instance`
 * in scope, exactly like log_printf.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...) \
do { \
 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
 instance->totemsrp_log_printf ( \
 level, instance->totemsrp_subsys_id, \
 __FUNCTION__, __FILE__, __LINE__, \
 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
 } while(0)
705 
706 static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
707 {
708  if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
709  return gather_state_from_desc[gsfrom];
710  }
711  else {
712  return "UNKNOWN";
713  }
714 }
715 
716 static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
717 {
718  memset (instance, 0, sizeof (struct totemsrp_instance));
719 
720  qb_list_init (&instance->token_callback_received_listhead);
721 
722  qb_list_init (&instance->token_callback_sent_listhead);
723 
724  instance->my_received_flg = 1;
725 
726  instance->my_token_seq = SEQNO_START_TOKEN - 1;
727 
729 
730  instance->set_aru = -1;
731 
732  instance->my_aru = SEQNO_START_MSG;
733 
735 
737 
738  instance->orf_token_discard = 0;
739 
740  instance->originated_orf_token = 0;
741 
742  instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
743 
744  instance->waiting_trans_ack = 1;
745 }
746 
747 static int pause_flush (struct totemsrp_instance *instance)
748 {
749  uint64_t now_msec;
750  uint64_t timestamp_msec;
751  int res = 0;
752 
753  now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
754  timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
755 
756  if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
758  "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
759  /*
760  * -1 indicates an error from recvmsg
761  */
762  do {
764  } while (res == -1);
765  }
766  return (res);
767 }
768 
769 static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
770 {
771  struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
772  uint32_t time_now;
773  unsigned long long nano_secs = qb_util_nano_current_get ();
774 
775  time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
776 
777  if (type == TOTEM_CALLBACK_TOKEN_RECEIVED) {
778  /* incr latest token the index */
779  if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
780  instance->stats.latest_token = 0;
781  else
782  instance->stats.latest_token++;
783 
784  if (instance->stats.earliest_token == instance->stats.latest_token) {
785  /* we have filled up the array, start overwriting */
786  if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
787  instance->stats.earliest_token = 0;
788  else
789  instance->stats.earliest_token++;
790 
791  instance->stats.token[instance->stats.earliest_token].rx = 0;
792  instance->stats.token[instance->stats.earliest_token].tx = 0;
793  instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
794  }
795 
796  instance->stats.token[instance->stats.latest_token].rx = time_now;
797  instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
798  } else {
799  instance->stats.token[instance->stats.latest_token].tx = time_now;
800  }
801  return 0;
802 }
803 
804 static void totempg_mtu_changed(void *context, int net_mtu)
805 {
806  struct totemsrp_instance *instance = context;
807 
808  instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
809 
811  "Net MTU changed to %d, new value is %d",
812  net_mtu, instance->totem_config->net_mtu);
813 }
814 
815 /*
816  * Exported interfaces
817  */
819  qb_loop_t *poll_handle,
820  void **srp_context,
821  struct totem_config *totem_config,
822  totempg_stats_t *stats,
823 
824  void (*deliver_fn) (
825  unsigned int nodeid,
826  const void *msg,
827  unsigned int msg_len,
828  int endian_conversion_required),
829 
830  void (*confchg_fn) (
831  enum totem_configuration_type configuration_type,
832  const unsigned int *member_list, size_t member_list_entries,
833  const unsigned int *left_list, size_t left_list_entries,
834  const unsigned int *joined_list, size_t joined_list_entries,
835  const struct memb_ring_id *ring_id),
836  void (*waiting_trans_ack_cb_fn) (
837  int waiting_trans_ack))
838 {
839  struct totemsrp_instance *instance;
840  int res;
841 
842  instance = malloc (sizeof (struct totemsrp_instance));
843  if (instance == NULL) {
844  goto error_exit;
845  }
846 
847  totemsrp_instance_initialize (instance);
848 
849  instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
850  instance->totemsrp_waiting_trans_ack_cb_fn (1);
851 
852  stats->srp = &instance->stats;
853  instance->stats.latest_token = 0;
854  instance->stats.earliest_token = 0;
855 
856  instance->totem_config = totem_config;
857 
858  /*
859  * Configure logging
860  */
869 
870  /*
871  * Configure totem store and load functions
872  */
874  instance->memb_ring_id_store = totem_config->totem_memb_ring_id_store;
875 
876  /*
877  * Initialize local variables for totemsrp
878  */
879  totemip_copy (&instance->mcast_address, &totem_config->interfaces[instance->lowest_active_if].mcast_addr);
880 
881  /*
882  * Display totem configuration
883  */
885  "Token Timeout (%d ms) retransmit timeout (%d ms)",
886  totem_config->token_timeout, totem_config->token_retransmit_timeout);
887  if (totem_config->token_warning) {
888  uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100;
890  "Token warning every %d ms (%d%% of Token Timeout)",
891  token_warning_ms, totem_config->token_warning);
892  if (token_warning_ms < totem_config->token_retransmit_timeout)
894  "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
895  "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
896  token_warning_ms, totem_config->token_retransmit_timeout);
897  } else {
899  "Token warnings disabled");
900  }
902  "token hold (%d ms) retransmits before loss (%d retrans)",
903  totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
905  "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
906  totem_config->join_timeout,
907  totem_config->send_join_timeout,
908  totem_config->consensus_timeout,
909 
910  totem_config->merge_timeout);
912  "downcheck (%d ms) fail to recv const (%d msgs)",
913  totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
915  "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
916 
918  "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
919  totem_config->window_size, totem_config->max_messages);
920 
922  "missed count const (%d messages)",
923  totem_config->miss_count_const);
924 
926  "send threads (%d threads)", totem_config->threads);
927 
929  "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
931  "max_network_delay (%d ms)", totem_config->max_network_delay);
932 
933 
934  cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
935  sizeof (struct message_item), instance->threaded_mode_enabled);
936 
937  sq_init (&instance->regular_sort_queue,
938  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
939 
940  sq_init (&instance->recovery_sort_queue,
941  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
942 
943  instance->totemsrp_poll_handle = poll_handle;
944 
945  instance->totemsrp_deliver_fn = deliver_fn;
946 
947  instance->totemsrp_confchg_fn = confchg_fn;
948  instance->use_heartbeat = 1;
949 
950  timer_function_pause_timeout (instance);
951 
952  if ( totem_config->heartbeat_failures_allowed == 0 ) {
954  "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
955  instance->use_heartbeat = 0;
956  }
957 
958  if (instance->use_heartbeat) {
959  instance->heartbeat_timeout
960  = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
961  + totem_config->max_network_delay;
962 
963  if (instance->heartbeat_timeout >= totem_config->token_timeout) {
965  "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
966  instance->heartbeat_timeout,
967  totem_config->token_timeout);
969  "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
971  "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
972  instance->use_heartbeat = 0;
973  }
974  else {
976  "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
977  }
978  }
979 
980  res = totemnet_initialize (
981  poll_handle,
982  &instance->totemnet_context,
983  totem_config,
984  stats->srp,
985  instance,
986  main_deliver_fn,
987  main_iface_change_fn,
988  totempg_mtu_changed,
989  target_set_completed);
990  if (res == -1) {
991  goto error_exit;
992  }
993 
994  instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
995 
996  /*
997  * Must have net_mtu adjusted by totemnet_initialize first
998  */
999  cs_queue_init (&instance->new_message_queue,
1001  sizeof (struct message_item), instance->threaded_mode_enabled);
1002 
1003  cs_queue_init (&instance->new_message_queue_trans,
1005  sizeof (struct message_item), instance->threaded_mode_enabled);
1006 
1008  &instance->token_recv_event_handle,
1010  0,
1011  token_event_stats_collector,
1012  instance);
1014  &instance->token_sent_event_handle,
1016  0,
1017  token_event_stats_collector,
1018  instance);
1019  *srp_context = instance;
1020  return (0);
1021 
1022 error_exit:
1023  return (-1);
1024 }
1025 
1027  void *srp_context)
1028 {
1029  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1030 
1031  memb_leave_message_send (instance);
1032  totemnet_finalize (instance->totemnet_context);
1033  cs_queue_free (&instance->new_message_queue);
1034  cs_queue_free (&instance->new_message_queue_trans);
1035  cs_queue_free (&instance->retrans_message_queue);
1036  sq_free (&instance->regular_sort_queue);
1037  sq_free (&instance->recovery_sort_queue);
1038  free (instance);
1039 }
1040 
1041 /*
1042  * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1043  * with interfaces_size number of items. iface_count is final number of interfaces filled by this
1044  * function.
1045  *
1046  * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1047  * and if interface was not found, -1 is returned.
1048  */
1050  void *srp_context,
1051  unsigned int nodeid,
1052  unsigned int *interface_id,
1053  struct totem_ip_address *interfaces,
1054  unsigned int interfaces_size,
1055  char ***status,
1056  unsigned int *iface_count)
1057 {
1058  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1059  struct totem_ip_address *iface_ptr = interfaces;
1060  int res = 0;
1061  int i,n;
1062  int num_ifs = 0;
1063 
1064  memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1065  *iface_count = INTERFACE_MAX;
1066 
1067  for (i=0; i<INTERFACE_MAX; i++) {
1068  for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1069  if (instance->totem_config->interfaces[i].configured &&
1070  instance->totem_config->interfaces[i].member_list[n].nodeid == nodeid) {
1071  memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1072  interface_id[num_ifs] = i;
1073  iface_ptr++;
1074  if (++num_ifs > interfaces_size) {
1075  res = -2;
1076  break;
1077  }
1078  }
1079  }
1080  }
1081 
1082  totemnet_ifaces_get(instance->totemnet_context, status, iface_count);
1083  *iface_count = num_ifs;
1084  return (res);
1085 }
1086 
1088  void *srp_context,
1089  const char *cipher_type,
1090  const char *hash_type)
1091 {
1092  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1093  int res;
1094 
1095  res = totemnet_crypto_set(instance->totemnet_context, cipher_type, hash_type);
1096 
1097  return (res);
1098 }
1099 
1100 
1102  void *srp_context)
1103 {
1104  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1105  unsigned int res;
1106 
1107  res = instance->my_id.nodeid;
1108 
1109  return (res);
1110 }
1111 
1113  void *srp_context)
1114 {
1115  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1116  int res;
1117 
1118  res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1119 
1120  return (res);
1121 }
1122 
1123 
1124 /*
1125  * Set operations for use by the membership algorithm
1126  */
1127 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1128 {
1129  if (a->nodeid == b->nodeid) {
1130  return 1;
1131  }
1132  return 0;
1133 }
1134 
1135 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
1136 {
1137  dest->nodeid = src->nodeid;
1138 }
1139 
1140 static void srp_addr_to_nodeid (
1141  struct totemsrp_instance *instance,
1142  unsigned int *nodeid_out,
1143  struct srp_addr *srp_addr_in,
1144  unsigned int entries)
1145 {
1146  unsigned int i;
1147 
1148  for (i = 0; i < entries; i++) {
1149  nodeid_out[i] = srp_addr_in[i].nodeid;
1150  }
1151 }
1152 
1153 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
1154 {
1155  out->nodeid = swab32 (in->nodeid);
1156 }
1157 
1158 static void memb_consensus_reset (struct totemsrp_instance *instance)
1159 {
1160  instance->consensus_list_entries = 0;
1161 }
1162 
1163 static void memb_set_subtract (
1164  struct srp_addr *out_list, int *out_list_entries,
1165  struct srp_addr *one_list, int one_list_entries,
1166  struct srp_addr *two_list, int two_list_entries)
1167 {
1168  int found = 0;
1169  int i;
1170  int j;
1171 
1172  *out_list_entries = 0;
1173 
1174  for (i = 0; i < one_list_entries; i++) {
1175  for (j = 0; j < two_list_entries; j++) {
1176  if (srp_addr_equal (&one_list[i], &two_list[j])) {
1177  found = 1;
1178  break;
1179  }
1180  }
1181  if (found == 0) {
1182  srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1183  *out_list_entries = *out_list_entries + 1;
1184  }
1185  found = 0;
1186  }
1187 }
1188 
1189 /*
1190  * Set consensus for a specific processor
1191  */
1192 static void memb_consensus_set (
1193  struct totemsrp_instance *instance,
1194  const struct srp_addr *addr)
1195 {
1196  int found = 0;
1197  int i;
1198 
1199  for (i = 0; i < instance->consensus_list_entries; i++) {
1200  if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1201  found = 1;
1202  break; /* found entry */
1203  }
1204  }
1205  srp_addr_copy (&instance->consensus_list[i].addr, addr);
1206  instance->consensus_list[i].set = 1;
1207  if (found == 0) {
1208  instance->consensus_list_entries++;
1209  }
1210  return;
1211 }
1212 
1213 /*
1214  * Is consensus set for a specific processor
1215  */
1216 static int memb_consensus_isset (
1217  struct totemsrp_instance *instance,
1218  const struct srp_addr *addr)
1219 {
1220  int i;
1221 
1222  for (i = 0; i < instance->consensus_list_entries; i++) {
1223  if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1224  return (instance->consensus_list[i].set);
1225  }
1226  }
1227  return (0);
1228 }
1229 
1230 /*
1231  * Is consensus agreed upon based upon consensus database
1232  */
1233 static int memb_consensus_agreed (
1234  struct totemsrp_instance *instance)
1235 {
1236  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1237  int token_memb_entries = 0;
1238  int agreed = 1;
1239  int i;
1240 
1241  memb_set_subtract (token_memb, &token_memb_entries,
1242  instance->my_proc_list, instance->my_proc_list_entries,
1243  instance->my_failed_list, instance->my_failed_list_entries);
1244 
1245  for (i = 0; i < token_memb_entries; i++) {
1246  if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1247  agreed = 0;
1248  break;
1249  }
1250  }
1251 
1252  if (agreed && instance->failed_to_recv == 1) {
1253  /*
1254  * Both nodes agreed on our failure. We don't care how many proc list items left because we
1255  * will create single ring anyway.
1256  */
1257 
1258  return (agreed);
1259  }
1260 
1261  assert (token_memb_entries >= 1);
1262 
1263  return (agreed);
1264 }
1265 
1266 static void memb_consensus_notset (
1267  struct totemsrp_instance *instance,
1268  struct srp_addr *no_consensus_list,
1269  int *no_consensus_list_entries,
1270  struct srp_addr *comparison_list,
1271  int comparison_list_entries)
1272 {
1273  int i;
1274 
1275  *no_consensus_list_entries = 0;
1276 
1277  for (i = 0; i < instance->my_proc_list_entries; i++) {
1278  if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1279  srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->my_proc_list[i]);
1280  *no_consensus_list_entries = *no_consensus_list_entries + 1;
1281  }
1282  }
1283 }
1284 
1285 /*
1286  * Is set1 equal to set2 Entries can be in different orders
1287  */
1288 static int memb_set_equal (
1289  struct srp_addr *set1, int set1_entries,
1290  struct srp_addr *set2, int set2_entries)
1291 {
1292  int i;
1293  int j;
1294 
1295  int found = 0;
1296 
1297  if (set1_entries != set2_entries) {
1298  return (0);
1299  }
1300  for (i = 0; i < set2_entries; i++) {
1301  for (j = 0; j < set1_entries; j++) {
1302  if (srp_addr_equal (&set1[j], &set2[i])) {
1303  found = 1;
1304  break;
1305  }
1306  }
1307  if (found == 0) {
1308  return (0);
1309  }
1310  found = 0;
1311  }
1312  return (1);
1313 }
1314 
1315 /*
1316  * Is subset fully contained in fullset
1317  */
1318 static int memb_set_subset (
1319  const struct srp_addr *subset, int subset_entries,
1320  const struct srp_addr *fullset, int fullset_entries)
1321 {
1322  int i;
1323  int j;
1324  int found = 0;
1325 
1326  if (subset_entries > fullset_entries) {
1327  return (0);
1328  }
1329  for (i = 0; i < subset_entries; i++) {
1330  for (j = 0; j < fullset_entries; j++) {
1331  if (srp_addr_equal (&subset[i], &fullset[j])) {
1332  found = 1;
1333  }
1334  }
1335  if (found == 0) {
1336  return (0);
1337  }
1338  found = 0;
1339  }
1340  return (1);
1341 }
1342 /*
1343  * merge subset into fullset taking care not to add duplicates
1344  */
1345 static void memb_set_merge (
1346  const struct srp_addr *subset, int subset_entries,
1347  struct srp_addr *fullset, int *fullset_entries)
1348 {
1349  int found = 0;
1350  int i;
1351  int j;
1352 
1353  for (i = 0; i < subset_entries; i++) {
1354  for (j = 0; j < *fullset_entries; j++) {
1355  if (srp_addr_equal (&fullset[j], &subset[i])) {
1356  found = 1;
1357  break;
1358  }
1359  }
1360  if (found == 0) {
1361  srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1362  *fullset_entries = *fullset_entries + 1;
1363  }
1364  found = 0;
1365  }
1366  return;
1367 }
1368 
1369 static void memb_set_and_with_ring_id (
1370  struct srp_addr *set1,
1371  struct memb_ring_id *set1_ring_ids,
1372  int set1_entries,
1373  struct srp_addr *set2,
1374  int set2_entries,
1375  struct memb_ring_id *old_ring_id,
1376  struct srp_addr *and,
1377  int *and_entries)
1378 {
1379  int i;
1380  int j;
1381  int found = 0;
1382 
1383  *and_entries = 0;
1384 
1385  for (i = 0; i < set2_entries; i++) {
1386  for (j = 0; j < set1_entries; j++) {
1387  if (srp_addr_equal (&set1[j], &set2[i])) {
1388  if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1389  found = 1;
1390  }
1391  break;
1392  }
1393  }
1394  if (found) {
1395  srp_addr_copy (&and[*and_entries], &set1[j]);
1396  *and_entries = *and_entries + 1;
1397  }
1398  found = 0;
1399  }
1400  return;
1401 }
1402 
1403 static void memb_set_log(
1404  struct totemsrp_instance *instance,
1405  int level,
1406  const char *string,
1407  struct srp_addr *list,
1408  int list_entries)
1409 {
1410  char int_buf[32];
1411  char list_str[512];
1412  int i;
1413 
1414  memset(list_str, 0, sizeof(list_str));
1415 
1416  for (i = 0; i < list_entries; i++) {
1417  if (i == 0) {
1418  snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid);
1419  } else {
1420  snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid);
1421  }
1422 
1423  if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1424  break ;
1425  }
1426  strcat(list_str, int_buf);
1427  }
1428 
1429  log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1430 }
1431 
1432 static void my_leave_memb_clear(
1433  struct totemsrp_instance *instance)
1434 {
1435  memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1436  instance->my_leave_memb_entries = 0;
1437 }
1438 
1439 static unsigned int my_leave_memb_match(
1440  struct totemsrp_instance *instance,
1441  unsigned int nodeid)
1442 {
1443  int i;
1444  unsigned int ret = 0;
1445 
1446  for (i = 0; i < instance->my_leave_memb_entries; i++){
1447  if (instance->my_leave_memb_list[i] == nodeid){
1448  ret = nodeid;
1449  break;
1450  }
1451  }
1452  return ret;
1453 }
1454 
/*
 * Record nodeid in instance->my_leave_memb_list unless it is already there.
 * A new id is appended only while my_leave_memb_entries is below
 * PROCESSOR_COUNT_MAX - 1; otherwise the else branch below is taken.
 */
1455 static void my_leave_memb_set(
1456  struct totemsrp_instance *instance,
1457  unsigned int nodeid)
1458 {
1459  int i, found = 0;
/* Search for an existing entry so that each node id is stored only once. */
1460  for (i = 0; i < instance->my_leave_memb_entries; i++){
1461  if (instance->my_leave_memb_list[i] == nodeid){
1462  found = 1;
1463  break;
1464  }
1465  }
1466  if (found == 1) {
1467  return;
1468  }
1469  if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1470  instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1471  instance->my_leave_memb_entries++;
1472  } else {
/*
 * NOTE(review): the embedded listing numbers skip 1473 here, so the start of
 * the statement that owns the following string literal is not visible.
 */
1474  "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid);
1475  }
1476 }
1477 
1478 
1479 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1480 {
1481  assert (instance != NULL);
1482  return totemnet_buffer_alloc (instance->totemnet_context);
1483 }
1484 
1485 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1486 {
1487  assert (instance != NULL);
1488  totemnet_buffer_release (instance->totemnet_context, ptr);
1489 }
1490 
/*
 * Schedule timer_function_token_retransmit_timeout after the configured
 * token_retransmit_timeout (milliseconds scaled by QB_TIME_NS_IN_MSEC) and
 * log at error level when qb_loop_timer_add reports a failure.
 */
1491 static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1492 {
1493  int32_t res;
1494 
/*
 * NOTE(review): the embedded listing numbers skip 1496, so the remaining
 * argument(s) of this qb_loop_timer_del call are not visible here.
 */
1495  qb_loop_timer_del (instance->totemsrp_poll_handle,
1497  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1498  QB_LOOP_MED,
1499  instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1500  (void *)instance,
1501  timer_function_token_retransmit_timeout,
/*
 * NOTE(review): listing line 1502 is missing as well; the final argument of
 * qb_loop_timer_add is not visible here.
 */
1503  if (res != 0) {
1504  log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1505  }
1506 
1507 }
1508 
1509 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1510 {
1511  int32_t res;
1512 
1513  if (instance->my_merge_detect_timeout_outstanding == 0) {
1514  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1515  QB_LOOP_MED,
1516  instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1517  (void *)instance,
1518  timer_function_merge_detect_timeout,
1519  &instance->timer_merge_detect_timeout);
1520  if (res != 0) {
1521  log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1522  }
1523 
1524  instance->my_merge_detect_timeout_outstanding = 1;
1525  }
1526 }
1527 
/*
 * Remove the merge detect timer from the poll loop.
 */
1528 static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1529 {
1530  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
/*
 * NOTE(review): the embedded listing numbers skip 1531, so a statement of
 * this function appears to be missing from the listing at this point.
 */
1532 }
1533 
1534 /*
1535  * ring_state_* is used to save and restore the sort queue
1536  * state when a recovery operation fails (and enters gather)
1537  */
/*
 * Save the current ring state once: while old_ring_state_saved is 0, copy
 * my_ring_id into my_old_ring_id and remember my_aru, then set the flag so
 * that later calls do nothing until the flag is cleared again.
 */
1538 static void old_ring_state_save (struct totemsrp_instance *instance)
1539 {
1540  if (instance->old_ring_state_saved == 0) {
1541  instance->old_ring_state_saved = 1;
1542  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1543  sizeof (struct memb_ring_id));
1544  instance->old_ring_state_aru = instance->my_aru;
/*
 * NOTE(review): the embedded listing numbers skip 1545-1546, so the lines
 * between the assignment above and the string literal below (including the
 * start of the statement that owns the literal) are not visible here.
 */
1547  "Saving state aru %x high seq received %x",
1548  instance->my_aru, instance->my_high_seq_received);
1549  }
1550 }
1551 
/*
 * Restore my_aru from the value remembered in old_ring_state_aru.
 */
1552 static void old_ring_state_restore (struct totemsrp_instance *instance)
1553 {
1554  instance->my_aru = instance->old_ring_state_aru;
/*
 * NOTE(review): the embedded listing numbers skip 1555-1556, so the lines
 * preceding the string literal below are not visible here.
 */
1557  "Restoring instance->my_aru %x my high seq received %x",
1558  instance->my_aru, instance->my_high_seq_received);
1559 }
1560 
/*
 * Clear the old_ring_state_saved flag so that the next call to
 * old_ring_state_save stores a fresh snapshot.
 */
1561 static void old_ring_state_reset (struct totemsrp_instance *instance)
1562 {
/*
 * NOTE(review): the embedded listing numbers skip 1563, so the start of the
 * statement that owns the following string literal is not visible here.
 */
1564  "Resetting old ring state");
1565  instance->old_ring_state_saved = 0;
1566 }
1567 
1568 static void reset_pause_timeout (struct totemsrp_instance *instance)
1569 {
1570  int32_t res;
1571 
1572  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1573  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1574  QB_LOOP_MED,
1575  instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1576  (void *)instance,
1577  timer_function_pause_timeout,
1578  &instance->timer_pause_timeout);
1579  if (res != 0) {
1580  log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1581  }
1582 }
1583 
1584 static void reset_token_warning (struct totemsrp_instance *instance) {
1585  int32_t res;
1586 
1587  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1588  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1589  QB_LOOP_MED,
1590  instance->totem_config->token_warning * instance->totem_config->token_timeout / 100 * QB_TIME_NS_IN_MSEC,
1591  (void *)instance,
1592  timer_function_orf_token_warning,
1593  &instance->timer_orf_token_warning);
1594  if (res != 0) {
1595  log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1596  }
1597 }
1598 
1599 static void reset_token_timeout (struct totemsrp_instance *instance) {
1600  int32_t res;
1601 
1602  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1603  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1604  QB_LOOP_MED,
1605  instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1606  (void *)instance,
1607  timer_function_orf_token_timeout,
1608  &instance->timer_orf_token_timeout);
1609  if (res != 0) {
1610  log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1611  }
1612 
1613  if (instance->totem_config->token_warning)
1614  reset_token_warning(instance);
1615 }
1616 
1617 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1618  int32_t res;
1619 
1620  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1621  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1622  QB_LOOP_MED,
1623  instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1624  (void *)instance,
1625  timer_function_heartbeat_timeout,
1626  &instance->timer_heartbeat_timeout);
1627  if (res != 0) {
1628  log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1629  }
1630 }
1631 
1632 
1633 static void cancel_token_warning (struct totemsrp_instance *instance) {
1634  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1635 }
1636 
1637 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1638  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1639 
1640  if (instance->totem_config->token_warning)
1641  cancel_token_warning(instance);
1642 }
1643 
1644 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1645  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1646 }
1647 
1648 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1649 {
1650  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1651 }
1652 
/*
 * Schedule timer_function_token_hold_retransmit_timeout after the
 * configured token_hold_timeout (milliseconds scaled by QB_TIME_NS_IN_MSEC)
 * and log at error level when qb_loop_timer_add reports a failure.
 */
1653 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1654 {
1655  int32_t res;
1656 
1657  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1658  QB_LOOP_MED,
1659  instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1660  (void *)instance,
1661  timer_function_token_hold_retransmit_timeout,
/*
 * NOTE(review): the embedded listing numbers skip 1662, so the final
 * argument of qb_loop_timer_add is not visible here.
 */
1663  if (res != 0) {
1664  log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1665  }
1666 }
1667 
/*
 * Remove a timer from the poll loop via qb_loop_timer_del.
 * NOTE(review): the embedded listing numbers skip 1671, so the timer handle
 * passed as the second argument is not visible here - presumably the token
 * hold retransmit timer, judging by the function name; confirm.
 */
1668 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1669 {
1670  qb_loop_timer_del (instance->totemsrp_poll_handle,
1672 }
1673 
1674 static void memb_state_consensus_timeout_expired (
1675  struct totemsrp_instance *instance)
1676 {
1677  struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1678  int no_consensus_list_entries;
1679 
1680  instance->stats.consensus_timeouts++;
1681  if (memb_consensus_agreed (instance)) {
1682  memb_consensus_reset (instance);
1683 
1684  memb_consensus_set (instance, &instance->my_id);
1685 
1686  reset_token_timeout (instance); // REVIEWED
1687  } else {
1688  memb_consensus_notset (
1689  instance,
1690  no_consensus_list,
1691  &no_consensus_list_entries,
1692  instance->my_proc_list,
1693  instance->my_proc_list_entries);
1694 
1695  memb_set_merge (no_consensus_list, no_consensus_list_entries,
1696  instance->my_failed_list, &instance->my_failed_list_entries);
1697  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1698  }
1699 }
1700 
/* Forward declarations for functions called before their definition in this file. */
1701 static void memb_join_message_send (struct totemsrp_instance *instance);
1702 
1703 static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1704 
1705 /*
1706  * Timers used for various states of the membership algorithm
1707  */
1708 static void timer_function_pause_timeout (void *data)
1709 {
1710  struct totemsrp_instance *instance = data;
1711 
1712  instance->pause_timestamp = qb_util_nano_current_get ();
1713  reset_pause_timeout (instance);
1714 }
1715 
1716 static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1717 {
1718  old_ring_state_restore (instance);
1719  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1720  instance->stats.recovery_token_lost++;
1721 }
1722 
/*
 * Token warning timer callback. While token_warning is non-zero, compute
 * the time since the latest recorded token reception and re-arm the warning
 * timer; otherwise cancel the warning timer.
 */
1723 static void timer_function_orf_token_warning (void *data)
1724 {
1725  struct totemsrp_instance *instance = data;
1726  uint64_t tv_diff;
1727 
1728  /* need to protect against the case where token_warning is set to 0 dynamically */
1729  if (instance->totem_config->token_warning) {
/* Current time converted to milliseconds minus the rx stamp of the latest token. */
1730  tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1731  instance->stats.token[instance->stats.latest_token].rx;
/*
 * NOTE(review): the embedded listing numbers skip 1732, so the start of the
 * statement that owns the following string literal is not visible here.
 * The literal uses %d while the argument is cast to unsigned int.
 */
1733  "Token has not been received in %d ms ", (unsigned int) tv_diff);
1734  reset_token_warning(instance);
1735  } else {
1736  cancel_token_warning(instance);
1737  }
1738 }
1739 
/*
 * Token loss timer callback: dispatch on the current membership state.
 * Every visible branch increments a per-state *_token_lost statistic or
 * delegates to a helper that does, and moves the state machine to GATHER.
 */
1740 static void timer_function_orf_token_timeout (void *data)
1741 {
1742  struct totemsrp_instance *instance = data;
1743 
1744  switch (instance->memb_state) {
/*
 * NOTE(review): the embedded listing numbers skip 1745-1746, 1748 and 1750,
 * so the case label of this first branch and the statements that own the
 * two string literals below are not visible here.
 */
1747  "The token was lost in the OPERATIONAL state.");
1749  "A processor failed, forming new configuration.");
1751  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1752  instance->stats.operational_token_lost++;
1753  break;
1754 
1755  case MEMB_STATE_GATHER:
/* NOTE(review): listing line 1756 is missing (start of the statement owning the literal). */
1757  "The consensus timeout expired.");
/*
 * memb_state_consensus_timeout_expired may itself enter GATHER on its
 * no-consensus path; GATHER is then entered (again) unconditionally below.
 */
1758  memb_state_consensus_timeout_expired (instance);
1759  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1760  instance->stats.gather_token_lost++;
1761  break;
1762 
1763  case MEMB_STATE_COMMIT:
/* NOTE(review): listing line 1764 is missing (start of the statement owning the literal). */
1765  "The token was lost in the COMMIT state.");
1766  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1767  instance->stats.commit_token_lost++;
1768  break;
1769 
1770  case MEMB_STATE_RECOVERY:
/* NOTE(review): listing line 1771 is missing (start of the statement owning the literal). */
1772  "The token was lost in the RECOVERY state.");
1773  memb_recovery_state_token_loss (instance);
1774  instance->orf_token_discard = 1;
1775  break;
1776  }
1777 }
1778 
/*
 * Heartbeat timer callback: handles the expiry exactly like a token loss by
 * calling timer_function_orf_token_timeout with the same data pointer.
 */
1779 static void timer_function_heartbeat_timeout (void *data)
1780 {
1781  struct totemsrp_instance *instance = data;
/*
 * NOTE(review): the embedded listing numbers skip 1782, so the start of the
 * statement that owns the following string literal is not visible here.
 */
1783  "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1784  timer_function_orf_token_timeout(data);
1785 }
1786 
/*
 * Join timer callback. In the GATHER and COMMIT states a join message is
 * sent and the join timer is re-armed with this same function as callback;
 * the first visible branch asserts because it must never be reached.
 */
1787 static void memb_timer_function_state_gather (void *data)
1788 {
1789  struct totemsrp_instance *instance = data;
1790  int32_t res;
1791 
1792  switch (instance->memb_state) {
/*
 * NOTE(review): the embedded listing numbers skip 1793, so a line directly
 * before the RECOVERY label (possibly another case label) is not visible.
 */
1794  case MEMB_STATE_RECOVERY:
1795  assert (0); /* this should never happen */
1796  break;
1797  case MEMB_STATE_GATHER:
1798  case MEMB_STATE_COMMIT:
1799  memb_join_message_send (instance);
1800 
1801  /*
1802  * Restart the join timeout
1803  */
1804  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1805 
1806  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1807  QB_LOOP_MED,
1808  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1809  (void *)instance,
1810  memb_timer_function_state_gather,
1811  &instance->memb_timer_state_gather_join_timeout);
1812 
1813  if (res != 0) {
1814  log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1815  }
1816  break;
1817  }
1818 }
1819 
/*
 * Consensus timer callback: forwards to
 * memb_state_consensus_timeout_expired. 'data' is the totemsrp instance
 * that was passed when the timer was added.
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	memb_state_consensus_timeout_expired ((struct totemsrp_instance *)data);
}
1825 
/*
 * Move messages that belong to the previous ring from the recovery sort
 * queue into the regular sort queue.
 *
 * Sequence numbers SEQNO_START_MSG + 1 .. my_aru of the recovery queue are
 * visited. Only encapsulated messages are considered; the inner message is
 * added to the regular queue when its ring id equals my_old_ring_id and its
 * sequence number is not yet in use there. old_ring_state_high_seq_received
 * is raised to the highest sequence number added.
 */
1826 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1827 {
1828  unsigned int i;
1829  struct sort_queue_item *recovery_message_item;
1830  struct sort_queue_item regular_message_item;
1831  unsigned int range = 0;
1832  int res;
1833  void *ptr;
1834  struct mcast *mcast;
1835 
/*
 * NOTE(review): the embedded listing numbers skip 1836, so the start of the
 * statement that owns the following string literal is not visible here.
 */
1837  "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1838 
1839  range = instance->my_aru - SEQNO_START_MSG;
1840  /*
1841  * Move messages from recovery to regular sort queue
1842  */
1843 // todo should i be initialized to 0 or 1 ?
1844  for (i = 1; i <= range; i++) {
1845  res = sq_item_get (&instance->recovery_sort_queue,
1846  i + SEQNO_START_MSG, &ptr);
/* A non-zero result means no item could be fetched for this sequence number. */
1847  if (res != 0) {
1848  continue;
1849  }
1850  recovery_message_item = ptr;
1851 
1852  /*
1853  * Convert recovery message into regular message
1854  */
1855  mcast = recovery_message_item->mcast;
1856  if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
1857  /*
1858  * Message is a recovery message encapsulated
1859  * in a new ring message
1860  */
/* The inner message starts sizeof (struct mcast) bytes into the outer one. */
1861  regular_message_item.mcast =
1862  (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1863  regular_message_item.msg_len =
1864  recovery_message_item->msg_len - sizeof (struct mcast);
1865  mcast = regular_message_item.mcast;
1866  } else {
1867  /*
1868  * TODO this case shouldn't happen
1869  */
1870  continue;
1871  }
1872 
/* NOTE(review): listing line 1873 is missing (start of the statement owning the literal). */
1874  "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ,
1875  (uint64_t)mcast->seq);
1876 
1877  /*
1878  * Only add this message to the regular sort
1879  * queue if it was originated with the same ring
1880  * id as the previous ring
1881  */
1882  if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1883  sizeof (struct memb_ring_id)) == 0) {
1884 
1885  res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1886  if (res == 0) {
1887  sq_item_add (&instance->regular_sort_queue,
1888  &regular_message_item, mcast->seq);
1889  if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1890  instance->old_ring_state_high_seq_received = mcast->seq;
1891  }
1892  }
1893  } else {
/* NOTE(review): listing line 1894 is missing (start of the statement owning the literal). */
1895  "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq);
1896  }
1897  }
1898 }
1899 
1900 /*
1901  * Change states in the state machine of the membership algorithm
1902  */
/*
 * Enter the OPERATIONAL state.
 *
 * In order: old-ring messages are moved back to the regular sort queue and
 * delivered, the left and joined lists are computed, the new membership is
 * installed, the recovery sort queue is copied over the regular one, stale
 * queue items are freed and released, the membership change is summarised
 * in the joined/left/failed message buffers, and finally the state
 * variables and statistics are updated and my_ring_id is remembered as
 * my_old_ring_id.
 */
1903 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1904 {
1905  struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1906  int joined_list_entries = 0;
1907  unsigned int aru_save;
1908  unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1909  unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1910  unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1911  unsigned int left_list[PROCESSOR_COUNT_MAX];
1912  unsigned int i;
1913  unsigned int res;
1914  char left_node_msg[1024];
1915  char joined_node_msg[1024];
1916  char failed_node_msg[1024];
1917 
1918  instance->originated_orf_token = 0;
1919 
1920  memb_consensus_reset (instance);
1921 
1922  old_ring_state_reset (instance);
1923 
1924  deliver_messages_from_recovery_to_regular (instance);
1925 
/*
 * NOTE(review): the embedded listing numbers skip 1926, so the start of the
 * statement that owns the following string literal is not visible here.
 */
1927  "Delivering to app %x to %x",
1928  instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1929 
/* my_aru is temporarily replaced by the old ring's aru; it is put back after the second delivery below. */
1930  aru_save = instance->my_aru;
1931  instance->my_aru = instance->old_ring_state_aru;
1932 
1933  messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1934 
1935  /*
1936  * Calculate joined and left list
1937  */
/* left = current members minus transitional members */
1938  memb_set_subtract (instance->my_left_memb_list,
1939  &instance->my_left_memb_entries,
1940  instance->my_memb_list, instance->my_memb_entries,
1941  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1942 
/* joined = new members minus transitional members */
1943  memb_set_subtract (joined_list, &joined_list_entries,
1944  instance->my_new_memb_list, instance->my_new_memb_entries,
1945  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1946 
1947  /*
1948  * Install new membership
1949  */
1950  instance->my_memb_entries = instance->my_new_memb_entries;
1951  memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1952  sizeof (struct srp_addr) * instance->my_memb_entries);
1953  instance->last_released = 0;
1954  instance->my_set_retrans_flg = 0;
1955 
1956  /*
1957  * Deliver transitional configuration to application
1958  */
1959  srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1960  instance->my_left_memb_entries);
1961  srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1962  instance->my_trans_memb_list, instance->my_trans_memb_entries);
/*
 * NOTE(review): the embedded listing numbers skip 1963, so the call that
 * receives the following argument list is not visible here.
 */
1964  trans_memb_list_totemip, instance->my_trans_memb_entries,
1965  left_list, instance->my_left_memb_entries,
1966  0, 0, &instance->my_ring_id);
1967  instance->waiting_trans_ack = 1;
1968  instance->totemsrp_waiting_trans_ack_cb_fn (1);
1969 
1970 // TODO we need to filter to ensure we only deliver those
1971 // messages which are part of instance->my_deliver_memb
1972  messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1973 
1974  instance->my_aru = aru_save;
1975 
1976  /*
1977  * Deliver regular configuration to application
1978  */
1979  srp_addr_to_nodeid (instance, new_memb_list_totemip,
1980  instance->my_new_memb_list, instance->my_new_memb_entries);
1981  srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
1982  joined_list_entries);
/*
 * NOTE(review): the embedded listing numbers skip 1983, so the call that
 * receives the following argument list is not visible here.
 */
1984  new_memb_list_totemip, instance->my_new_memb_entries,
1985  0, 0,
1986  joined_list_totemip, joined_list_entries, &instance->my_ring_id);
1987 
1988  /*
1989  * The recovery sort queue now becomes the regular
1990  * sort queue. It is necessary to copy the state
1991  * into the regular sort queue.
1992  */
1993  sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
1994  instance->my_last_aru = SEQNO_START_MSG;
1995 
1996  /* When making my_proc_list smaller, ensure that the
1997  * now non-used entries are zero-ed out. There are some suspect
1998  * assert's that assume that there is always 2 entries in the list.
1999  * These fail when my_proc_list is reduced to 1 entry (and the
2000  * valid [0] entry is the same as the 'unused' [1] entry).
2001  */
2002  memset(instance->my_proc_list, 0,
2003  sizeof (struct srp_addr) * instance->my_proc_list_entries);
2004 
2005  instance->my_proc_list_entries = instance->my_new_memb_entries;
/* The copy length uses my_memb_entries, which was set equal to my_new_memb_entries above. */
2006  memcpy (instance->my_proc_list, instance->my_new_memb_list,
2007  sizeof (struct srp_addr) * instance->my_memb_entries);
2008 
2009  instance->my_failed_list_entries = 0;
2010  /*
2011  * TODO Not exactly to spec
2012  *
2013  * At the entry to this function all messages without a gap are
2014  * delivered.
2015  *
2016  * This code throws away messages from the last gap in the sort queue
2017  * to my_high_seq_received
2018  *
2019  * What should really happen is we should deliver all messages up to
2020  * a gap, then deliver the transitional configuration, then deliver
2021  * the messages between the first gap and my_high_seq_received, then
2022  * deliver a regular configuration, then deliver the regular
2023  * configuration
2024  *
2025  * Unfortunately totempg doesn't appear to like this operating mode
2026  * which needs more inspection
2027  */
/*
 * Walk down from my_high_seq_received while sq_item_get keeps returning
 * non-zero, stopping at 0 at the latest; the index reached becomes
 * my_high_delivered.
 */
2028  i = instance->my_high_seq_received + 1;
2029  do {
2030  void *ptr;
2031 
2032  i -= 1;
2033  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2034  if (i == 0) {
2035  break;
2036  }
2037  } while (res);
2038 
2039  instance->my_high_delivered = i;
2040 
/* Free the message buffer of every item up to my_high_delivered before the items are released. */
2041  for (i = 0; i <= instance->my_high_delivered; i++) {
2042  void *ptr;
2043 
2044  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2045  if (res == 0) {
2046  struct sort_queue_item *regular_message;
2047 
2048  regular_message = ptr;
2049  free (regular_message->mcast);
2050  }
2051  }
2052  sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2053  instance->last_released = instance->my_high_delivered;
2054 
/*
 * NOTE(review): in the message builders below, sptr/sptr2 accumulate
 * snprintf return values without being checked against the 1024 byte
 * buffers. snprintf returns the length that would have been written, so a
 * long enough list makes "sizeof(...)-sptr" wrap around - confirm the lists
 * are bounded tightly enough for this not to happen.
 */
2055  if (joined_list_entries) {
2056  int sptr = 0;
2057  sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2058  for (i=0; i< joined_list_entries; i++) {
2059  sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " " CS_PRI_NODE_ID, joined_list_totemip[i]);
2060  }
2061  }
2062  else {
2063  joined_node_msg[0] = '\0';
2064  }
2065 
2066  if (instance->my_left_memb_entries) {
2067  int sptr = 0;
2068  int sptr2 = 0;
2069  sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2070  for (i=0; i< instance->my_left_memb_entries; i++) {
2071  sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " " CS_PRI_NODE_ID, left_list[i]);
2072  }
/* Left nodes that have no entry in the leave list are reported as failed. */
2073  for (i=0; i< instance->my_left_memb_entries; i++) {
2074  if (my_leave_memb_match(instance, left_list[i]) == 0) {
2075  if (sptr2 == 0) {
2076  sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2077  }
/*
 * NOTE(review): the bound below uses sizeof(left_node_msg) although the
 * write goes to failed_node_msg; both arrays are declared with 1024 bytes,
 * so the value is the same.
 */
2078  sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " " CS_PRI_NODE_ID, left_list[i]);
2079  }
2080  }
2081  if (sptr2 == 0) {
2082  failed_node_msg[0] = '\0';
2083  }
2084  }
2085  else {
2086  left_node_msg[0] = '\0';
2087  failed_node_msg[0] = '\0';
2088  }
2089 
2090  my_leave_memb_clear(instance);
2091 
/*
 * NOTE(review): the embedded listing numbers skip 2092 and 2094, so the
 * statements that own the two string literals below are not visible here.
 */
2093  "entering OPERATIONAL state.");
2095  "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s",
2096  instance->my_ring_id.rep,
2097  (uint64_t)instance->my_ring_id.seq,
2098  joined_node_msg,
2099  left_node_msg);
2100 
2101  if (strlen(failed_node_msg)) {
/* NOTE(review): listing line 2102 is missing (start of the statement owning the literal). */
2103  "Failed to receive the leave message.%s",
2104  failed_node_msg);
2105  }
2106 
2107  instance->memb_state = MEMB_STATE_OPERATIONAL;
2108 
2109  instance->stats.operational_entered++;
2110  instance->stats.continuous_gather = 0;
2111 
2112  instance->my_received_flg = 1;
2113 
2114  reset_pause_timeout (instance);
2115 
2116  /*
2117  * Save ring id information from this configuration to determine
2118  * which processors are transitioning from old regular configuration
2119  * in to new regular configuration on the next configuration change
2120  */
2121  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2122  sizeof (struct memb_ring_id));
2123 
2124  return;
2125 }
2126 
2127 static void memb_state_gather_enter (
2128  struct totemsrp_instance *instance,
2129  enum gather_state_from gather_from)
2130 {
2131  int32_t res;
2132 
2133  instance->orf_token_discard = 1;
2134 
2135  instance->originated_orf_token = 0;
2136 
2137  memb_set_merge (
2138  &instance->my_id, 1,
2139  instance->my_proc_list, &instance->my_proc_list_entries);
2140 
2141  memb_join_message_send (instance);
2142 
2143  /*
2144  * Restart the join timeout
2145  */
2146  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2147 
2148  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2149  QB_LOOP_MED,
2150  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2151  (void *)instance,
2152  memb_timer_function_state_gather,
2153  &instance->memb_timer_state_gather_join_timeout);
2154  if (res != 0) {
2155  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2156  }
2157 
2158  /*
2159  * Restart the consensus timeout
2160  */
2161  qb_loop_timer_del (instance->totemsrp_poll_handle,
2162  instance->memb_timer_state_gather_consensus_timeout);
2163 
2164  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2165  QB_LOOP_MED,
2166  instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2167  (void *)instance,
2168  memb_timer_function_gather_consensus_timeout,
2169  &instance->memb_timer_state_gather_consensus_timeout);
2170  if (res != 0) {
2171  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2172  }
2173 
2174  /*
2175  * Cancel the token loss and token retransmission timeouts
2176  */
2177  cancel_token_retransmit_timeout (instance); // REVIEWED
2178  cancel_token_timeout (instance); // REVIEWED
2179  cancel_merge_detect_timeout (instance);
2180 
2181  memb_consensus_reset (instance);
2182 
2183  memb_consensus_set (instance, &instance->my_id);
2184 
2185  log_printf (instance->totemsrp_log_level_debug,
2186  "entering GATHER state from %d(%s).",
2187  gather_from, gsfrom_to_msg(gather_from));
2188 
2189  instance->memb_state = MEMB_STATE_GATHER;
2190  instance->stats.gather_entered++;
2191 
2192  if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) {
2193  /*
2194  * State 3 means gather, so we are continuously gathering.
2195  */
2196  instance->stats.continuous_gather++;
2197  }
2198 
2199  return;
2200 }
2201 
static void timer_function_token_retransmit_timeout (void *data);

/*
 * Completion callback taking the totemsrp instance as its opaque context.
 * Its only job is to send the commit token held by that instance.
 */
static void target_set_completed (
	void *context)
{
	struct totemsrp_instance *srp = context;

	memb_state_commit_token_send (srp);
}
2212 
/*
 * Enter the COMMIT membership state.
 *
 * Saves the old ring state, fills this node's entry into the commit token,
 * sets the token target, removes both gather timers, adopts and stores the
 * ring id carried by the commit token, then switches memb_state to
 * MEMB_STATE_COMMIT, re-arms the token timers and clears the flow control
 * counters. The commit token itself is not sent here (see the closing
 * comment in the body).
 *
 * NOTE(review): this copy of the function is missing listing lines 2224,
 * 2228 and 2236; the gaps are marked below.
 */
2213 static void memb_state_commit_enter (
2214  struct totemsrp_instance *instance)
2215 {
2216  old_ring_state_save (instance);
2217 
2218  memb_state_commit_token_update (instance);
2219 
2220  memb_state_commit_token_target_set (instance);
2221 
2222  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2223 
/* NOTE(review): listing line 2224 is absent from this copy; content unknown - restore from upstream. */
2225 
2226  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2227 
/* NOTE(review): listing line 2228 is absent from this copy; content unknown - restore from upstream. */
2229 
2230  memb_ring_id_set (instance, &instance->commit_token->ring_id);
2231 
2232  instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2233 
2234  instance->token_ring_id_seq = instance->my_ring_id.seq;
2235 
/* NOTE(review): listing line 2236 is absent; presumably the opener of a log_printf (...) call whose format string follows - confirm. */
2237  "entering COMMIT state.");
2238 
2239  instance->memb_state = MEMB_STATE_COMMIT;
2240  reset_token_retransmit_timeout (instance); // REVIEWED
2241  reset_token_timeout (instance); // REVIEWED
2242 
2243  instance->stats.commit_entered++;
2244  instance->stats.continuous_gather = 0;
2245 
2246  /*
2247  * reset all flow control variables since we are starting a new ring
2248  */
2249  instance->my_trc = 0;
2250  instance->my_pbl = 0;
2251  instance->my_cbl = 0;
2252  /*
2253  * commit token sent after callback that token target has been set
2254  */
2255 }
2256 
/*
 * Enter the RECOVERY membership state using the supplied commit token.
 *
 * The member address array and the per-member entry array are located in
 * the variable-length tail of the commit token. The function then:
 *  - reinitialises the recovery sort queue and the retransmit message queue,
 *  - forwards the commit token (memb_state_commit_token_send_recovery),
 *  - builds the transitional membership from the new member list, the
 *    current member list and the old ring id,
 *  - if any transitional member reports received_flg == 0, re-wraps every
 *    old-ring message in (low_ring_aru, old_ring_state_high_seq_received]
 *    found in the regular sort queue and queues it for retransmission,
 *  - resets sequence bookkeeping, re-arms the token timers and sets
 *    memb_state to MEMB_STATE_RECOVERY.
 *
 * NOTE(review): this copy is missing listing lines 2273, 2292, 2315, 2319,
 * 2321, 2325, 2386, 2391, 2411, 2423, 2428 and 2435; gaps are marked below.
 */
2257 static void memb_state_recovery_enter (
2258  struct totemsrp_instance *instance,
2259  struct memb_commit_token *commit_token)
2260 {
2261  int i;
2262  int local_received_flg = 1;
2263  unsigned int low_ring_aru;
2264  unsigned int range = 0;
2265  unsigned int messages_originated = 0;
2266  const struct srp_addr *addr;
2267  struct memb_commit_token_memb_entry *memb_list;
2268  struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2269 
/* Token tail layout: addr_entries addresses followed by the per-member entries. */
2270  addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2271  memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2272 
/* NOTE(review): listing line 2273 is absent; presumably a log_printf (...) opener - confirm. */
2274  "entering RECOVERY state.");
2275 
2276  instance->orf_token_discard = 0;
2277 
2278  instance->my_high_ring_delivered = 0;
2279 
2280  sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2281  cs_queue_reinit (&instance->retrans_message_queue);
2282 
2283  low_ring_aru = instance->old_ring_state_high_seq_received;
2284 
2285  memb_state_commit_token_send_recovery (instance, commit_token);
2286 
2287  instance->my_token_seq = SEQNO_START_TOKEN - 1;
2288 
2289  /*
2290  * Build regular configuration
2291  */
/* NOTE(review): listing line 2292 is absent; presumably the opener of the call whose two arguments follow - confirm. */
2293  instance->totemnet_context,
2294  commit_token->addr_entries);
2295 
2296  /*
2297  * Build transitional configuration
2298  */
2299  for (i = 0; i < instance->my_new_memb_entries; i++) {
2300  memcpy (&my_new_memb_ring_id_list[i],
2301  &memb_list[i].ring_id,
2302  sizeof (struct memb_ring_id));
2303  }
2304  memb_set_and_with_ring_id (
2305  instance->my_new_memb_list,
2306  my_new_memb_ring_id_list,
2307  instance->my_new_memb_entries,
2308  instance->my_memb_list,
2309  instance->my_memb_entries,
2310  &instance->my_old_ring_id,
2311  instance->my_trans_memb_list,
2312  &instance->my_trans_memb_entries);
2313 
2314  for (i = 0; i < instance->my_trans_memb_entries; i++) {
/* NOTE(review): listing line 2315 is absent; presumably a log_printf (...) opener - confirm. */
2316  "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid);
2317  }
2318  for (i = 0; i < instance->my_new_memb_entries; i++) {
/* NOTE(review): listing lines 2319, 2321 and 2325 are absent; presumably log_printf (...) openers for the three argument lists below - confirm. */
2320  "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid);
2322  "previous ringid (" CS_PRI_RING_ID ")",
2323  memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2324 
2326  "aru %x high delivered %x received flag %d",
2327  memb_list[i].aru,
2328  memb_list[i].high_delivered,
2329  memb_list[i].received_flg);
2330 
2331  // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2332  }
2333  /*
2334  * Determine if any received flag is false
2335  */
2336  for (i = 0; i < commit_token->addr_entries; i++) {
2337  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2338  instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2339 
2340  memb_list[i].received_flg == 0) {
/* The first transitional member with a cleared flag makes the delivery set equal to the transitional set. */
2341  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2342  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2343  sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2344  local_received_flg = 0;
2345  break;
2346  }
2347  }
2348  if (local_received_flg == 1) {
2349  goto no_originate;
2350  } /* Else originate messages if we should */
2351 
2352  /*
2353  * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2354  */
2355  for (i = 0; i < commit_token->addr_entries; i++) {
2356  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2357  instance->my_deliver_memb_list,
2358  instance->my_deliver_memb_entries) &&
2359 
2360  memcmp (&instance->my_old_ring_id,
2361  &memb_list[i].ring_id,
2362  sizeof (struct memb_ring_id)) == 0) {
2363 
2364  if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2365 
2366  low_ring_aru = memb_list[i].aru;
2367  }
2368  if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2369  instance->my_high_ring_delivered = memb_list[i].high_delivered;
2370  }
2371  }
2372  }
2373 
2374  /*
2375  * Copy all old ring messages to instance->retrans_message_queue
2376  */
2377  range = instance->old_ring_state_high_seq_received - low_ring_aru;
2378  if (range == 0) {
2379  /*
2380  * No messages to copy
2381  */
2382  goto no_originate;
2383  }
2384  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2385 
/* NOTE(review): listing line 2386 is absent; presumably a log_printf (...) opener - confirm. */
2387  "copying all old ring messages from %x-%x.",
2388  low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2389 
2390  for (i = 1; i <= range; i++) {
/* NOTE(review): listing line 2391 is absent; presumably the declaration of sort_queue_item, which is used below but not declared in view - confirm. */
2392  struct message_item message_item;
2393  void *ptr;
2394  int res;
2395 
/* Sequence numbers with no entry in the regular sort queue are skipped. */
2396  res = sq_item_get (&instance->regular_sort_queue,
2397  low_ring_aru + i, &ptr);
2398  if (res != 0) {
2399  continue;
2400  }
2401  sort_queue_item = ptr;
2402  messages_originated++;
2403  memset (&message_item, 0, sizeof (struct message_item));
2404  // TODO LEAK
2405  message_item.mcast = totemsrp_buffer_alloc (instance);
2406  assert (message_item.mcast);
2407  message_item.mcast->header.magic = TOTEM_MH_MAGIC;
2408  message_item.mcast->header.version = TOTEM_MH_VERSION;
2409  message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2410  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
/* NOTE(review): listing line 2411 is absent from this copy; content unknown - restore from upstream. */
2412 
2413  message_item.mcast->header.nodeid = instance->my_id.nodeid;
2414  assert (message_item.mcast->header.nodeid);
2415  memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2416  sizeof (struct memb_ring_id));
/* The old message is copied in whole directly after the new struct mcast header. */
2417  message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2418  memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2419  sort_queue_item->mcast,
2420  sort_queue_item->msg_len);
2421  cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2422  }
/* NOTE(review): listing line 2423 is absent; presumably a log_printf (...) opener - confirm. */
2424  "Originated %d messages in RECOVERY.", messages_originated);
2425  goto originated;
2426 
2427 no_originate:
/* NOTE(review): listing line 2428 is absent; presumably a log_printf (...) opener - confirm. */
2429  "Did not need to originate any messages in recovery.");
2430 
2431 originated:
2432  instance->my_aru = SEQNO_START_MSG;
2433  instance->my_aru_count = 0;
2434  instance->my_seq_unchanged = 0;
/* NOTE(review): listing line 2435 is absent from this copy; content unknown - restore from upstream. */
2436  instance->my_install_seq = SEQNO_START_MSG;
2437  instance->last_released = SEQNO_START_MSG;
2438 
2439  reset_token_timeout (instance); // REVIEWED
2440  reset_token_retransmit_timeout (instance); // REVIEWED
2441 
2442  instance->memb_state = MEMB_STATE_RECOVERY;
2443  instance->stats.recovery_entered++;
2444  instance->stats.continuous_gather = 0;
2445 
2446  return;
2447 }
2448 
2449 void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2450 {
2451  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2452 
2453  token_hold_cancel_send (instance);
2454 
2455  return;
2456 }
2457 
/*
 * NOTE(review): listing line 2458, which carried this function's return type
 * and name, is absent from this copy; presumably the public multicast entry
 * point (totemsrp_mcast) - confirm against upstream.
 *
 * Queues one message for later multicast.
 *
 * srp_context  totemsrp instance
 * iovec        array of payload fragments, copied back to back
 * iov_len      number of entries in iovec
 * guarantee    stored verbatim in the mcast header
 *
 * Returns 0 when the message was queued, -1 when the selected queue is full
 * or no buffer could be allocated.
 */
2459  void *srp_context,
2460  struct iovec *iovec,
2461  unsigned int iov_len,
2462  int guarantee)
2463 {
2464  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2465  int i;
2466  struct message_item message_item;
2467  char *addr;
2468  unsigned int addr_idx;
2469  struct cs_queue *queue_use;
2470 
/* While a transitional ack is awaited new messages go to the separate _trans queue. */
2471  if (instance->waiting_trans_ack) {
2472  queue_use = &instance->new_message_queue_trans;
2473  } else {
2474  queue_use = &instance->new_message_queue;
2475  }
2476 
2477  if (cs_queue_is_full (queue_use)) {
2478  log_printf (instance->totemsrp_log_level_debug, "queue full");
2479  return (-1);
2480  }
2481 
2482  memset (&message_item, 0, sizeof (struct message_item));
2483 
2484  /*
2485  * Allocate pending item
2486  */
2487  message_item.mcast = totemsrp_buffer_alloc (instance);
2488  if (message_item.mcast == 0) {
2489  goto error_mcast;
2490  }
2491 
2492  /*
2493  * Set mcast header
2494  */
2495  memset(message_item.mcast, 0, sizeof (struct mcast));
2496  message_item.mcast->header.magic = TOTEM_MH_MAGIC;
2497  message_item.mcast->header.version = TOTEM_MH_VERSION;
2498  message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
/* NOTE(review): listing line 2499 is absent from this copy; content unknown - restore from upstream. */
2500 
2501  message_item.mcast->header.nodeid = instance->my_id.nodeid;
2502  assert (message_item.mcast->header.nodeid);
2503 
2504  message_item.mcast->guarantee = guarantee;
2505  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2506 
/*
 * Payload fragments are appended directly after the struct mcast header.
 * NOTE(review): no check is visible here that the summed fragment lengths
 * fit the buffer returned by totemsrp_buffer_alloc - confirm the callers or
 * the allocator guarantee this.
 */
2507  addr = (char *)message_item.mcast;
2508  addr_idx = sizeof (struct mcast);
2509  for (i = 0; i < iov_len; i++) {
2510  memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2511  addr_idx += iovec[i].iov_len;
2512  }
2513 
/* msg_len therefore covers the header plus all payload bytes. */
2514  message_item.msg_len = addr_idx;
2515 
2516  log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2517  instance->stats.mcast_tx++;
2518  cs_queue_item_add (queue_use, &message_item);
2519 
2520  return (0);
2521 
2522 error_mcast:
2523  return (-1);
2524 }
2525 
2526 /*
2527  * Determine if there is room to queue a new message
2528  */
2529 int totemsrp_avail (void *srp_context)
2530 {
2531  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2532  int avail;
2533  struct cs_queue *queue_use;
2534 
2535  if (instance->waiting_trans_ack) {
2536  queue_use = &instance->new_message_queue_trans;
2537  } else {
2538  queue_use = &instance->new_message_queue;
2539  }
2540  cs_queue_avail (queue_use, &avail);
2541 
2542  return (avail);
2543 }
2544 
2545 /*
2546  * ORF Token Management
2547  */
2548 /*
2549  * Recast message to mcast group if it is available
2550  */
/*
 * instance  totemsrp instance
 * seq       sequence number of the message to send again
 *
 * The message is looked up in the recovery sort queue while in the RECOVERY
 * state and in the regular sort queue otherwise.
 *
 * Returns 0 when the message was found and handed to the transport, -1 when
 * seq is outside the sort queue range or no item is stored for it.
 *
 * NOTE(review): listing lines 2555 and 2583 are absent from this copy; the
 * gaps are marked below.
 */
2551 static int orf_token_remcast (
2552  struct totemsrp_instance *instance,
2553  int seq)
2554 {
/* NOTE(review): listing line 2555 is absent; presumably the declaration of sort_queue_item, which is used below but not declared in view - confirm. */
2556  int res;
2557  void *ptr;
2558 
2559  struct sq *sort_queue;
2560 
2561  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2562  sort_queue = &instance->recovery_sort_queue;
2563  } else {
2564  sort_queue = &instance->regular_sort_queue;
2565  }
2566 
2567  res = sq_in_range (sort_queue, seq);
2568  if (res == 0) {
2569  log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2570  return (-1);
2571  }
2572 
2573  /*
2574  * Get RTR item at seq, if not available, return
2575  */
2576  res = sq_item_get (sort_queue, seq, &ptr);
2577  if (res != 0) {
2578  return -1;
2579  }
2580 
2581  sort_queue_item = ptr;
2582 
/* NOTE(review): listing line 2583 is absent; presumably the opener of the transport send call whose arguments follow - confirm. */
2584  instance->totemnet_context,
2585  sort_queue_item->mcast,
2586  sort_queue_item->msg_len);
2587 
2588  return (0);
2589 }
2590 
2591 
2592 /*
2593  * Free all freeable messages from ring
2594  */
/*
 * instance   totemsrp instance
 * token_aru  aru value to release up to (upper bound)
 *
 * The release point is the lowest, in sequence order, of token_aru,
 * my_last_aru and my_high_delivered. Nothing happens when that point lies
 * before last_released. Otherwise every sequence number in
 * (last_released, release point] has its buffer released (when an item is
 * stored) and its sort queue slot released, and last_released is advanced.
 *
 * NOTE(review): listing line 2644 is absent from this copy; marked below.
 */
2595 static void messages_free (
2596  struct totemsrp_instance *instance,
2597  unsigned int token_aru)
2598 {
2599  struct sort_queue_item *regular_message;
2600  unsigned int i;
2601  int res;
2602  int log_release = 0;
2603  unsigned int release_to;
2604  unsigned int range = 0;
2605 
2606  release_to = token_aru;
2607  if (sq_lt_compare (instance->my_last_aru, release_to)) {
2608  release_to = instance->my_last_aru;
2609  }
2610  if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2611  release_to = instance->my_high_delivered;
2612  }
2613 
2614  /*
2615  * Ensure we dont try release before an already released point
2616  */
2617  if (sq_lt_compare (release_to, instance->last_released)) {
2618  return;
2619  }
2620 
2621  range = release_to - instance->last_released;
2622  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2623 
2624  /*
2625  * Release retransmit list items if group aru indicates they are transmitted
2626  */
2627  for (i = 1; i <= range; i++) {
2628  void *ptr;
2629 
2630  res = sq_item_get (&instance->regular_sort_queue,
2631  instance->last_released + i, &ptr);
2632  if (res == 0) {
2633  regular_message = ptr;
2634  totemsrp_buffer_release (instance, regular_message->mcast);
2635  }
/* The sort queue slot is released whether or not an item was stored there. */
2636  sq_items_release (&instance->regular_sort_queue,
2637  instance->last_released + i);
2638 
2639  log_release = 1;
2640  }
2641  instance->last_released += range;
2642 
2643  if (log_release) {
/* NOTE(review): listing line 2644 is absent; presumably a log_printf (...) opener - confirm. */
2645  "releasing messages up to and including %x", release_to);
2646  }
2647 }
2648 
2649 static void update_aru (
2650  struct totemsrp_instance *instance)
2651 {
2652  unsigned int i;
2653  int res;
2654  struct sq *sort_queue;
2655  unsigned int range;
2656  unsigned int my_aru_saved = 0;
2657 
2658  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2659  sort_queue = &instance->recovery_sort_queue;
2660  } else {
2661  sort_queue = &instance->regular_sort_queue;
2662  }
2663 
2664  range = instance->my_high_seq_received - instance->my_aru;
2665 
2666  my_aru_saved = instance->my_aru;
2667  for (i = 1; i <= range; i++) {
2668 
2669  void *ptr;
2670 
2671  res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2672  /*
2673  * If hole, stop updating aru
2674  */
2675  if (res != 0) {
2676  break;
2677  }
2678  }
2679  instance->my_aru += i - 1;
2680 }
2681 
2682 /*
2683  * Multicasts pending messages onto the ring (requires orf_token possession)
2684  */
/*
 * instance            totemsrp instance
 * token               ORF token; its seq field is incremented once per
 *                     message sent
 * fcc_mcasts_allowed  upper bound on the number of messages sent by this call
 *
 * Source queue: retrans_message_queue in the RECOVERY state, otherwise the
 * transitional or regular new-message queue depending on waiting_trans_ack.
 * Each message taken is stamped with the next token sequence number and the
 * current ring id, stored in the matching sort queue and removed from the
 * source queue. update_aru() runs afterwards.
 *
 * NOTE(review): the value returned is fcc_mcast_current, i.e. the number of
 * messages handled by this call, not a 0/1 flag as the closing comment in
 * the body suggests.
 *
 * NOTE(review): listing line 2736 is absent from this copy; marked below.
 */
2685 static int orf_token_mcast (
2686  struct totemsrp_instance *instance,
2687  struct orf_token *token,
2688  int fcc_mcasts_allowed)
2689 {
2690  struct message_item *message_item = 0;
2691  struct cs_queue *mcast_queue;
2692  struct sq *sort_queue;
2693  struct sort_queue_item sort_queue_item;
2694  struct mcast *mcast;
2695  unsigned int fcc_mcast_current;
2696 
2697  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2698  mcast_queue = &instance->retrans_message_queue;
2699  sort_queue = &instance->recovery_sort_queue;
2700  reset_token_retransmit_timeout (instance); // REVIEWED
2701  } else {
2702  if (instance->waiting_trans_ack) {
2703  mcast_queue = &instance->new_message_queue_trans;
2704  } else {
2705  mcast_queue = &instance->new_message_queue;
2706  }
2707 
2708  sort_queue = &instance->regular_sort_queue;
2709  }
2710 
2711  for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2712  if (cs_queue_is_empty (mcast_queue)) {
2713  break;
2714  }
2715  message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2716 
2717  message_item->mcast->seq = ++token->seq;
2718  message_item->mcast->this_seqno = instance->global_seqno++;
2719 
2720  /*
2721  * Build IO vector
2722  */
2723  memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2724  sort_queue_item.mcast = message_item->mcast;
2725  sort_queue_item.msg_len = message_item->msg_len;
2726 
2727  mcast = sort_queue_item.mcast;
2728 
2729  memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2730 
2731  /*
2732  * Add message to retransmit queue
2733  */
2734  sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2735 
/* NOTE(review): listing line 2736 is absent; presumably the opener of the transport send call whose arguments follow - confirm. */
2737  instance->totemnet_context,
2738  message_item->mcast,
2739  message_item->msg_len);
2740 
2741  /*
2742  * Delete item from pending queue
2743  */
2744  cs_queue_item_remove (mcast_queue);
2745 
2746  /*
2747  * If messages mcasted, deliver any new messages to totempg
2748  */
2749  instance->my_high_seq_received = token->seq;
2750  }
2751 
2752  update_aru (instance);
2753 
2754  /*
2755  * Return 1 if more messages are available for single node clusters
2756  */
2757  return (fcc_mcast_current);
2758 }
2759 
2760 /*
2761  * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2762  * Modify's orf_token's rtr to include retransmits required by this process
2763  */
/*
 * instance     totemsrp instance
 * orf_token    token whose retransmit list is serviced and then extended
 * fcc_allowed  in: number of sends permitted; out: reduced by the number of
 *              retransmissions performed here
 *
 * Phase 1 walks the token's retransmit list. Entries for another ring id are
 * skipped; entries this node can resend are sent and removed from the list.
 * Phase 2 scans (my_aru, orf_token->seq] and appends sequence numbers this
 * node is missing, once their miss count reaches miss_count_const and while
 * the list has fewer than RETRANSMIT_ENTRIES_MAX entries.
 *
 * Returns the number of messages retransmitted (fcc_remcast_current).
 *
 * NOTE(review): listing lines 2788 and 2795 are absent from this copy; the
 * gaps are marked below.
 */
2764 static int orf_token_rtr (
2765  struct totemsrp_instance *instance,
2766  struct orf_token *orf_token,
2767  unsigned int *fcc_allowed)
2768 {
2769  unsigned int res;
2770  unsigned int i, j;
2771  unsigned int found;
2772  struct sq *sort_queue;
2773  struct rtr_item *rtr_list;
2774  unsigned int range = 0;
2775  char retransmit_msg[1024];
2776  char value[64];
2777 
2778  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2779  sort_queue = &instance->recovery_sort_queue;
2780  } else {
2781  sort_queue = &instance->regular_sort_queue;
2782  }
2783 
2784  rtr_list = &orf_token->rtr_list[0];
2785 
/*
 * Build a printable copy of the retransmit list for logging.
 * NOTE(review): strcat below is unbounded; whether retransmit_msg[1024] is
 * large enough depends on rtr_list_entries being limited before this point -
 * confirm.
 */
2786  strcpy (retransmit_msg, "Retransmit List: ");
2787  if (orf_token->rtr_list_entries) {
/* NOTE(review): listing line 2788 is absent; presumably a log_printf (...) opener - confirm. */
2789  "Retransmit List %d", orf_token->rtr_list_entries);
2790  for (i = 0; i < orf_token->rtr_list_entries; i++) {
2791  sprintf (value, "%x ", rtr_list[i].seq);
2792  strcat (retransmit_msg, value);
2793  }
2794  strcat (retransmit_msg, "");
/* NOTE(review): listing line 2795 is absent; presumably a log_printf (...) opener - confirm. */
2796  "%s", retransmit_msg);
2797  }
2798 
2799  /*
2800  * Retransmit messages on orf_token's RTR list from RTR queue
2801  */
/* The loop has no increment clause: i only advances when entry i is kept. */
2802  for (instance->fcc_remcast_current = 0, i = 0;
2803  instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2804 
2805  /*
2806  * If this retransmit request isn't from this configuration,
2807  * try next rtr entry
2808  */
2809  if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2810  sizeof (struct memb_ring_id)) != 0) {
2811 
2812  i += 1;
2813  continue;
2814  }
2815 
2816  res = orf_token_remcast (instance, rtr_list[i].seq);
2817  if (res == 0) {
2818  /*
2819  * Multicasted message, so no need to copy to new retransmit list
2820  */
/* Entry i is removed by shifting the tail down; i stays put so the shifted-in entry is examined next. */
2821  orf_token->rtr_list_entries -= 1;
2822  assert (orf_token->rtr_list_entries >= 0);
2823  memmove (&rtr_list[i], &rtr_list[i + 1],
2824  sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2825 
2826  instance->stats.mcast_retx++;
2827  instance->fcc_remcast_current++;
2828  } else {
2829  i += 1;
2830  }
2831  }
2832  *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2833 
2834  /*
2835  * Add messages to retransmit to RTR list
2836  * but only retry if there is room in the retransmit list
2837  */
2838 
2839  range = orf_token->seq - instance->my_aru;
2840  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2841 
2842  for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
2843  (i <= range); i++) {
2844 
2845  /*
2846  * Ensure message is within the sort queue range
2847  */
2848  res = sq_in_range (sort_queue, instance->my_aru + i);
2849  if (res == 0) {
2850  break;
2851  }
2852 
2853  /*
2854  * Find if a message is missing from this processor
2855  */
2856  res = sq_item_inuse (sort_queue, instance->my_aru + i);
2857  if (res == 0) {
2858  /*
2859  * Determine how many times we have missed receiving
2860  * this sequence number. sq_item_miss_count increments
2861  * a counter for the sequence number. The miss count
2862  * will be returned and compared. This allows time for
2863  * delayed multicast messages to be received before
2864  * declaring the message is missing and requesting a
2865  * retransmit.
2866  */
2867  res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2868  if (res < instance->totem_config->miss_count_const) {
2869  continue;
2870  }
2871 
2872  /*
2873  * Determine if missing message is already in retransmit list
2874  */
/* NOTE(review): only the sequence number is compared here, not the ring id of the existing entry - confirm this is intended. */
2875  found = 0;
2876  for (j = 0; j < orf_token->rtr_list_entries; j++) {
2877  if (instance->my_aru + i == rtr_list[j].seq) {
2878  found = 1;
2879  }
2880  }
2881  if (found == 0) {
2882  /*
2883  * Missing message not found in current retransmit list so add it
2884  */
2885  memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
2886  &instance->my_ring_id, sizeof (struct memb_ring_id));
2887  rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2888  orf_token->rtr_list_entries++;
2889  }
2890  }
2891  }
2892  return (instance->fcc_remcast_current);
2893 }
2894 
/*
 * Sends the saved token copy (orf_token_retransmit, of
 * orf_token_retransmit_size bytes) again.
 *
 * NOTE(review): listing line 2897 is absent from this copy; presumably the
 * opener of the transport send call whose last two arguments follow -
 * confirm against upstream.
 */
2895 static void token_retransmit (struct totemsrp_instance *instance)
2896 {
2898  instance->orf_token_retransmit,
2899  instance->orf_token_retransmit_size);
2900 }
2901 
2902 /*
2903  * Retransmit the regular token if no mcast or token has
2904  * been received in retransmit token period retransmit
2905  * the token to the next processor
2906  */
/*
 * Timer callback; data is the totemsrp instance.
 *
 * Nothing is done in the GATHER state. For the remaining cases listed the
 * saved token is retransmitted and the retransmit timer re-armed.
 */
2907 static void timer_function_token_retransmit_timeout (void *data)
2908 {
2909  struct totemsrp_instance *instance = data;
2910 
2911  switch (instance->memb_state) {
2912  case MEMB_STATE_GATHER:
2913  break;
2914  case MEMB_STATE_COMMIT:
/* NOTE(review): listing line 2915 is absent; presumably another case label sharing the handler below - confirm. As listed, COMMIT falls through to it. */
2916  case MEMB_STATE_RECOVERY:
2917  token_retransmit (instance);
2918  reset_token_retransmit_timeout (instance); // REVIEWED
2919  break;
2920  }
2921 }
2922 
/*
 * Timer callback; data is the totemsrp instance.
 *
 * Nothing is done in the GATHER and COMMIT states. Otherwise the saved token
 * is retransmitted; unlike timer_function_token_retransmit_timeout() no
 * timer is re-armed here.
 */
2923 static void timer_function_token_hold_retransmit_timeout (void *data)
2924 {
2925  struct totemsrp_instance *instance = data;
2926 
2927  switch (instance->memb_state) {
2928  case MEMB_STATE_GATHER:
2929  break;
2930  case MEMB_STATE_COMMIT:
2931  break;
/* NOTE(review): listing line 2932 is absent; presumably another case label sharing the handler below - confirm. */
2933  case MEMB_STATE_RECOVERY:
2934  token_retransmit (instance);
2935  break;
2936  }
2937 }
2938 
/*
 * Timer callback; data is the totemsrp instance.
 *
 * In the visible code a merge detect message is transmitted only when this
 * node is the representative of the current ring (my_ring_id.rep equals
 * my_id.nodeid). GATHER, COMMIT and RECOVERY do nothing.
 */
2939 static void timer_function_merge_detect_timeout(void *data)
2940 {
2941  struct totemsrp_instance *instance = data;
2942 
/* NOTE(review): listing line 2943 is absent from this copy; content unknown - restore from upstream. */
2944 
2945  switch (instance->memb_state) {
/* NOTE(review): listing line 2946 is absent; presumably the case label that guards the block below - confirm. */
2947  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2948  memb_merge_detect_transmit (instance);
2949  }
2950  break;
2951  case MEMB_STATE_GATHER:
2952  case MEMB_STATE_COMMIT:
2953  case MEMB_STATE_RECOVERY:
2954  break;
2955  }
2956 }
2957 
2958 /*
2959  * Send orf_token to next member (requires orf_token)
2960  */
/*
 * instance       totemsrp instance
 * orf_token      token to send; header.nodeid is overwritten with this
 *                node's id
 * forward_token  when 0 the token is only saved for retransmission and not
 *                sent
 *
 * The token, including its rtr_list_entries retransmit items, is always
 * copied into instance->orf_token_retransmit first.
 *
 * Returns 0 when forward_token is 0, otherwise res.
 */
2961 static int token_send (
2962  struct totemsrp_instance *instance,
2963  struct orf_token *orf_token,
2964  int forward_token)
2965 {
2966  int res = 0;
2967  unsigned int orf_token_size;
2968 
2969  orf_token_size = sizeof (struct orf_token) +
2970  (orf_token->rtr_list_entries * sizeof (struct rtr_item));
2971 
2972  orf_token->header.nodeid = instance->my_id.nodeid;
2973  memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
2974  instance->orf_token_retransmit_size = orf_token_size;
2975  assert (orf_token->header.nodeid);
2976 
2977  if (forward_token == 0) {
2978  return (0);
2979  }
2980 
/* NOTE(review): listing line 2981 is absent; presumably the opener of the transport send call. Whether it assigns res is not visible, so res may still be its initial 0 at the return - confirm. */
2982  orf_token,
2983  orf_token_size);
2984 
2985  return (res);
2986 }
2987 
2988 static int token_hold_cancel_send (struct totemsrp_instance *instance)
2989 {
2990  struct token_hold_cancel token_hold_cancel;
2991 
2992  /*
2993  * Only cancel if the token is currently held
2994  */
2995  if (instance->my_token_held == 0) {
2996  return (0);
2997  }
2998  instance->my_token_held = 0;
2999 
3000  /*
3001  * Build message
3002  */
3003  token_hold_cancel.header.magic = TOTEM_MH_MAGIC;
3004  token_hold_cancel.header.version = TOTEM_MH_VERSION;
3005  token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
3006  token_hold_cancel.header.encapsulated = 0;
3007  token_hold_cancel.header.nodeid = instance->my_id.nodeid;
3008  memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3009  sizeof (struct memb_ring_id));
3010  assert (token_hold_cancel.header.nodeid);
3011 
3012  instance->stats.token_hold_cancel_tx++;
3013 
3014  totemnet_mcast_flush_send (instance->totemnet_context, &token_hold_cancel,
3015  sizeof (struct token_hold_cancel));
3016 
3017  return (0);
3018 }
3019 
3020 static int orf_token_send_initial (struct totemsrp_instance *instance)
3021 {
3022  struct orf_token orf_token;
3023  int res;
3024 
3025  orf_token.header.magic = TOTEM_MH_MAGIC;
3026  orf_token.header.version = TOTEM_MH_VERSION;
3027  orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
3028  orf_token.header.encapsulated = 0;
3029  orf_token.header.nodeid = instance->my_id.nodeid;
3030  assert (orf_token.header.nodeid);
3031  orf_token.seq = SEQNO_START_MSG;
3032  orf_token.token_seq = SEQNO_START_TOKEN;
3033  orf_token.retrans_flg = 1;
3034  instance->my_set_retrans_flg = 1;
3035  instance->stats.orf_token_tx++;
3036 
3037  if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3038  orf_token.retrans_flg = 0;
3039  instance->my_set_retrans_flg = 0;
3040  } else {
3041  orf_token.retrans_flg = 1;
3042  instance->my_set_retrans_flg = 1;
3043  }
3044 
3045  orf_token.aru = 0;
3046  orf_token.aru = SEQNO_START_MSG - 1;
3047  orf_token.aru_addr = instance->my_id.nodeid;
3048 
3049  memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3050  orf_token.fcc = 0;
3051  orf_token.backlog = 0;
3052 
3053  orf_token.rtr_list_entries = 0;
3054 
3055  res = token_send (instance, &orf_token, 1);
3056 
3057  return (res);
3058 }
3059 
3060 static void memb_state_commit_token_update (
3061  struct totemsrp_instance *instance)
3062 {
3063  struct srp_addr *addr;
3064  struct memb_commit_token_memb_entry *memb_list;
3065  unsigned int high_aru;
3066  unsigned int i;
3067 
3068  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3069  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3070 
3071  memcpy (instance->my_new_memb_list, addr,
3072  sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3073 
3074  instance->my_new_memb_entries = instance->commit_token->addr_entries;
3075 
3076  memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3077  &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3078 
3079  memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3080  /*
3081  * TODO high delivered is really instance->my_aru, but with safe this
3082  * could change?
3083  */
3084  instance->my_received_flg =
3085  (instance->my_aru == instance->my_high_seq_received);
3086 
3087  memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3088 
3089  memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3090  /*
3091  * find high aru up to current memb_index for all matching ring ids
3092  * if any ring id matching memb_index has aru less then high aru set
3093  * received flag for that entry to false
3094  */
3095  high_aru = memb_list[instance->commit_token->memb_index].aru;
3096  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3097  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3098  &memb_list[i].ring_id,
3099  sizeof (struct memb_ring_id)) == 0) {
3100 
3101  if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3102  high_aru = memb_list[i].aru;
3103  }
3104  }
3105  }
3106 
3107  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3108  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3109  &memb_list[i].ring_id,
3110  sizeof (struct memb_ring_id)) == 0) {
3111 
3112  if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3113  memb_list[i].received_flg = 0;
3114  if (i == instance->commit_token->memb_index) {
3115  instance->my_received_flg = 0;
3116  }
3117  }
3118  }
3119  }
3120 
3121  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3122  instance->commit_token->memb_index += 1;
3123  assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3124  assert (instance->commit_token->header.nodeid);
3125 }
3126 
/*
 * Selects the commit token's next recipient: the node id of the member at
 * position memb_index modulo addr_entries in the token's address list (the
 * modulo wraps the index back to the first member).
 *
 * NOTE(review): listing line 3135 is absent from this copy; marked below.
 */
3127 static void memb_state_commit_token_target_set (
3128  struct totemsrp_instance *instance)
3129 {
3130  struct srp_addr *addr;
3131 
3132  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3133 
3134  /* Totemnet just looks at the node id */
/* NOTE(review): listing line 3135 is absent; presumably the opener of the transport call whose arguments follow - confirm. */
3136  instance->totemnet_context,
3137  addr[instance->commit_token->memb_index %
3138  instance->commit_token->addr_entries].nodeid);
3139 }
3140 
/*
 * instance      totemsrp instance
 * commit_token  caller supplied commit token; its token_seq is incremented
 *               and its header node id set to this node
 *
 * The token size is the fixed part plus one address and one member entry per
 * addr_entries. A copy is kept in instance->orf_token_retransmit and the
 * token retransmit timer is re-armed.
 *
 * Always returns 0.
 *
 * NOTE(review): listing line 3160 is absent from this copy; marked below.
 */
3141 static int memb_state_commit_token_send_recovery (
3142  struct totemsrp_instance *instance,
3143  struct memb_commit_token *commit_token)
3144 {
3145  unsigned int commit_token_size;
3146 
3147  commit_token->token_seq++;
3148  commit_token->header.nodeid = instance->my_id.nodeid;
3149  commit_token_size = sizeof (struct memb_commit_token) +
3150  ((sizeof (struct srp_addr) +
3151  sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3152  /*
3153  * Make a copy for retransmission if necessary
3154  */
3155  memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3156  instance->orf_token_retransmit_size = commit_token_size;
3157 
3158  instance->stats.memb_commit_token_tx++;
3159 
/* NOTE(review): listing line 3160 is absent; presumably the opener of the transport send call whose arguments follow - confirm. */
3161  commit_token,
3162  commit_token_size);
3163 
3164  /*
3165  * Request retransmission of the commit token in case it is lost
3166  */
3167  reset_token_retransmit_timeout (instance);
3168  return (0);
3169 }
3170 
/*
 * Same steps as memb_state_commit_token_send_recovery(), but operating on
 * the instance's own commit token (instance->commit_token): token_seq is
 * incremented, the header node id set to this node, a copy kept in
 * instance->orf_token_retransmit and the token retransmit timer re-armed.
 *
 * Always returns 0.
 *
 * NOTE(review): listing line 3189 is absent from this copy; marked below.
 */
3171 static int memb_state_commit_token_send (
3172  struct totemsrp_instance *instance)
3173 {
3174  unsigned int commit_token_size;
3175 
3176  instance->commit_token->token_seq++;
3177  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3178  commit_token_size = sizeof (struct memb_commit_token) +
3179  ((sizeof (struct srp_addr) +
3180  sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3181  /*
3182  * Make a copy for retransmission if necessary
3183  */
3184  memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3185  instance->orf_token_retransmit_size = commit_token_size;
3186 
3187  instance->stats.memb_commit_token_tx++;
3188 
/* NOTE(review): listing line 3189 is absent; presumably the opener of the transport send call whose arguments follow - confirm. */
3190  instance->commit_token,
3191  commit_token_size);
3192 
3193  /*
3194  * Request retransmission of the commit token in case it is lost
3195  */
3196  reset_token_retransmit_timeout (instance);
3197  return (0);
3198 }
3199 
3200 
3201 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3202 {
3203  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3204  int token_memb_entries = 0;
3205  int i;
3206  unsigned int lowest_nodeid;
3207 
3208  memb_set_subtract (token_memb, &token_memb_entries,
3209  instance->my_proc_list, instance->my_proc_list_entries,
3210  instance->my_failed_list, instance->my_failed_list_entries);
3211 
3212  /*
3213  * find representative by searching for smallest identifier
3214  */
3215  assert(token_memb_entries > 0);
3216 
3217  lowest_nodeid = token_memb[0].nodeid;
3218  for (i = 1; i < token_memb_entries; i++) {
3219  if (lowest_nodeid > token_memb[i].nodeid) {
3220  lowest_nodeid = token_memb[i].nodeid;
3221  }
3222  }
3223  return (lowest_nodeid == instance->my_id.nodeid);
3224 }
3225 
3226 static int srp_addr_compare (const void *a, const void *b)
3227 {
3228  const struct srp_addr *srp_a = (const struct srp_addr *)a;
3229  const struct srp_addr *srp_b = (const struct srp_addr *)b;
3230 
3231  if (srp_a->nodeid < srp_b->nodeid) {
3232  return -1;
3233  } else if (srp_a->nodeid > srp_b->nodeid) {
3234  return 1;
3235  } else {
3236  return 0;
3237  }
3238 }
3239 
/*
 * Builds a fresh commit token in instance->commit_token.
 *
 * Members are the processors in my_proc_list that are not in my_failed_list,
 * sorted by node id. The new ring id uses this node as representative and
 * token_ring_id_seq + 4 as sequence. memb_index starts at 0 and the
 * per-member entries that follow the address list are zeroed.
 *
 * NOTE(review): listing lines 3248, 3257 and 3258 are absent from this copy;
 * the gaps are marked below.
 */
3240 static void memb_state_commit_token_create (
3241  struct totemsrp_instance *instance)
3242 {
3243  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3244  struct srp_addr *addr;
3245  struct memb_commit_token_memb_entry *memb_list;
3246  int token_memb_entries = 0;
3247 
/* NOTE(review): listing line 3248 is absent; presumably a log_printf (...) opener - confirm. */
3249  "Creating commit token because I am the rep.");
3250 
3251  memb_set_subtract (token_memb, &token_memb_entries,
3252  instance->my_proc_list, instance->my_proc_list_entries,
3253  instance->my_failed_list, instance->my_failed_list_entries);
3254 
/* Only the fixed-size part is cleared here; the variable tail is written further down. */
3255  memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3256  instance->commit_token->header.magic = TOTEM_MH_MAGIC;
/* NOTE(review): listing lines 3257 and 3258 are absent; content unknown (other functions in this file set header.version and header.type at this point) - restore from upstream. */
3259  instance->commit_token->header.encapsulated = 0;
3260  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3261  assert (instance->commit_token->header.nodeid);
3262 
3263  instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3264  instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3265 
3266  /*
3267  * This qsort is necessary to ensure the commit token traverses
3268  * the ring in the proper order
3269  */
3270  qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3271  srp_addr_compare);
3272 
3273  instance->commit_token->memb_index = 0;
3274  instance->commit_token->addr_entries = token_memb_entries;
3275 
/* Tail layout: addr_entries addresses followed by the same number of member entries. */
3276  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3277  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3278 
3279  memcpy (addr, token_memb,
3280  token_memb_entries * sizeof (struct srp_addr));
3281  memset (memb_list, 0,
3282  sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3283 }
3284 
/*
 * Build a memb_join message for this processor in a stack buffer and hand
 * it to the transmit call at the end of the function.
 *
 * Message layout: struct memb_join, then my_proc_list_entries srp_addr
 * records (my_proc_list), then my_failed_list_entries srp_addr records
 * (my_failed_list). If that would not fit in the 40000 byte buffer an
 * error is logged and nothing is sent.
 *
 * When totem_config->send_join_timeout is non-zero the function sleeps for
 * a random number of microseconds below send_join_timeout * 1000 before
 * sending (presumably send_join_timeout is in milliseconds -- TODO confirm).
 *
 * NOTE(review): original line 3342, the opening of the final transmit
 * call, is absent from this listing.
 */
3285 static void memb_join_message_send (struct totemsrp_instance *instance)
3286 {
3287  char memb_join_data[40000];
3288  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3289  char *addr;
3290  unsigned int addr_idx;
3291  size_t msg_len;
3292 
3293  memb_join->header.magic = TOTEM_MH_MAGIC;
3294  memb_join->header.version = TOTEM_MH_VERSION;
3295  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3296  memb_join->header.encapsulated = 0;
3297  memb_join->header.nodeid = instance->my_id.nodeid;
3298  assert (memb_join->header.nodeid);
3299 
/* Total size of the fixed part plus both address lists */
3300  msg_len = sizeof(struct memb_join) +
3301  ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3302 
/* Refuse to build a message that would overrun memb_join_data */
3303  if (msg_len > sizeof(memb_join_data)) {
3304  log_printf (instance->totemsrp_log_level_error,
3305  "memb_join_message too long. Ignoring message.");
3306 
3307  return ;
3308  }
3309 
3310  memb_join->ring_seq = instance->my_ring_id.seq;
3311  memb_join->proc_list_entries = instance->my_proc_list_entries;
3312  memb_join->failed_list_entries = instance->my_failed_list_entries;
3313  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3314 
3315  /*
3316  * This mess adds the joined and failed processor lists into the join
3317  * message
3318  */
/* addr_idx is the running byte offset into the buffer; it ends up as the message length */
3319  addr = (char *)memb_join;
3320  addr_idx = sizeof (struct memb_join);
3321  memcpy (&addr[addr_idx],
3322  instance->my_proc_list,
3323  instance->my_proc_list_entries *
3324  sizeof (struct srp_addr));
3325  addr_idx +=
3326  instance->my_proc_list_entries *
3327  sizeof (struct srp_addr);
3328  memcpy (&addr[addr_idx],
3329  instance->my_failed_list,
3330  instance->my_failed_list_entries *
3331  sizeof (struct srp_addr));
3332  addr_idx +=
3333  instance->my_failed_list_entries *
3334  sizeof (struct srp_addr);
3335 
/* Random delay before sending; skipped entirely when send_join_timeout is 0 */
3336  if (instance->totem_config->send_join_timeout) {
3337  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3338  }
3339 
3340  instance->stats.memb_join_tx++;
3341 
3343  instance->totemnet_context,
3344  memb_join,
3345  addr_idx);
3346 }
3347 
/*
 * Build and send a "leave" message. The message has type
 * MESSAGE_TYPE_MEMB_JOIN but carries LEAVE_DUMMY_NODEID in header.nodeid,
 * lists this processor in the failed list and omits it from the proc list.
 *
 * Side effect: this processor's id is merged into instance->my_failed_list.
 *
 * Message layout: struct memb_join, then active_memb_entries srp_addr
 * records (my_proc_list without my_id), then my_failed_list_entries
 * srp_addr records. If that would not fit in the 40000 byte buffer an
 * error is logged and nothing is sent.
 *
 * NOTE(review): original lines 3358 and 3422 (the openings of the logging
 * call and of the final transmit call) are absent from this listing.
 * NOTE(review): active_memb_entries is not initialised before being passed
 * to memb_set_subtract; other callers in this file pass a counter
 * initialised to 0 -- confirm memb_set_subtract always sets it.
 */
3348 static void memb_leave_message_send (struct totemsrp_instance *instance)
3349 {
3350  char memb_join_data[40000];
3351  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3352  char *addr;
3353  unsigned int addr_idx;
3354  int active_memb_entries;
3355  struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3356  size_t msg_len;
3357 
3359  "sending join/leave message");
3360 
3361  /*
3362  * add us to the failed list, and remove us from
3363  * the members list
3364  */
3365  memb_set_merge(
3366  &instance->my_id, 1,
3367  instance->my_failed_list, &instance->my_failed_list_entries);
3368 
3369  memb_set_subtract (active_memb, &active_memb_entries,
3370  instance->my_proc_list, instance->my_proc_list_entries,
3371  &instance->my_id, 1);
3372 
/* Total size of the fixed part plus both address lists */
3373  msg_len = sizeof(struct memb_join) +
3374  ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3375 
/* Refuse to build a message that would overrun memb_join_data */
3376  if (msg_len > sizeof(memb_join_data)) {
3377  log_printf (instance->totemsrp_log_level_error,
3378  "memb_leave message too long. Ignoring message.");
3379 
3380  return ;
3381  }
3382 
3383  memb_join->header.magic = TOTEM_MH_MAGIC;
3384  memb_join->header.version = TOTEM_MH_VERSION;
3385  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3386  memb_join->header.encapsulated = 0;
/* Unlike a regular join, the header does not carry this node's own id */
3387  memb_join->header.nodeid = LEAVE_DUMMY_NODEID;
3388 
3389  memb_join->ring_seq = instance->my_ring_id.seq;
3390  memb_join->proc_list_entries = active_memb_entries;
3391  memb_join->failed_list_entries = instance->my_failed_list_entries;
3392  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3393 
3394  // TODO: CC Maybe use the actual join send routine.
3395  /*
3396  * This mess adds the joined and failed processor lists into the join
3397  * message
3398  */
/* addr_idx is the running byte offset into the buffer; it ends up as the message length */
3399  addr = (char *)memb_join;
3400  addr_idx = sizeof (struct memb_join);
3401  memcpy (&addr[addr_idx],
3402  active_memb,
3403  active_memb_entries *
3404  sizeof (struct srp_addr));
3405  addr_idx +=
3406  active_memb_entries *
3407  sizeof (struct srp_addr);
3408  memcpy (&addr[addr_idx],
3409  instance->my_failed_list,
3410  instance->my_failed_list_entries *
3411  sizeof (struct srp_addr));
3412  addr_idx +=
3413  instance->my_failed_list_entries *
3414  sizeof (struct srp_addr);
3415 
3416 
/* Random delay before sending; skipped entirely when send_join_timeout is 0 */
3417  if (instance->totem_config->send_join_timeout) {
3418  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3419  }
/* Counted in the same statistic as regular join messages */
3420  instance->stats.memb_join_tx++;
3421 
3423  instance->totemnet_context,
3424  memb_join,
3425  addr_idx);
3426 }
3427 
/*
 * Build a memb_merge_detect message on the stack, stamped with this
 * processor's id and current ring id, count it in
 * stats.memb_merge_detect_tx and pass it to the transmit call.
 *
 * NOTE(review): original line 3443, the opening of the transmit call, is
 * absent from this listing.
 */
3428 static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3429 {
3430  struct memb_merge_detect memb_merge_detect;
3431 
3432  memb_merge_detect.header.magic = TOTEM_MH_MAGIC;
3433  memb_merge_detect.header.version = TOTEM_MH_VERSION;
3434  memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
3435  memb_merge_detect.header.encapsulated = 0;
3436  memb_merge_detect.header.nodeid = instance->my_id.nodeid;
3437  srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
3438  memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3439  sizeof (struct memb_ring_id));
/* A nodeid of zero is treated as invalid */
3440  assert (memb_merge_detect.header.nodeid);
3441 
3442  instance->stats.memb_merge_detect_tx++;
3444  &memb_merge_detect,
3445  sizeof (struct memb_merge_detect));
3446 }
3447 
3448 static void memb_ring_id_set (
3449  struct totemsrp_instance *instance,
3450  const struct memb_ring_id *ring_id)
3451 {
3452 
3453  memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3454 }
3455 
/*
 * Allocate a token callback record, fill it from the arguments and link it
 * onto one of the instance's callback lists selected by type.
 *
 * srp_context  the struct totemsrp_instance, passed as an opaque pointer
 * handle_out   receives the newly allocated callback record
 * type         stored in the record and used to pick the list
 * delete       stored in the record's delete field
 * callback_fn  function stored in the record
 * data         argument stored in the record for callback_fn
 *
 * Returns 0 on success, or -1 when malloc fails, in which case *handle_out
 * is left untouched. token_hold_cancel_send() is called before the
 * allocation, so it runs even when the allocation then fails.
 *
 * NOTE(review): original line 3456 (return type and function name) and
 * lines 3480/3483 (the case labels of the switch) are absent from this
 * listing. The visible list heads suggest one case per callback type --
 * confirm against the real source.
 */
3457  void *srp_context,
3458  void **handle_out,
3459  enum totem_callback_token_type type,
3460  int delete,
3461  int (*callback_fn) (enum totem_callback_token_type type, const void *),
3462  const void *data)
3463 {
3464  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3465  struct token_callback_instance *callback_handle;
3466 
3467  token_hold_cancel_send (instance);
3468 
3469  callback_handle = malloc (sizeof (struct token_callback_instance));
3470  if (callback_handle == 0) {
3471  return (-1);
3472  }
3473  *handle_out = (void *)callback_handle;
3474  qb_list_init (&callback_handle->list);
3475  callback_handle->callback_fn = callback_fn;
/* The const qualifier of data is dropped when it is stored */
3476  callback_handle->data = (void *) data;
3477  callback_handle->callback_type = type;
3478  callback_handle->delete = delete;
3479  switch (type) {
3481  qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3482  break;
3484  qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3485  break;
3486  }
3487 
3488  return (0);
3489 }
3490 
3491 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3492 {
3493  struct token_callback_instance *h;
3494 
3495  if (*handle_out) {
3496  h = (struct token_callback_instance *)*handle_out;
3497  qb_list_del (&h->list);
3498  free (h);
3499  h = NULL;
3500  *handle_out = 0;
3501  }
3502 }
3503 
/*
 * Run every callback registered on the list that corresponds to type.
 *
 * For each record:
 *  - when its delete field is 1 it is unlinked before the callback runs;
 *  - callback_fn is called with the record's callback_type and data;
 *  - when the callback returns -1 and delete is 1, the record is linked
 *    back onto the list so it is tried again on a later token;
 *  - otherwise, when delete is non-zero, the record is freed.
 * Records whose delete field is 0 stay on the list untouched.
 *
 * An unrecognised type hits assert (0).
 *
 * NOTE(review): original lines 3510 (declaration of
 * token_callback_instance) and 3515/3518 (the case labels) are absent from
 * this listing.
 */
3504 static void token_callbacks_execute (
3505  struct totemsrp_instance *instance,
3506  enum totem_callback_token_type type)
3507 {
3508  struct qb_list_head *list, *tmp_iter;
3509  struct qb_list_head *callback_listhead = 0;
3511  int res;
3512  int del;
3513 
3514  switch (type) {
3516  callback_listhead = &instance->token_callback_received_listhead;
3517  break;
3519  callback_listhead = &instance->token_callback_sent_listhead;
3520  break;
3521  default:
3522  assert (0);
3523  }
3524 
/* The _safe iterator is used because records may be unlinked or freed inside the loop */
3525  qb_list_for_each_safe(list, tmp_iter, callback_listhead) {
3526  token_callback_instance = qb_list_entry (list, struct token_callback_instance, list);
/* delete is read before the callback runs and the cached value is used afterwards */
3527  del = token_callback_instance->delete;
3528  if (del == 1) {
3529  qb_list_del (list);
3530  }
3531 
3532  res = token_callback_instance->callback_fn (
3533  token_callback_instance->callback_type,
3534  token_callback_instance->data);
3535  /*
3536  * This callback failed to execute, try it again on the next token
3537  */
3538  if (res == -1 && del == 1) {
3539  qb_list_add (list, callback_listhead);
3540  } else if (del) {
3541  free (token_callback_instance);
3542  }
3543  }
3544 }
3545 
3546 /*
3547  * Flow control functions
3548  */
3549 static unsigned int backlog_get (struct totemsrp_instance *instance)
3550 {
3551  unsigned int backlog = 0;
3552  struct cs_queue *queue_use = NULL;
3553 
3554  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3555  if (instance->waiting_trans_ack) {
3556  queue_use = &instance->new_message_queue_trans;
3557  } else {
3558  queue_use = &instance->new_message_queue;
3559  }
3560  } else
3561  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3562  queue_use = &instance->retrans_message_queue;
3563  }
3564 
3565  if (queue_use != NULL) {
3566  backlog = cs_queue_used (queue_use);
3567  }
3568 
3569  instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3570  return (backlog);
3571 }
3572 
3573 static int fcc_calculate (
3574  struct totemsrp_instance *instance,
3575  struct orf_token *token)
3576 {
3577  unsigned int transmits_allowed;
3578  unsigned int backlog_calc;
3579 
3580  transmits_allowed = instance->totem_config->max_messages;
3581 
3582  if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3583  transmits_allowed = instance->totem_config->window_size - token->fcc;
3584  }
3585 
3586  instance->my_cbl = backlog_get (instance);
3587 
3588  /*
3589  * Only do backlog calculation if there is a backlog otherwise
3590  * we would result in div by zero
3591  */
3592  if (token->backlog + instance->my_cbl - instance->my_pbl) {
3593  backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3594  (token->backlog + instance->my_cbl - instance->my_pbl);
3595  if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3596  transmits_allowed = backlog_calc;
3597  }
3598  }
3599 
3600  return (transmits_allowed);
3601 }
3602 
3603 /*
3604  * don't overflow the RTR sort queue
3605  */
3606 static void fcc_rtr_limit (
3607  struct totemsrp_instance *instance,
3608  struct orf_token *token,
3609  unsigned int *transmits_allowed)
3610 {
3611  int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3612  check -= (*transmits_allowed + instance->totem_config->window_size);
3613  assert (check >= 0);
3614  if (sq_lt_compare (instance->last_released +
3615  QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3616  instance->totem_config->window_size,
3617 
3618  token->seq)) {
3619 
3620  *transmits_allowed = 0;
3621  }
3622 }
3623 
3624 static void fcc_token_update (
3625  struct totemsrp_instance *instance,
3626  struct orf_token *token,
3627  unsigned int msgs_transmitted)
3628 {
3629  token->fcc += msgs_transmitted - instance->my_trc;
3630  token->backlog += instance->my_cbl - instance->my_pbl;
3631  instance->my_trc = msgs_transmitted;
3632  instance->my_pbl = instance->my_cbl;
3633 }
3634 
3635 /*
3636  * Sanity checkers
3637  */
/*
 * Length check for a received orf_token message.
 *
 * msg                       raw message as received
 * msg_len                   number of bytes available at msg
 * endian_conversion_needed  non-zero when rtr_list_entries has to be
 *                           byte-swapped before use
 *
 * Returns -1 when msg_len is smaller than struct orf_token, or smaller than
 * struct orf_token plus rtr_list_entries rtr_item records; 0 otherwise.
 *
 * NOTE(review): original lines 3649 and 3663, the openings of the two
 * logging calls, are absent from this listing.
 * NOTE(review): only a lower bound on msg_len is checked here; no upper
 * bound is applied to rtr_list_entries in this function.
 */
3638 static int check_orf_token_sanity(
3639  const struct totemsrp_instance *instance,
3640  const void *msg,
3641  size_t msg_len,
3642  int endian_conversion_needed)
3643 {
3644  int rtr_entries;
3645  const struct orf_token *token = (const struct orf_token *)msg;
3646  size_t required_len;
3647 
/* The fixed part must be complete before rtr_list_entries can be read */
3648  if (msg_len < sizeof(struct orf_token)) {
3650  "Received orf_token message is too short... ignoring.");
3651 
3652  return (-1);
3653  }
3654 
3655  if (endian_conversion_needed) {
3656  rtr_entries = swab32(token->rtr_list_entries);
3657  } else {
3658  rtr_entries = token->rtr_list_entries;
3659  }
3660 
/* rtr_entries is a signed int that is converted to size_t by this multiplication */
3661  required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3662  if (msg_len < required_len) {
3664  "Received orf_token message is too short... ignoring.");
3665 
3666  return (-1);
3667  }
3668 
3669  return (0);
3670 }
3671 
/*
 * Length check for a received mcast message.
 *
 * Returns -1 when msg_len is smaller than struct mcast and 0 otherwise.
 * msg and endian_conversion_needed are not used in the visible lines.
 *
 * NOTE(review): original line 3680, the opening of the logging call, is
 * absent from this listing.
 */
3672 static int check_mcast_sanity(
3673  struct totemsrp_instance *instance,
3674  const void *msg,
3675  size_t msg_len,
3676  int endian_conversion_needed)
3677 {
3678 
3679  if (msg_len < sizeof(struct mcast)) {
3681  "Received mcast message is too short... ignoring.");
3682 
3683  return (-1);
3684  }
3685 
3686  return (0);
3687 }
3688 
/*
 * Length check for a received memb_merge_detect message.
 *
 * Returns -1 when msg_len is smaller than struct memb_merge_detect and 0
 * otherwise. msg and endian_conversion_needed are not used in the visible
 * lines.
 *
 * NOTE(review): original line 3697, the opening of the logging call, is
 * absent from this listing.
 */
3689 static int check_memb_merge_detect_sanity(
3690  struct totemsrp_instance *instance,
3691  const void *msg,
3692  size_t msg_len,
3693  int endian_conversion_needed)
3694 {
3695 
3696  if (msg_len < sizeof(struct memb_merge_detect)) {
3698  "Received memb_merge_detect message is too short... ignoring.");
3699 
3700  return (-1);
3701  }
3702 
3703  return (0);
3704 }
3705 
/*
 * Length check for a received memb_join message.
 *
 * msg                       raw message as received
 * msg_len                   number of bytes available at msg
 * endian_conversion_needed  non-zero when the two entry counts have to be
 *                           byte-swapped before use
 *
 * Returns -1 when msg_len is smaller than struct memb_join, or smaller than
 * struct memb_join plus (proc_list_entries + failed_list_entries) srp_addr
 * records; 0 otherwise.
 *
 * NOTE(review): original lines 3718 and 3734, the openings of the two
 * logging calls, are absent from this listing.
 */
3706 static int check_memb_join_sanity(
3707  struct totemsrp_instance *instance,
3708  const void *msg,
3709  size_t msg_len,
3710  int endian_conversion_needed)
3711 {
3712  const struct memb_join *mj_msg = (const struct memb_join *)msg;
3713  unsigned int proc_list_entries;
3714  unsigned int failed_list_entries;
3715  size_t required_len;
3716 
/* The fixed part must be complete before the entry counts can be read */
3717  if (msg_len < sizeof(struct memb_join)) {
3719  "Received memb_join message is too short... ignoring.");
3720 
3721  return (-1);
3722  }
3723 
3724  proc_list_entries = mj_msg->proc_list_entries;
3725  failed_list_entries = mj_msg->failed_list_entries;
3726 
3727  if (endian_conversion_needed) {
3728  proc_list_entries = swab32(proc_list_entries);
3729  failed_list_entries = swab32(failed_list_entries);
3730  }
3731 
/* The two counts are added as unsigned int before being widened for the multiplication */
3732  required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3733  if (msg_len < required_len) {
3735  "Received memb_join message is too short... ignoring.");
3736 
3737  return (-1);
3738  }
3739 
3740  return (0);
3741 }
3742 
3743 static int check_memb_commit_token_sanity(
3744  struct totemsrp_instance *instance,
3745  const void *msg,
3746  size_t msg_len,
3747  int endian_conversion_needed)
3748 {
3749  const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3750  unsigned int addr_entries;
3751  size_t required_len;
3752 
3753  if (msg_len < sizeof(struct memb_commit_token)) {
3755  "Received memb_commit_token message is too short... ignoring.");
3756 
3757  return (0);
3758  }
3759 
3760  addr_entries= mct_msg->addr_entries;
3761  if (endian_conversion_needed) {
3762  addr_entries = swab32(addr_entries);
3763  }
3764 
3765  required_len = sizeof(struct memb_commit_token) +
3766  (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3767  if (msg_len < required_len) {
3769  "Received memb_commit_token message is too short... ignoring.");
3770 
3771  return (-1);
3772  }
3773 
3774  return (0);
3775 }
3776 
3777 static int check_token_hold_cancel_sanity(
3778  struct totemsrp_instance *instance,
3779  const void *msg,
3780  size_t msg_len,
3781  int endian_conversion_needed)
3782 {
3783 
3784  if (msg_len < sizeof(struct token_hold_cancel)) {
3786  "Received token_hold_cancel message is too short... ignoring.");
3787 
3788  return (-1);
3789  }
3790 
3791  return (0);
3792 }
3793 
3794 /*
3795  * Message Handlers
3796  */
3797 
/*
 * Timestamp of the previous measurement taken in message_handler_orf_token;
 * within the visible code it is only read and written inside
 * #ifdef GIVEINFO sections.
 */
3798 unsigned long long int tv_old;
3799 /*
3800  * message handler called when TOKEN message type received
3801  */
/*
 * Process a received ORF token.
 *
 * msg                       raw token as received
 * msg_len                   number of bytes available at msg
 * endian_conversion_needed  non-zero when the token has to be converted
 *                           with orf_token_endian_convert() first
 *
 * Outline of the visible code:
 *  1. drop the token when the sanity check fails or orf_token_discard is set
 *  2. copy the (possibly converted) token into local storage
 *  3. update the merge detect timers and my_seq_unchanged
 *  4. decide whether the token is held, run the TOKEN_RECEIVED callbacks
 *  5. depending on memb_state, discard the token or retransmit/multicast
 *     messages, update flow control and aru, drive the recovery state
 *     machine and send the token on
 *  6. reset or cancel the heartbeat timeout
 *
 * Returns 0 on every visible path.
 *
 * NOTE(review): several original lines are absent from this listing
 * (3825, 3878, 3913, 3996, 4027, 4040, 4058, 4069, 4076). They are
 * presumably openings of logging calls, one call between the two
 * "flushing" assignments and one case label -- confirm against the real
 * source before relying on the comments below.
 */
3802 static int message_handler_orf_token (
3803  struct totemsrp_instance *instance,
3804  const void *msg,
3805  size_t msg_len,
3806  int endian_conversion_needed)
3807 {
3808  char token_storage[1500];
3809  char token_convert[1500];
3810  struct orf_token *token = NULL;
3811  int forward_token;
3812  unsigned int transmits_allowed;
3813  unsigned int mcasted_retransmit;
3814  unsigned int mcasted_regular;
3815  unsigned int last_aru;
3816 
3817 #ifdef GIVEINFO
3818  unsigned long long tv_current;
3819  unsigned long long tv_diff;
3820 
3821  tv_current = qb_util_nano_current_get ();
3822  tv_diff = tv_current - tv_old;
3823  tv_old = tv_current;
3824 
3826  "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3827 #endif
3828 
/* Malformed tokens are dropped silently from the caller's point of view (return 0) */
3829  if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3830  return (0);
3831  }
3832 
3833  if (instance->orf_token_discard) {
3834  return (0);
3835  }
3836 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3837  if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3838  return (0);
3839  }
3840 #endif
3841 
/* After conversion msg points at the converted copy in token_convert */
3842  if (endian_conversion_needed) {
3843  orf_token_endian_convert ((struct orf_token *)msg,
3844  (struct orf_token *)token_convert);
3845  msg = (struct orf_token *)token_convert;
3846  }
3847 
3848  /*
3849  * Make copy of token and retransmit list in case we have
3850  * to flush incoming messages from the kernel queue
3851  */
/*
 * NOTE(review): the second memcpy always copies RETRANSMIT_ENTRIES_MAX
 * rtr_item records, independent of msg_len and of rtr_list_entries, and
 * assumes the result fits in the 1500 byte token_storage -- confirm both.
 */
3852  token = (struct orf_token *)token_storage;
3853  memcpy (token, msg, sizeof (struct orf_token));
3854  memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3855  sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3856 
3857 
3858  /*
3859  * Handle merge detection timeout
3860  */
/* my_seq_unchanged counts consecutive tokens that arrived with the same seq */
3861  if (token->seq == instance->my_last_seq) {
3862  start_merge_detect_timeout (instance);
3863  instance->my_seq_unchanged += 1;
3864  } else {
3865  cancel_merge_detect_timeout (instance);
3866  cancel_token_hold_retransmit_timeout (instance);
3867  instance->my_seq_unchanged = 0;
3868  }
3869 
3870  instance->my_last_seq = token->seq;
3871 
3872 #ifdef TEST_RECOVERY_MSG_COUNT
3873  if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3874  return (0);
3875  }
3876 #endif
3877  instance->flushing = 1;
3879  instance->flushing = 0;
3880 
3881  /*
3882  * Determine if we should hold (in reality drop) the token
3883  */
/* The ring rep needs strictly more unchanged rotations than the other nodes (> versus >=) */
3884  instance->my_token_held = 0;
3885  if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3886  instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3887  instance->my_token_held = 1;
3888  } else {
3889  if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3890  instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3891  instance->my_token_held = 1;
3892  }
3893  }
3894 
3895  /*
3896  * Hold onto token when there is no activity on ring and
3897  * this processor is the ring rep
3898  */
3899  forward_token = 1;
3900  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3901  if (instance->my_token_held) {
3902  forward_token = 0;
3903  }
3904  }
3905 
3906  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
3907 
3908  switch (instance->memb_state) {
3909  case MEMB_STATE_COMMIT:
3910  /* Discard token */
3911  break;
3912 
3914  messages_free (instance, token->aru);
3915  /*
3916  * Do NOT add break, this case should also execute code in gather case.
3917  */
3918 
3919  case MEMB_STATE_GATHER:
3920  /*
3921  * DO NOT add break, we use different free mechanism in recovery state
3922  */
3923 
3924  case MEMB_STATE_RECOVERY:
3925  /*
3926  * Discard tokens from another configuration
3927  */
3928  if (memcmp (&token->ring_id, &instance->my_ring_id,
3929  sizeof (struct memb_ring_id)) != 0) {
3930 
3931  if ((forward_token)
3932  && instance->use_heartbeat) {
3933  reset_heartbeat_timeout(instance);
3934  }
3935  else {
3936  cancel_heartbeat_timeout(instance);
3937  }
3938 
3939  return (0); /* discard token */
3940  }
3941 
3942  /*
3943  * Discard retransmitted tokens
3944  */
3945  if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
3946  return (0); /* discard token */
3947  }
/* Remember the aru seen on the previous token before overwriting it */
3948  last_aru = instance->my_last_aru;
3949  instance->my_last_aru = token->aru;
3950 
3951  transmits_allowed = fcc_calculate (instance, token);
3952  mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3953 
/* Outstanding or just served retransmit requests cancel a pending hold */
3954  if (instance->my_token_held == 1 &&
3955  (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
3956  instance->my_token_held = 0;
3957  forward_token = 1;
3958  }
3959 
3960  fcc_rtr_limit (instance, token, &transmits_allowed);
3961  mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3962 /*
3963 if (mcasted_regular) {
3964 printf ("mcasted regular %d\n", mcasted_regular);
3965 printf ("token seq %d\n", token->seq);
3966 }
3967 */
3968  fcc_token_update (instance, token, mcasted_retransmit +
3969  mcasted_regular);
3970 
/*
 * Put this node's aru on the token when it is behind the token's aru,
 * when this node set the token's aru last time, or when nobody owns it
 * (aru_addr == 0).
 */
3971  if (sq_lt_compare (instance->my_aru, token->aru) ||
3972  instance->my_id.nodeid == token->aru_addr ||
3973  token->aru_addr == 0) {
3974 
3975  token->aru = instance->my_aru;
3976  if (token->aru == token->seq) {
3977  token->aru_addr = 0;
3978  } else {
3979  token->aru_addr = instance->my_id.nodeid;
3980  }
3981  }
/* my_aru_count counts consecutive tokens on which the aru did not move while some node owned it */
3982  if (token->aru == last_aru && token->aru_addr != 0) {
3983  instance->my_aru_count += 1;
3984  } else {
3985  instance->my_aru_count = 0;
3986  }
3987 
3988  /*
3989  * We really don't follow specification there. In specification, OTHER nodes
3990  * detect failure of one node (based on aru_count) and my_id IS NEVER added
3991  * to failed list (so node never mark itself as failed)
3992  */
3993  if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
3994  token->aru_addr == instance->my_id.nodeid) {
3995 
3997  "FAILED TO RECEIVE");
3998 
3999  instance->failed_to_recv = 1;
4000 
4001  memb_set_merge (&instance->my_id, 1,
4002  instance->my_failed_list,
4003  &instance->my_failed_list_entries);
4004 
4005  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4006  } else {
/* Accept the token: remember its token_seq and bump it for the next hop */
4007  instance->my_token_seq = token->token_seq;
4008  token->token_seq += 1;
4009 
4010  if (instance->memb_state == MEMB_STATE_RECOVERY) {
4011  /*
4012  * instance->my_aru == instance->my_high_seq_received means this processor
4013  * has recovered all messages it can recover
4014  * (ie: its retrans queue is empty)
4015  */
4016  if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4017 
4018  if (token->retrans_flg == 0) {
4019  token->retrans_flg = 1;
4020  instance->my_set_retrans_flg = 1;
4021  }
4022  } else
4023  if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4024  token->retrans_flg = 0;
4025  instance->my_set_retrans_flg = 0;
4026  }
4028  "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4029  token->retrans_flg, instance->my_set_retrans_flg,
4030  cs_queue_is_empty (&instance->retrans_message_queue),
4031  instance->my_retrans_flg_count, token->aru);
/* my_retrans_flg_count counts consecutive tokens seen with retrans_flg cleared */
4032  if (token->retrans_flg == 0) {
4033  instance->my_retrans_flg_count += 1;
4034  } else {
4035  instance->my_retrans_flg_count = 0;
4036  }
4037  if (instance->my_retrans_flg_count == 2) {
4038  instance->my_install_seq = token->seq;
4039  }
4041  "install seq %x aru %x high seq received %x",
4042  instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4043  if (instance->my_retrans_flg_count >= 2 &&
4044  instance->my_received_flg == 0 &&
4045  sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4046  instance->my_received_flg = 1;
4047  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4048  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4049  sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4050  }
4051  if (instance->my_retrans_flg_count >= 3 &&
4052  sq_lte_compare (instance->my_install_seq, token->aru)) {
4053  instance->my_rotation_counter += 1;
4054  } else {
4055  instance->my_rotation_counter = 0;
4056  }
/* Two qualifying rotations in a row switch this processor to the operational state */
4057  if (instance->my_rotation_counter == 2) {
4059  "retrans flag count %x token aru %x install seq %x aru %x %x",
4060  instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4061  instance->my_aru, token->seq);
4062 
4063  memb_state_operational_enter (instance);
4064  instance->my_rotation_counter = 0;
4065  instance->my_retrans_flg_count = 0;
4066  }
4067  }
4068 
4070  token_send (instance, token, forward_token);
4071 
4072 #ifdef GIVEINFO
4073  tv_current = qb_util_nano_current_get ();
4074  tv_diff = tv_current - tv_old;
4075  tv_old = tv_current;
4077  "I held %0.4f ms",
4078  ((float)tv_diff) / 1000000.0);
4079 #endif
4080  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4081  messages_deliver_to_app (instance, 0,
4082  instance->my_high_seq_received);
4083  }
4084 
4085  /*
4086  * Deliver messages after token has been transmitted
4087  * to improve performance
4088  */
4089  reset_token_timeout (instance); // REVIEWED
4090  reset_token_retransmit_timeout (instance); // REVIEWED
4091  if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4092  instance->my_token_held == 1) {
4093 
4094  start_token_hold_retransmit_timeout (instance);
4095  }
4096 
4097  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4098  }
4099  break;
4100  }
4101 
/* Heartbeat timer is re-armed only when the token is forwarded and heartbeats are enabled */
4102  if ((forward_token)
4103  && instance->use_heartbeat) {
4104  reset_heartbeat_timeout(instance);
4105  }
4106  else {
4107  cancel_heartbeat_timeout(instance);
4108  }
4109 
4110  return (0);
4111 }
4112 
/*
 * Deliver messages from the regular sort queue to the application through
 * instance->totemsrp_deliver_fn, in sequence order, starting after
 * instance->my_high_delivered and going up to end_point.
 *
 * skip       0: stop at the first missing sequence number (hole)
 *            non-zero: step over holes, and also step over messages whose
 *            system_from is not in instance->my_deliver_memb_list
 * end_point  last sequence number to consider
 *
 * instance->my_high_delivered is advanced for every sequence number that
 * is delivered or stepped over. Delivery also stops as soon as a sequence
 * number is outside the range of the sort queue.
 *
 * Asserts that end_point - my_high_delivered is below
 * QUEUE_RTR_ITEMS_SIZE_MAX.
 *
 * NOTE(review): original lines 4131 and 4200, the openings of two logging
 * calls, are absent from this listing.
 */
4113 static void messages_deliver_to_app (
4114  struct totemsrp_instance *instance,
4115  int skip,
4116  unsigned int end_point)
4117 {
4118  struct sort_queue_item *sort_queue_item_p;
4119  unsigned int i;
4120  int res;
4121  struct mcast *mcast_in;
4122  struct mcast mcast_header;
4123  unsigned int range = 0;
4124  int endian_conversion_required;
4125  unsigned int my_high_delivered_stored = 0;
4126 
4127 
/* Unsigned subtraction: number of sequence numbers between the last delivered one and end_point */
4128  range = end_point - instance->my_high_delivered;
4129 
4130  if (range) {
4132  "Delivering %x to %x", instance->my_high_delivered,
4133  end_point);
4134  }
4135  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
/* Snapshot the starting point; my_high_delivered itself is advanced inside the loop */
4136  my_high_delivered_stored = instance->my_high_delivered;
4137 
4138  /*
4139  * Deliver messages in order from rtr queue to pending delivery queue
4140  */
4141  for (i = 1; i <= range; i++) {
4142 
4143  void *ptr = 0;
4144 
4145  /*
4146  * If out of range of sort queue, stop assembly
4147  */
4148  res = sq_in_range (&instance->regular_sort_queue,
4149  my_high_delivered_stored + i);
4150  if (res == 0) {
4151  break;
4152  }
4153 
4154  res = sq_item_get (&instance->regular_sort_queue,
4155  my_high_delivered_stored + i, &ptr);
4156  /*
4157  * If hole, stop assembly
4158  */
4159  if (res != 0 && skip == 0) {
4160  break;
4161  }
4162 
4163  instance->my_high_delivered = my_high_delivered_stored + i;
4164 
/* Only reached with res != 0 when skip is set: step over the hole */
4165  if (res != 0) {
4166  continue;
4167 
4168  }
4169 
4170  sort_queue_item_p = ptr;
4171 
4172  mcast_in = sort_queue_item_p->mcast;
4173  assert (mcast_in != (struct mcast *)0xdeadbeef);
4174 
/* The stored message is still in sender byte order; a foreign magic means it needs conversion */
4175  endian_conversion_required = 0;
4176  if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4177  endian_conversion_required = 1;
4178  mcast_endian_convert (mcast_in, &mcast_header);
4179  } else {
4180  memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4181  }
4182 
4183  /*
4184  * Skip messages not originated in instance->my_deliver_memb
4185  */
4186  if (skip &&
4187  memb_set_subset (&mcast_header.system_from,
4188  1,
4189  instance->my_deliver_memb_list,
4190  instance->my_deliver_memb_entries) == 0) {
4191 
/* Same value as already assigned above for this iteration */
4192  instance->my_high_delivered = my_high_delivered_stored + i;
4193 
4194  continue;
4195  }
4196 
4197  /*
4198  * Message found
4199  */
4201  "Delivering MCAST message with seq %x to pending delivery queue",
4202  mcast_header.seq);
4203 
4204  /*
4205  * Message is locally originated multicast
4206  */
/* The payload handed to the application starts right after the mcast header */
4207  instance->totemsrp_deliver_fn (
4208  mcast_header.header.nodeid,
4209  ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4210  sort_queue_item_p->msg_len - sizeof (struct mcast),
4211  endian_conversion_required);
4212  }
4213 }
4214 
4215 /*
4216  * recv message handler called when MCAST message type received
4217  */
/*
 * Process a received MCAST message.
 *
 * msg                       raw message as received
 * msg_len                   number of bytes available at msg
 * endian_conversion_needed  non-zero when the mcast header has to be
 *                           converted with mcast_endian_convert() first
 *
 * Outline of the visible code:
 *  1. drop the message when the sanity check fails
 *  2. pick the recovery sort queue for encapsulated messages and the
 *     regular sort queue otherwise
 *  3. a message from a different ring id is handled according to
 *     memb_state (possibly entering the gather state) and is not queued
 *  4. otherwise the message is copied into a newly allocated buffer and
 *     added to the sort queue unless its seq is out of range or already
 *     in use
 *  5. the aru is updated and, in the operational state, messages are
 *     delivered to the application
 *
 * Returns 0, except -1 when totemsrp_buffer_alloc() fails.
 *
 * NOTE(review): original lines 4259 (presumably a case label) and 4293
 * (the opening of a logging call) are absent from this listing.
 * NOTE(review): msg_len is checked with assert() against FRAME_SIZE_MAX
 * before the same bound is tested again in the queueing condition --
 * confirm whether the assert can be reached with a received length.
 */
4218 static int message_handler_mcast (
4219  struct totemsrp_instance *instance,
4220  const void *msg,
4221  size_t msg_len,
4222  int endian_conversion_needed)
4223 {
4224  struct sort_queue_item sort_queue_item;
4225  struct sq *sort_queue;
4226  struct mcast mcast_header;
4227 
4228  if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4229  return (0);
4230  }
4231 
/* Work on a local, host byte order copy of the header; msg itself is left as received */
4232  if (endian_conversion_needed) {
4233  mcast_endian_convert (msg, &mcast_header);
4234  } else {
4235  memcpy (&mcast_header, msg, sizeof (struct mcast));
4236  }
4237 
4238  if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4239  sort_queue = &instance->recovery_sort_queue;
4240  } else {
4241  sort_queue = &instance->regular_sort_queue;
4242  }
4243 
4244  assert (msg_len <= FRAME_SIZE_MAX);
4245 
4246 #ifdef TEST_DROP_MCAST_PERCENTAGE
4247  if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4248  return (0);
4249  }
4250 #endif
4251 
4252  /*
4253  * If the message is foreign execute the switch below
4254  */
4255  if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4256  sizeof (struct memb_ring_id)) != 0) {
4257 
4258  switch (instance->memb_state) {
4260  memb_set_merge (
4261  &mcast_header.system_from, 1,
4262  instance->my_proc_list, &instance->my_proc_list_entries);
4263  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4264  break;
4265 
4266  case MEMB_STATE_GATHER:
/* Only a sender that is not yet in my_proc_list restarts the gather state */
4267  if (!memb_set_subset (
4268  &mcast_header.system_from,
4269  1,
4270  instance->my_proc_list,
4271  instance->my_proc_list_entries)) {
4272 
4273  memb_set_merge (&mcast_header.system_from, 1,
4274  instance->my_proc_list, &instance->my_proc_list_entries);
4275  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4276  return (0);
4277  }
4278  break;
4279 
4280  case MEMB_STATE_COMMIT:
4281  /* discard message */
4282  instance->stats.rx_msg_dropped++;
4283  break;
4284 
4285  case MEMB_STATE_RECOVERY:
4286  /* discard message */
4287  instance->stats.rx_msg_dropped++;
4288  break;
4289  }
/* Foreign messages are never queued */
4290  return (0);
4291  }
4292 
4294  "Received ringid (" CS_PRI_RING_ID ") seq %x",
4295  mcast_header.ring_id.rep,
4296  (uint64_t)mcast_header.ring_id.seq,
4297  mcast_header.seq);
4298 
4299  /*
4300  * Add mcast message to rtr queue if not already in rtr queue
4301  * otherwise free io vectors
4302  */
4303  if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4304  sq_in_range (sort_queue, mcast_header.seq) &&
4305  sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4306 
4307  /*
4308  * Allocate new multicast memory block
4309  */
4310 // TODO LEAK
4311  sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4312  if (sort_queue_item.mcast == NULL) {
4313  return (-1); /* error here is corrected by the algorithm */
4314  }
/* The message is stored exactly as received (not byte-swapped) */
4315  memcpy (sort_queue_item.mcast, msg, msg_len);
4316  sort_queue_item.msg_len = msg_len;
4317 
4318  if (sq_lt_compare (instance->my_high_seq_received,
4319  mcast_header.seq)) {
4320  instance->my_high_seq_received = mcast_header.seq;
4321  }
4322 
4323  sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4324  }
4325 
4326  update_aru (instance);
4327  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4328  messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4329  }
4330 
4331 /* TODO remove from retrans message queue for old ring in recovery state */
4332  return (0);
4333 }
4334 
4335 static int message_handler_memb_merge_detect (
4336  struct totemsrp_instance *instance,
4337  const void *msg,
4338  size_t msg_len,
4339  int endian_conversion_needed)
4340 {
4341  struct memb_merge_detect memb_merge_detect;
4342 
4343  if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4344  return (0);
4345  }
4346 
4347  if (endian_conversion_needed) {
4348  memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4349  } else {
4350  memcpy (&memb_merge_detect, msg,
4351  sizeof (struct memb_merge_detect));
4352  }
4353 
4354  /*
4355  * do nothing if this is a merge detect from this configuration
4356  */
4357  if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4358  sizeof (struct memb_ring_id)) == 0) {
4359 
4360  return (0);
4361  }
4362 
4363  /*
4364  * Execute merge operation
4365  */
4366  switch (instance->memb_state) {
4368  memb_set_merge (&memb_merge_detect.system_from, 1,
4369  instance->my_proc_list, &instance->my_proc_list_entries);
4370  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4371  break;
4372 
4373  case MEMB_STATE_GATHER:
4374  if (!memb_set_subset (
4375  &memb_merge_detect.system_from,
4376  1,
4377  instance->my_proc_list,
4378  instance->my_proc_list_entries)) {
4379 
4380  memb_set_merge (&memb_merge_detect.system_from, 1,
4381  instance->my_proc_list, &instance->my_proc_list_entries);
4382  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4383  return (0);
4384  }
4385  break;
4386 
4387  case MEMB_STATE_COMMIT:
4388  /* do nothing in commit */
4389  break;
4390 
4391  case MEMB_STATE_RECOVERY:
4392  /* do nothing in recovery */
4393  break;
4394  }
4395  return (0);
4396 }
4397 
4398 static void memb_join_process (
4399  struct totemsrp_instance *instance,
4400  const struct memb_join *memb_join)
4401 {
4402  struct srp_addr *proc_list;
4403  struct srp_addr *failed_list;
4404  int gather_entered = 0;
4405  int fail_minus_memb_entries = 0;
4406  struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4407 
4408  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4409  failed_list = proc_list + memb_join->proc_list_entries;
4410 
4411  log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4412  memb_set_log(instance, instance->totemsrp_log_level_trace,
4413  "proclist", proc_list, memb_join->proc_list_entries);
4414  memb_set_log(instance, instance->totemsrp_log_level_trace,
4415  "faillist", failed_list, memb_join->failed_list_entries);
4416  memb_set_log(instance, instance->totemsrp_log_level_trace,
4417  "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4418  memb_set_log(instance, instance->totemsrp_log_level_trace,
4419  "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4420 
4421  if (memb_join->header.type == MESSAGE_TYPE_MEMB_JOIN) {
4422  if (instance->flushing) {
4423  if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
4425  "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID,
4426  memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4427  if (memb_join->failed_list_entries > 0) {
4428  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4429  }
4430  } else {
4432  "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid);
4433  }
4434  return;
4435  } else {
4436  if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
4438  "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4439  if (memb_join->failed_list_entries > 0) {
4440  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4441  }
4442  }
4443  }
4444 
4445  }
4446 
4447  if (memb_set_equal (proc_list,
4448  memb_join->proc_list_entries,
4449  instance->my_proc_list,
4450  instance->my_proc_list_entries) &&
4451 
4452  memb_set_equal (failed_list,
4453  memb_join->failed_list_entries,
4454  instance->my_failed_list,
4455  instance->my_failed_list_entries)) {
4456 
4457  if (memb_join->header.nodeid != LEAVE_DUMMY_NODEID) {
4458  memb_consensus_set (instance, &memb_join->system_from);
4459  }
4460 
4461  if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4462  instance->failed_to_recv = 0;
4463  srp_addr_copy (&instance->my_proc_list[0],
4464  &instance->my_id);
4465  instance->my_proc_list_entries = 1;
4466  instance->my_failed_list_entries = 0;
4467 
4468  memb_state_commit_token_create (instance);
4469 
4470  memb_state_commit_enter (instance);
4471  return;
4472  }
4473  if (memb_consensus_agreed (instance) &&
4474  memb_lowest_in_config (instance)) {
4475 
4476  memb_state_commit_token_create (instance);
4477 
4478  memb_state_commit_enter (instance);
4479  } else {
4480  goto out;
4481  }
4482  } else
4483  if (memb_set_subset (proc_list,
4484  memb_join->proc_list_entries,
4485  instance->my_proc_list,
4486  instance->my_proc_list_entries) &&
4487 
4488  memb_set_subset (failed_list,
4489  memb_join->failed_list_entries,
4490  instance->my_failed_list,
4491  instance->my_failed_list_entries)) {
4492 
4493  goto out;
4494  } else
4495  if (memb_set_subset (&memb_join->system_from, 1,
4496  instance->my_failed_list, instance->my_failed_list_entries)) {
4497 
4498  goto out;
4499  } else {
4500  memb_set_merge (proc_list,
4501  memb_join->proc_list_entries,
4502  instance->my_proc_list, &instance->my_proc_list_entries);
4503 
4504  if (memb_set_subset (
4505  &instance->my_id, 1,
4506  failed_list, memb_join->failed_list_entries)) {
4507 
4508  memb_set_merge (
4509  &memb_join->system_from, 1,
4510  instance->my_failed_list, &instance->my_failed_list_entries);
4511  } else {
4512  if (memb_set_subset (
4513  &memb_join->system_from, 1,
4514  instance->my_memb_list,
4515  instance->my_memb_entries)) {
4516 
4517  if (memb_set_subset (
4518  &memb_join->system_from, 1,
4519  instance->my_failed_list,
4520  instance->my_failed_list_entries) == 0) {
4521 
4522  memb_set_merge (failed_list,
4523  memb_join->failed_list_entries,
4524  instance->my_failed_list, &instance->my_failed_list_entries);
4525  } else {
4526  memb_set_subtract (fail_minus_memb,
4527  &fail_minus_memb_entries,
4528  failed_list,
4529  memb_join->failed_list_entries,
4530  instance->my_memb_list,
4531  instance->my_memb_entries);
4532 
4533  memb_set_merge (fail_minus_memb,
4534  fail_minus_memb_entries,
4535  instance->my_failed_list,
4536  &instance->my_failed_list_entries);
4537  }
4538  }
4539  }
4540  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4541  gather_entered = 1;
4542  }
4543 
4544 out:
4545  if (gather_entered == 0 &&
4546  instance->memb_state == MEMB_STATE_OPERATIONAL) {
4547 
4548  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4549  }
4550 }
4551 
4552 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4553 {
4554  int i;
4555  struct srp_addr *in_proc_list;
4556  struct srp_addr *in_failed_list;
4557  struct srp_addr *out_proc_list;
4558  struct srp_addr *out_failed_list;
4559 
4560  out->header.magic = TOTEM_MH_MAGIC;
4562  out->header.type = in->header.type;
4563  out->header.nodeid = swab32 (in->header.nodeid);
4564  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4567  out->ring_seq = swab64 (in->ring_seq);
4568 
4569  in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4570  in_failed_list = in_proc_list + out->proc_list_entries;
4571  out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4572  out_failed_list = out_proc_list + out->proc_list_entries;
4573 
4574  for (i = 0; i < out->proc_list_entries; i++) {
4575  srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4576  }
4577  for (i = 0; i < out->failed_list_entries; i++) {
4578  srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4579  }
4580 }
4581 
4582 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4583 {
4584  int i;
4585  struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4586  struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4587  struct memb_commit_token_memb_entry *in_memb_list;
4588  struct memb_commit_token_memb_entry *out_memb_list;
4589 
4590  out->header.magic = TOTEM_MH_MAGIC;
4592  out->header.type = in->header.type;
4593  out->header.nodeid = swab32 (in->header.nodeid);
4594  out->token_seq = swab32 (in->token_seq);
4595  out->ring_id.rep = swab32(in->ring_id.rep);
4596  out->ring_id.seq = swab64 (in->ring_id.seq);
4597  out->retrans_flg = swab32 (in->retrans_flg);
4598  out->memb_index = swab32 (in->memb_index);
4599  out->addr_entries = swab32 (in->addr_entries);
4600 
4601  in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4602  out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4603  for (i = 0; i < out->addr_entries; i++) {
4604  srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4605 
4606  /*
4607  * Only convert the memb entry if it has been set
4608  */
4609  if (in_memb_list[i].ring_id.rep != 0) {
4610  out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4611 
4612  out_memb_list[i].ring_id.seq =
4613  swab64 (in_memb_list[i].ring_id.seq);
4614  out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4615  out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4616  out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4617  }
4618  }
4619 }
4620 
4621 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4622 {
4623  int i;
4624 
4625  out->header.magic = TOTEM_MH_MAGIC;
4627  out->header.type = in->header.type;
4628  out->header.nodeid = swab32 (in->header.nodeid);
4629  out->seq = swab32 (in->seq);
4630  out->token_seq = swab32 (in->token_seq);
4631  out->aru = swab32 (in->aru);
4632  out->ring_id.rep = swab32(in->ring_id.rep);
4633  out->aru_addr = swab32(in->aru_addr);
4634  out->ring_id.seq = swab64 (in->ring_id.seq);
4635  out->fcc = swab32 (in->fcc);
4636  out->backlog = swab32 (in->backlog);
4637  out->retrans_flg = swab32 (in->retrans_flg);
4639  for (i = 0; i < out->rtr_list_entries; i++) {
4640  out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4641  out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4642  out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4643  }
4644 }
4645 
4646 static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4647 {
4648  out->header.magic = TOTEM_MH_MAGIC;
4650  out->header.type = in->header.type;
4651  out->header.nodeid = swab32 (in->header.nodeid);
4653 
4654  out->seq = swab32 (in->seq);
4655  out->this_seqno = swab32 (in->this_seqno);
4656  out->ring_id.rep = swab32(in->ring_id.rep);
4657  out->ring_id.seq = swab64 (in->ring_id.seq);
4658  out->node_id = swab32 (in->node_id);
4659  out->guarantee = swab32 (in->guarantee);
4660  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4661 }
4662 
4663 static void memb_merge_detect_endian_convert (
4664  const struct memb_merge_detect *in,
4665  struct memb_merge_detect *out)
4666 {
4667  out->header.magic = TOTEM_MH_MAGIC;
4669  out->header.type = in->header.type;
4670  out->header.nodeid = swab32 (in->header.nodeid);
4671  out->ring_id.rep = swab32(in->ring_id.rep);
4672  out->ring_id.seq = swab64 (in->ring_id.seq);
4673  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4674 }
4675 
4676 static int ignore_join_under_operational (
4677  struct totemsrp_instance *instance,
4678  const struct memb_join *memb_join)
4679 {
4680  struct srp_addr *proc_list;
4681  struct srp_addr *failed_list;
4682  unsigned long long ring_seq;
4683 
4684  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4685  failed_list = proc_list + memb_join->proc_list_entries;
4686  ring_seq = memb_join->ring_seq;
4687 
4688  if (memb_set_subset (&instance->my_id, 1,
4689  failed_list, memb_join->failed_list_entries)) {
4690  return (1);
4691  }
4692 
4693  /*
4694  * In operational state, my_proc_list is exactly the same as
4695  * my_memb_list.
4696  */
4697  if ((memb_set_subset (&memb_join->system_from, 1,
4698  instance->my_memb_list, instance->my_memb_entries)) &&
4699  (ring_seq < instance->my_ring_id.seq)) {
4700  return (1);
4701  }
4702 
4703  return (0);
4704 }
4705 
4706 static int message_handler_memb_join (
4707  struct totemsrp_instance *instance,
4708  const void *msg,
4709  size_t msg_len,
4710  int endian_conversion_needed)
4711 {
4712  const struct memb_join *memb_join;
4713  struct memb_join *memb_join_convert = alloca (msg_len);
4714 
4715  if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4716  return (0);
4717  }
4718 
4719  if (endian_conversion_needed) {
4720  memb_join = memb_join_convert;
4721  memb_join_endian_convert (msg, memb_join_convert);
4722 
4723  } else {
4724  memb_join = msg;
4725  }
4726  /*
4727  * If the process paused because it wasn't scheduled in a timely
4728  * fashion, flush the join messages because they may be queued
4729  * entries
4730  */
4731  if (pause_flush (instance)) {
4732  return (0);
4733  }
4734 
4735  if (instance->token_ring_id_seq < memb_join->ring_seq) {
4736  instance->token_ring_id_seq = memb_join->ring_seq;
4737  }
4738  switch (instance->memb_state) {
4740  if (!ignore_join_under_operational (instance, memb_join)) {
4741  memb_join_process (instance, memb_join);
4742  }
4743  break;
4744 
4745  case MEMB_STATE_GATHER:
4746  memb_join_process (instance, memb_join);
4747  break;
4748 
4749  case MEMB_STATE_COMMIT:
4750  if (memb_set_subset (&memb_join->system_from,
4751  1,
4752  instance->my_new_memb_list,
4753  instance->my_new_memb_entries) &&
4754 
4755  memb_join->ring_seq >= instance->my_ring_id.seq) {
4756 
4757  memb_join_process (instance, memb_join);
4758  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4759  }
4760  break;
4761 
4762  case MEMB_STATE_RECOVERY:
4763  if (memb_set_subset (&memb_join->system_from,
4764  1,
4765  instance->my_new_memb_list,
4766  instance->my_new_memb_entries) &&
4767 
4768  memb_join->ring_seq >= instance->my_ring_id.seq) {
4769 
4770  memb_join_process (instance, memb_join);
4771  memb_recovery_state_token_loss (instance);
4772  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4773  }
4774  break;
4775  }
4776  return (0);
4777 }
4778 
4779 static int message_handler_memb_commit_token (
4780  struct totemsrp_instance *instance,
4781  const void *msg,
4782  size_t msg_len,
4783  int endian_conversion_needed)
4784 {
4785  struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4786  struct memb_commit_token *memb_commit_token;
4787  struct srp_addr sub[PROCESSOR_COUNT_MAX];
4788  int sub_entries;
4789 
4790  struct srp_addr *addr;
4791 
4793  "got commit token");
4794 
4795  if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4796  return (0);
4797  }
4798 
4799  if (endian_conversion_needed) {
4800  memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4801  } else {
4802  memcpy (memb_commit_token_convert, msg, msg_len);
4803  }
4804  memb_commit_token = memb_commit_token_convert;
4805  addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
4806 
4807 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4808  if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4809  return (0);
4810  }
4811 #endif
4812  switch (instance->memb_state) {
4814  /* discard token */
4815  break;
4816 
4817  case MEMB_STATE_GATHER:
4818  memb_set_subtract (sub, &sub_entries,
4819  instance->my_proc_list, instance->my_proc_list_entries,
4820  instance->my_failed_list, instance->my_failed_list_entries);
4821 
4822  if (memb_set_equal (addr,
4823  memb_commit_token->addr_entries,
4824  sub,
4825  sub_entries) &&
4826 
4827  memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
4828  memcpy (instance->commit_token, memb_commit_token, msg_len);
4829  memb_state_commit_enter (instance);
4830  }
4831  break;
4832 
4833  case MEMB_STATE_COMMIT:
4834  /*
4835  * If retransmitted commit tokens are sent on this ring
4836  * filter them out and only enter recovery once the
4837  * commit token has traversed the array. This is
4838  * determined by :
4839  * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4840  */
4841  if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4842  memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4843  memb_state_recovery_enter (instance, memb_commit_token);
4844  }
4845  break;
4846 
4847  case MEMB_STATE_RECOVERY:
4848  if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4849 
4850  /* Filter out duplicated tokens */
4851  if (instance->originated_orf_token) {
4852  break;
4853  }
4854 
4855  instance->originated_orf_token = 1;
4856 
4858  "Sending initial ORF token");
4859 
4860  // TODO convert instead of initiate
4861  orf_token_send_initial (instance);
4862  reset_token_timeout (instance); // REVIEWED
4863  reset_token_retransmit_timeout (instance); // REVIEWED
4864  }
4865  break;
4866  }
4867  return (0);
4868 }
4869 
4870 static int message_handler_token_hold_cancel (
4871  struct totemsrp_instance *instance,
4872  const void *msg,
4873  size_t msg_len,
4874  int endian_conversion_needed)
4875 {
4876  const struct token_hold_cancel *token_hold_cancel = msg;
4877 
4878  if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4879  return (0);
4880  }
4881 
4882  if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4883  sizeof (struct memb_ring_id)) == 0) {
4884 
4885  instance->my_seq_unchanged = 0;
4886  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4887  timer_function_token_retransmit_timeout (instance);
4888  }
4889  }
4890  return (0);
4891 }
4892 
4893 static int check_message_header_validity(
4894  void *context,
4895  const void *msg,
4896  unsigned int msg_len,
4897  const struct sockaddr_storage *system_from)
4898 {
4899  struct totemsrp_instance *instance = context;
4900  const struct totem_message_header *message_header = msg;
4901  const char *guessed_str;
4902  const char *msg_byte = msg;
4903 
4904  if (msg_len < sizeof (struct totem_message_header)) {
4906  "Message received from %s is too short... Ignoring %u.",
4907  totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4908  return (-1);
4909  }
4910 
4911  if (message_header->magic != TOTEM_MH_MAGIC &&
4912  message_header->magic != swab16(TOTEM_MH_MAGIC)) {
4913  /*
4914  * We've received ether Knet, old version of Corosync,
4915  * or something else. Do some guessing to display (hopefully)
4916  * helpful message
4917  */
4918  guessed_str = NULL;
4919 
4920  if (message_header->magic == 0xFFFF) {
4921  /*
4922  * Corosync 2.2 used header with two UINT8_MAX
4923  */
4924  guessed_str = "Corosync 2.2";
4925  } else if (message_header->magic == 0xFEFE) {
4926  /*
4927  * Corosync 2.3+ used header with two UINT8_MAX - 1
4928  */
4929  guessed_str = "Corosync 2.3+";
4930  } else if (msg_byte[0] == 0x01) {
4931  /*
4932  * Knet has stable1 with first byte of message == 1
4933  */
4934  guessed_str = "unencrypted Kronosnet";
4935  } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4936  /*
4937  * Unencrypted Corosync 1.x/OpenAIS has first byte
4938  * 0-5. Collision with Knet (but still worth the try)
4939  */
4940  guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
4941  } else {
4942  /*
4943  * Encrypted Kronosned packet has a hash at the end of
4944  * the packet and nothing specific at the beginning of the
4945  * packet (just encrypted data).
4946  * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
4947  * is in the beginning of the packet.
4948  *
4949  * So it's not possible to reliably detect ether of them.
4950  */
4951  guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
4952  }
4953 
4955  "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
4956  totemip_sa_print((struct sockaddr *)system_from),
4957  guessed_str);
4958 
4959  return (-1);
4960  }
4961 
4962  if (message_header->version != TOTEM_MH_VERSION) {
4964  "Message received from %s has unsupported version %u... Ignoring",
4965  totemip_sa_print((struct sockaddr *)system_from),
4966  message_header->version);
4967 
4968  return (-1);
4969  }
4970 
4971  return (0);
4972 }
4973 
4974 
4976  void *context,
4977  const void *msg,
4978  unsigned int msg_len,
4979  const struct sockaddr_storage *system_from)
4980 {
4981  struct totemsrp_instance *instance = context;
4982  const struct totem_message_header *message_header = msg;
4983 
4984  if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
4985  return ;
4986  }
4987 
4988  switch (message_header->type) {
4990  instance->stats.orf_token_rx++;
4991  break;
4992  case MESSAGE_TYPE_MCAST:
4993  instance->stats.mcast_rx++;
4994  break;
4996  instance->stats.memb_merge_detect_rx++;
4997  break;
4999  instance->stats.memb_join_rx++;
5000  break;
5002  instance->stats.memb_commit_token_rx++;
5003  break;
5005  instance->stats.token_hold_cancel_rx++;
5006  break;
5007  default:
5009  "Message received from %s has wrong type... ignoring %d.\n",
5010  totemip_sa_print((struct sockaddr *)system_from),
5011  (int)message_header->type);
5012 
5013  instance->stats.rx_msg_dropped++;
5014  return;
5015  }
5016  /*
5017  * Handle incoming message
5018  */
5019  totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5020  instance,
5021  msg,
5022  msg_len,
5023  message_header->magic != TOTEM_MH_MAGIC);
5024 }
5025 
5027  void *context,
5028  const struct totem_ip_address *interface_addr,
5029  unsigned short ip_port,
5030  unsigned int iface_no)
5031 {
5032  struct totemsrp_instance *instance = context;
5033  int res;
5034 
5035  totemip_copy(&instance->my_addrs[iface_no], interface_addr);
5036 
5037  res = totemnet_iface_set (
5038  instance->totemnet_context,
5039  interface_addr,
5040  ip_port,
5041  iface_no);
5042 
5043  return (res);
5044 }
5045 
5046 
5048  void *context,
5049  const struct totem_ip_address *iface_addr,
5050  unsigned int iface_no)
5051 {
5052  struct totemsrp_instance *instance = context;
5053  int num_interfaces;
5054  int i;
5055 
5056  if (!instance->my_id.nodeid) {
5057  instance->my_id.nodeid = iface_addr->nodeid;
5058  }
5059  totemip_copy (&instance->my_addrs[iface_no], iface_addr);
5060 
5061  if (instance->iface_changes++ == 0) {
5062  instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5063  instance->token_ring_id_seq = instance->my_ring_id.seq;
5064  log_printf (
5065  instance->totemsrp_log_level_debug,
5066  "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5067  instance->my_ring_id.rep,
5068  (uint64_t)instance->my_ring_id.seq);
5069 
5070  if (instance->totemsrp_service_ready_fn) {
5071  instance->totemsrp_service_ready_fn ();
5072  }
5073 
5074  }
5075 
5076  for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
5077  totemsrp_member_add (instance,
5078  &instance->totem_config->interfaces[iface_no].member_list[i],
5079  iface_no);
5080  }
5081 
5082  num_interfaces = 0;
5083  for (i = 0; i < INTERFACE_MAX; i++) {
5084  if (instance->totem_config->interfaces[i].configured) {
5085  num_interfaces++;
5086  }
5087  }
5088 
5089  if (instance->iface_changes >= num_interfaces) {
5090  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5091  }
5092 }
5093 
5094 void totemsrp_net_mtu_adjust (struct totem_config *totem_config) {
5095  totem_config->net_mtu -= 2 * sizeof (struct mcast);
5096 }
5097 
5099  void *context,
5100  void (*totem_service_ready) (void))
5101 {
5102  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5103 
5104  instance->totemsrp_service_ready_fn = totem_service_ready;
5105 }
5106 
5108  void *context,
5109  const struct totem_ip_address *member,
5110  int iface_no)
5111 {
5112  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5113  int res;
5114 
5115  res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5116 
5117  return (res);
5118 }
5119 
5121  void *context,
5122  const struct totem_ip_address *member,
5123  int iface_no)
5124 {
5125  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5126  int res;
5127 
5128  res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5129 
5130  return (res);
5131 }
5132 
5133 void totemsrp_threaded_mode_enable (void *context)
5134 {
5135  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5136 
5137  instance->threaded_mode_enabled = 1;
5138 }
5139 
5140 void totemsrp_trans_ack (void *context)
5141 {
5142  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5143 
5144  instance->waiting_trans_ack = 0;
5145  instance->totemsrp_waiting_trans_ack_cb_fn (0);
5146 }
5147 
5148 
5149 int totemsrp_reconfigure (void *context, struct totem_config *totem_config)
5150 {
5151  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5152  int res;
5153 
5154  res = totemnet_reconfigure (instance->totemnet_context, totem_config);
5155  return (res);
5156 }
5157 
5158 void totemsrp_stats_clear (void *context, int flags)
5159 {
5160  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5161 
5162  memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5163  if (flags & TOTEMPG_STATS_CLEAR_TRANSPORT) {
5165  }
5166 }
5167 
/*
 * Run the ORF token timeout handler for the given instance right away.
 */
void totemsrp_force_gather (void *context)
{
	timer_function_orf_token_timeout (context);
}
void(* totemsrp_service_ready_fn)(void)
Definition: totemsrp.c:464
unsigned int backlog
Definition: totemsrp.c:204
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition: totemsrp.c:451
void(*) enum memb_stat memb_state)
Definition: totemsrp.c:443
unsigned short family
Definition: coroapi.h:113
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition: totemsrp.c:5149
struct totem_message_header header
Definition: totemsrp.c:181
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemsrp.c:5026
gather_state_from
Definition: totemsrp.c:539
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition: totemsrp.c:5047
struct srp_addr system_from
Definition: totemsrp.c:214
unsigned int nodeid
Definition: totem.h:129
struct memb_ring_id ring_id
Definition: totemsrp.c:192
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:410
uint32_t waiting_trans_ack
Definition: totemsrp.c:521
struct srp_addr system_from
Definition: totemsrp.c:182
struct memb_ring_id ring_id
Definition: totemsrp.c:251
int totemsrp_log_level_debug
Definition: totemsrp.c:429
struct memb_ring_id my_ring_id
Definition: totemsrp.c:337
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
Definition: totemstats.h:65
qb_loop_timer_handle timer_orf_token_warning
Definition: totemsrp.c:402
int my_leave_memb_entries
Definition: totemsrp.c:335
unsigned int old_ring_state_high_seq_received
Definition: totemsrp.c:491
unsigned int proc_list_entries
Definition: totemsrp.c:215
uint32_t value
struct totem_interface * interfaces
Definition: totem.h:158
int totemsrp_my_family_get(void *srp_context)
Definition: totemsrp.c:1112
struct qb_list_head list
Definition: totemsrp.c:167
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:469
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totemsrp.c:457
The totem_ip_address struct.
Definition: coroapi.h:111
unsigned int seq
Definition: totemsrp.c:262
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition: totemstats.h:90
int totemsrp_log_level_error
Definition: totemsrp.c:423
int old_ring_state_aru
Definition: totemsrp.c:489
#define LEAVE_DUMMY_NODEID
Definition: totemsrp.c:99
unsigned int seq
Definition: totemsrp.c:199
struct memb_ring_id ring_id
Definition: totemsrp.c:241
int fcc_remcast_current
Definition: totemsrp.c:293
#define TOTEM_MH_MAGIC
Definition: totem.h:121
qb_loop_timer_handle timer_heartbeat_timeout
Definition: totemsrp.c:416
unsigned int failed_list_entries
Definition: totemsrp.c:216
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:265
uint64_t mcast_rx
Definition: totemstats.h:63
unsigned long long int tv_old
Definition: totemsrp.c:3798
#define CS_PRI_NODE_ID
Definition: corotypes.h:59
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5107
#define SEQNO_START_TOKEN
Definition: totemsrp.c:119
void totemnet_stats_clear(void *net_context)
Definition: totemnet.c:574
unsigned int token_hold_timeout
Definition: totem.h:180
unsigned int msg_len
Definition: totemsrp.c:266
int member_count
Definition: totem.h:88
struct memb_ring_id ring_id
Definition: totemsrp.c:203
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition: totem.h:95
void * token_sent_event_handle
Definition: totemsrp.c:526
int retrans_flg
Definition: totemsrp.c:206
struct srp_addr system_from
Definition: totemsrp.c:229
int my_new_memb_entries
Definition: totemsrp.c:325
totem_configuration_type
The totem_configuration_type enum.
Definition: coroapi.h:132
int addr_entries
Definition: totemsrp.c:265
int totemsrp_log_level_notice
Definition: totemsrp.c:427
unsigned int proc_list_entries
Definition: totemsrp.c:262
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition: totemsrp.c:1101
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:238
unsigned int my_pbl
Definition: totemsrp.c:505
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:524
#define TOTEM_TOKEN_STATS_MAX
Definition: totemstats.h:89
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition: totemsrp.c:5094
int totemsrp_log_level_warning
Definition: totemsrp.c:425
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition: totemsrp.c:1087
unsigned int my_aru
Definition: totemsrp.c:381
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5120
uint64_t memb_merge_detect_rx
Definition: totemstats.h:58
int guarantee
Definition: totemsrp.c:266
struct cs_queue new_message_queue_trans
Definition: totemsrp.c:370
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:255
unsigned int seq
Definition: totemsrp.c:183
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:77
char commit_token_storage[40000]
Definition: totemsrp.c:527
The sq struct.
Definition: sq.h:43
unsigned int set_aru
Definition: totemsrp.c:485
struct cs_queue new_message_queue
Definition: totemsrp.c:368
int my_rotation_counter
Definition: totemsrp.c:355
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:315
uint64_t orf_token_tx
Definition: totemstats.h:55
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition: totemsrp.c:3491
uint64_t gather_token_lost
Definition: totemstats.h:71
#define CS_PRI_RING_ID_SEQ
Definition: corotypes.h:60
int totemsrp_log_level_trace
Definition: totemsrp.c:431
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:123
struct memb_ring_id my_old_ring_id
Definition: totemsrp.c:339
memb_state
Definition: totemsrp.c:274
unsigned int downcheck_timeout
Definition: totem.h:192
struct qb_list_head token_callback_received_listhead
Definition: totemsrp.c:385
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:309
#define TOKEN_SIZE_MAX
Definition: totemsrp.c:98
uint64_t memb_commit_token_tx
Definition: totemstats.h:64
int my_deliver_memb_entries
Definition: totemsrp.c:331
unsigned int max_network_delay
Definition: totem.h:208
unsigned int heartbeat_failures_allowed
Definition: totem.h:206
unsigned int my_last_seq
Definition: totemsrp.c:493
int my_left_memb_entries
Definition: totemsrp.c:333
#define swab64(x)
The swab64 macro.
Definition: swab.h:65
struct message_item __attribute__
unsigned long long token_ring_id_seq
Definition: totemsrp.c:481
struct totem_ip_address mcast_address
Definition: totemsrp.c:449
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf, 6, 7)))
Definition: totemsrp.c:435
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition: totemsrp.c:3456
unsigned int send_join_timeout
Definition: totem.h:186
unsigned int window_size
Definition: totem.h:210
int guarantee
Definition: totemsrp.c:187
unsigned int seq
Definition: totemsrp.c:193
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition: totemsrp.c:5098
struct mcast * mcast
Definition: totemsrp.c:270
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:313
uint64_t operational_entered
Definition: totemstats.h:68
int log_level_security
Definition: totem.h:106
unsigned long long ring_seq
Definition: totemsrp.c:217
#define INTERFACE_MAX
Definition: coroapi.h:88
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition: totemsrp.c:2458
message_type
Definition: totemsrp.c:143
uint64_t operational_token_lost
Definition: totemstats.h:69
unsigned int received_flg
Definition: totemsrp.c:263
uint64_t consensus_timeouts
Definition: totemstats.h:76
unsigned int aru_addr
Definition: totemsrp.c:202
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
Definition: totemsrp.c:383
struct message_handlers totemsrp_message_handlers
Definition: totemsrp.c:677
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition: totemsrp.c:412
uint64_t recovery_token_lost
Definition: totemstats.h:75
int totemnet_recv_flush(void *net_context)
Definition: totemnet.c:378
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition: totemnet.c:481
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition: totemnet.c:468
unsigned int backlog
Definition: totemsrp.c:266
int this_seqno
Definition: totemsrp.c:184
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:218
unsigned int token_retransmits_before_loss_const
Definition: totem.h:182
uint8_t configured
Definition: totem.h:87
totemsrp_stats_t * srp
Definition: totemstats.h:96
struct rtr_item rtr_list[0]
Definition: totemsrp.c:270
unsigned int retrans_flg
Definition: totemsrp.c:252
struct memb_ring_id ring_id
Definition: totemsrp.c:185
unsigned int seqno_unchanged_const
Definition: totem.h:196
uint64_t commit_token_lost
Definition: totemstats.h:73
unsigned int miss_count_const
Definition: totem.h:232
uint64_t token_hold_cancel_rx
Definition: totemstats.h:67
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition: totemnet.c:276
unsigned int join_timeout
Definition: totem.h:184
unsigned int aru
Definition: totemsrp.c:242
uint32_t originated_orf_token
Definition: totemsrp.c:517
unsigned int rep
Definition: totem.h:148
void * totemnet_buffer_alloc(void *net_context)
Definition: totemnet.c:351
unsigned int nodeid
Definition: coroapi.h:112
uint32_t flags
uint64_t pause_timestamp
Definition: totemsrp.c:509
int my_set_retrans_flg
Definition: totemsrp.c:357
struct totem_ip_address mcast_addr
Definition: totem.h:84
#define MESSAGE_QUEUE_MAX
Definition: coroapi.h:98
int totemnet_recv_mcast_empty(void *net_context)
Definition: totemnet.c:493
unsigned int received_flg
Definition: totemsrp.c:244
unsigned int my_cbl
Definition: totemsrp.c:507
unsigned int last_released
Definition: totemsrp.c:483
int orf_token_retransmit_size
Definition: totemsrp.c:391
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition: totemsrp.c:2529
uint64_t mcast_retx
Definition: totemstats.h:62
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Definition: totemnet.c:301
unsigned int msg_len
Definition: totemsrp.c:271
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition: totemsrp.c:94
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:76
unsigned int fail_to_recv_const
Definition: totem.h:194
int totemnet_send_flush(void *net_context)
Definition: totemnet.c:388
#define TOTEM_MH_VERSION
Definition: totem.h:122
unsigned int token_seq
Definition: totemsrp.c:200
void totemsrp_stats_clear(void *context, int flags)
Definition: totemsrp.c:5158
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:423
struct mcast * mcast
Definition: totemsrp.c:265
void * token_recv_event_handle
Definition: totemsrp.c:525
struct totem_ip_address boundto
Definition: totem.h:83
unsigned int my_high_seq_received
Definition: totemsrp.c:351
qb_loop_t * totemsrp_poll_handle
Definition: totemsrp.c:447
int totemnet_finalize(void *net_context)
Definition: totemnet.c:290
totem_event_type
Definition: totem.h:259
qb_loop_timer_handle timer_pause_timeout
Definition: totemsrp.c:398
qb_loop_timer_handle timer_merge_detect_timeout
Definition: totemsrp.c:408
int old_ring_state_saved
Definition: totemsrp.c:487
int my_merge_detect_timeout_outstanding
Definition: totemsrp.c:343
uint64_t rx_msg_dropped
Definition: totemstats.h:77
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf, 6, 7)))
Definition: totem.h:99
int totemsrp_log_level_security
Definition: totemsrp.c:421
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition: totemsrp.c:404
struct totem_config * totem_config
Definition: totemsrp.c:499
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition: totemsrp.c:168
struct totem_message_header header
Definition: totemsrp.c:260
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
qb_loop_timer_handle timer_orf_token_timeout
Definition: totemsrp.c:400
uint32_t continuous_gather
Definition: totemstats.h:78
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition: totemsrp.c:4975
void totemsrp_threaded_mode_enable(void *context)
Definition: totemsrp.c:5133
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:398
unsigned int aru
Definition: totemsrp.c:263
encapsulation_type
Definition: totemsrp.c:152
unsigned int net_mtu
Definition: totem.h:202
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition: totemsrp.c:1049
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition: totemsrp.c:818
struct totem_message_header header
Definition: totemsrp.c:249
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition: totemsrp.c:2449
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:242
unsigned int node_id
Definition: totemsrp.c:186
uint32_t orf_token_discard
Definition: totemsrp.c:515
int my_failed_list_entries
Definition: totemsrp.c:323
struct srp_addr my_id
Definition: totemsrp.c:301
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:317
uint64_t token_hold_cancel_tx
Definition: totemstats.h:66
const char * totemip_sa_print(const struct sockaddr *sa)
Definition: totemip.c:242
unsigned int token_timeout
Definition: totem.h:174
Definition: totemsrp.c:240
struct totem_message_header header
Definition: totemsrp.c:198
unsigned int high_delivered
Definition: totemsrp.c:243
unsigned int consensus_timeout
Definition: totem.h:188
totemsrp_stats_t stats
Definition: totemsrp.c:513
void totemsrp_force_gather(void *context)
Definition: totemsrp.c:5168
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition: totemsrp.c:303
uint64_t mcast_tx
Definition: totemstats.h:61
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition: totemsrp.c:389
unsigned int token_warning
Definition: totem.h:176
struct sq regular_sort_queue
Definition: totemsrp.c:374
int my_retrans_flg_count
Definition: totemsrp.c:359
The memb_ring_id struct.
Definition: coroapi.h:122
#define SEQNO_START_MSG
Definition: totemsrp.c:118
#define swab16(x)
The swab16 macro.
Definition: swab.h:39
struct totem_message_header header
Definition: totemsrp.c:235
void totemsrp_finalize(void *srp_context)
Definition: totemsrp.c:1026
struct totem_message_header header
Definition: totemsrp.c:228
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition: totemsrp.c:93
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:307
struct cs_queue retrans_message_queue
Definition: totemsrp.c:372
unsigned int aru
Definition: totemsrp.c:201
const char * gather_state_from_desc[]
Definition: totemsrp.c:559
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition: totemsrp.c:410
int my_trans_memb_entries
Definition: totemsrp.c:327
unsigned int my_trc
Definition: totemsrp.c:503
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition: totemsrp.c:466
uint64_t memb_merge_detect_tx
Definition: totemstats.h:57
unsigned int high_delivered
Definition: totemsrp.c:262
#define CS_PRI_RING_ID
Definition: corotypes.h:61
struct rtr_item rtr_list[0]
Definition: totemsrp.c:208
int consensus_list_entries
Definition: totemsrp.c:297
uint64_t memb_join_rx
Definition: totemstats.h:60
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:504
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:266
#define FRAME_SIZE_MAX
Definition: totem.h:52
int rtr_list_entries
Definition: totemsrp.c:207
uint32_t threaded_mode_enabled
Definition: totemsrp.c:519
enum totem_callback_token_type callback_type
Definition: totemsrp.c:169
int my_proc_list_entries
Definition: totemsrp.c:321
unsigned long long ring_seq
Definition: totemsrp.c:264
struct totem_message_header header
Definition: totemsrp.c:213
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:200
struct memb_ring_id ring_id
Definition: totemsrp.c:236
#define log_printf(level, format, args...)
Definition: totemsrp.c:689
unsigned long long seq
Definition: coroapi.h:124
void totemsrp_trans_ack(void *context)
Definition: totemsrp.c:5140
unsigned int max_messages
Definition: totem.h:212
uint64_t recovery_entered
Definition: totemstats.h:74
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition: totemsrp.c:414
void totemnet_buffer_release(void *net_context, void *ptr)
Definition: totemnet.c:359
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition: totemnet.c:560
struct memb_commit_token * commit_token
Definition: totemsrp.c:511
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:295
struct srp_addr system_from
Definition: totemsrp.c:261
struct srp_addr addr
Definition: totemsrp.c:161
char type
Definition: totem.h:55
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:305
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:473
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition: totemsrp.c:532
int totemsrp_subsys_id
Definition: totemsrp.c:433
unsigned int merge_timeout
Definition: totem.h:190
unsigned int use_heartbeat
Definition: totemsrp.c:501
int totemnet_iface_check(void *net_context)
Definition: totemnet.c:436
unsigned int token_retransmit_timeout
Definition: totem.h:178
struct qb_list_head token_callback_sent_listhead
Definition: totemsrp.c:387
int rtr_list_entries
Definition: totemsrp.c:269
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:311
#define RETRANSMIT_ENTRIES_MAX
Definition: totemsrp.c:97
unsigned short magic
Definition: totem.h:125
unsigned int token_seq
Definition: totemsrp.c:250
unsigned int my_token_seq
Definition: totemsrp.c:393
struct memb_ring_id ring_id
Definition: totemsrp.c:264
unsigned int my_last_aru
Definition: totemsrp.c:345
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:319
unsigned int nodeid
Definition: totemsrp.c:105
void * totemnet_context
Definition: totemsrp.c:497
uint64_t commit_entered
Definition: totemstats.h:72
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition: totemsrp.c:406
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemnet.c:455
struct memb_ring_id ring_id
Definition: totemsrp.c:230
unsigned int my_install_seq
Definition: totemsrp.c:353
uint64_t orf_token_rx
Definition: totemstats.h:56
unsigned int threads
Definition: totem.h:204
unsigned int failed_list_entries
Definition: totemsrp.c:263
struct sq recovery_sort_queue
Definition: totemsrp.c:376
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition: totemstats.h:116
totem_callback_token_type
The totem_callback_token_type enum.
Definition: coroapi.h:142
unsigned int my_high_ring_delivered
Definition: totemsrp.c:361
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition: totemnet.c:367
unsigned int fcc
Definition: totemsrp.c:205