corosync  3.0.3
totemsrp.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2003-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2018 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /*
37  * The first version of this code was based upon Yair Amir's PhD thesis:
38  * http://www.cs.jhu.edu/~yairamir/phd.ps (ch4,5).
39  *
40  * The current version of totemsrp implements the Totem protocol specified in:
41  * http://citeseer.ist.psu.edu/amir95totem.html
42  *
43  * The deviations from the above published protocols are:
44  * - token hold mode where token doesn't rotate on unused ring - reduces cpu
45  * usage on 1.6ghz xeon from 35% to less than .1 % as measured by top
46  */
47 
48 #include <config.h>
49 
50 #include <assert.h>
51 #ifdef HAVE_ALLOCA_H
52 #include <alloca.h>
53 #endif
54 #include <sys/mman.h>
55 #include <sys/types.h>
56 #include <sys/stat.h>
57 #include <sys/socket.h>
58 #include <netdb.h>
59 #include <sys/un.h>
60 #include <sys/ioctl.h>
61 #include <sys/param.h>
62 #include <netinet/in.h>
63 #include <arpa/inet.h>
64 #include <unistd.h>
65 #include <fcntl.h>
66 #include <stdlib.h>
67 #include <stdio.h>
68 #include <errno.h>
69 #include <sched.h>
70 #include <time.h>
71 #include <sys/time.h>
72 #include <sys/poll.h>
73 #include <sys/uio.h>
74 #include <limits.h>
75 
76 #include <qb/qblist.h>
77 #include <qb/qbdefs.h>
78 #include <qb/qbutil.h>
79 #include <qb/qbloop.h>
80 
81 #include <corosync/swab.h>
82 #include <corosync/sq.h>
83 
84 #define LOGSYS_UTILS_ONLY 1
85 #include <corosync/logsys.h>
86 
87 #include "totemsrp.h"
88 #include "totemnet.h"
89 
90 #include "cs_queue.h"
91 
92 #define LOCALHOST_IP inet_addr("127.0.0.1")
93 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
94 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
95 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
96 #define MAXIOVS 5
97 #define RETRANSMIT_ENTRIES_MAX 30
98 #define TOKEN_SIZE_MAX 64000 /* bytes */
99 #define LEAVE_DUMMY_NODEID 0
100 
/*
 * SRP address.
 *
 * Identifies a processor solely by its node id; two addresses are
 * considered the same when their nodeid fields match.
 */
struct srp_addr {
	unsigned int nodeid;	/* node identifier */
};
107 
108 /*
109  * Rollover handling:
110  * SEQNO_START_MSG is the starting sequence number after a new configuration
111  * This should remain zero, unless testing overflow in which case
112  * 0x7ffff000 and 0xfffff000 are good starting values.
113  *
114  * SEQNO_START_TOKEN is the starting sequence number after a new configuration
115  * for a token. This should remain zero, unless testing overflow in which
116  * case 0x7fffff00 or 0xffffff00 are good starting values.
117  */
118 #define SEQNO_START_MSG 0x0
119 #define SEQNO_START_TOKEN 0x0
120 
121 /*
122  * These can be used to test different rollover points
123  * #define SEQNO_START_MSG 0xfffffe00
124  * #define SEQNO_START_TOKEN 0xfffffe00
125  */
126 
127 /*
128  * These can be used to test the error recovery algorithms
129  * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
130  * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
131  * #define TEST_DROP_MCAST_PERCENTAGE 50
132  * #define TEST_RECOVERY_MSG_COUNT 300
133  */
134 
135 /*
136  * we compare incoming messages to determine if their endian is
137  * different - if so convert them
138  *
139  * do not change
140  */
141 #define ENDIAN_LOCAL 0xff22
142 
144  MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
145  MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
146  MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
147  MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
148  MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
149  MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
150 };
151 
155 };
156 
157 /*
158  * New membership algorithm local variables
159  */
161  struct srp_addr addr;
162  int set;
163 };
164 
165 
167  struct qb_list_head list;
168  int (*callback_fn) (enum totem_callback_token_type type, const void *);
170  int delete;
171  void *data;
172 };
173 
174 
176  int mcast;
177  int token;
178 };
179 
180 struct mcast {
183  unsigned int seq;
186  unsigned int node_id;
188 } __attribute__((packed));
189 
190 
191 struct rtr_item {
193  unsigned int seq;
194 }__attribute__((packed));
195 
196 
197 struct orf_token {
199  unsigned int seq;
200  unsigned int token_seq;
201  unsigned int aru;
202  unsigned int aru_addr;
204  unsigned int backlog;
205  unsigned int fcc;
208  struct rtr_item rtr_list[0];
209 }__attribute__((packed));
210 
211 
212 struct memb_join {
215  unsigned int proc_list_entries;
216  unsigned int failed_list_entries;
217  unsigned long long ring_seq;
218  unsigned char end_of_memb_join[0];
219 /*
220  * These parts of the data structure are dynamic:
221  * struct srp_addr proc_list[];
222  * struct srp_addr failed_list[];
223  */
224 } __attribute__((packed));
225 
226 
231 } __attribute__((packed));
232 
233 
237 } __attribute__((packed));
238 
239 
242  unsigned int aru;
243  unsigned int high_delivered;
244  unsigned int received_flg;
245 }__attribute__((packed));
246 
247 
250  unsigned int token_seq;
252  unsigned int retrans_flg;
255  unsigned char end_of_commit_token[0];
256 /*
257  * These parts of the data structure are dynamic:
258  *
259  * struct srp_addr addr[PROCESSOR_COUNT_MAX];
260  * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
261  */
262 }__attribute__((packed));
263 
/*
 * Element type stored in the new/retransmit message queues
 * (see the cs_queue_init calls sized with sizeof (struct message_item)).
 */
struct message_item {
	struct mcast *mcast;	/* pointer to the multicast message header */
	unsigned int msg_len;	/* message length; presumably in bytes - confirm against users */
};
268 
270  struct mcast *mcast;
271  unsigned int msg_len;
272 };
273 
279 };
280 
283 
285 
286  /*
287  * Flow control mcasts and remcasts on last and current orf_token
288  */
290 
292 
294 
296 
298 
300 
301  struct srp_addr my_id;
302 
304 
306 
308 
310 
312 
314 
316 
318 
320 
322 
324 
326 
328 
330 
332 
334 
336 
338 
340 
342 
344 
345  unsigned int my_last_aru;
346 
348 
350 
351  unsigned int my_high_seq_received;
352 
353  unsigned int my_install_seq;
354 
356 
358 
360 
362 
364 
365  /*
366  * Queues used to order, deliver, and recover messages
367  */
369 
371 
373 
375 
377 
378  /*
379  * Received up to and including
380  */
381  unsigned int my_aru;
382 
383  unsigned int my_high_delivered;
384 
385  struct qb_list_head token_callback_received_listhead;
386 
387  struct qb_list_head token_callback_sent_listhead;
388 
390 
392 
393  unsigned int my_token_seq;
394 
395  /*
396  * Timers
397  */
398  qb_loop_timer_handle timer_pause_timeout;
399 
400  qb_loop_timer_handle timer_orf_token_timeout;
401 
402  qb_loop_timer_handle timer_orf_token_warning;
403 
405 
407 
408  qb_loop_timer_handle timer_merge_detect_timeout;
409 
411 
413 
414  qb_loop_timer_handle memb_timer_state_commit_timeout;
415 
416  qb_loop_timer_handle timer_heartbeat_timeout;
417 
418  /*
419  * Function and data used to log messages
420  */
422 
424 
426 
428 
430 
432 
434 
436  int level,
437  int subsys,
438  const char *function,
439  const char *file,
440  int line,
441  const char *format, ...)__attribute__((format(printf, 6, 7)));;
442 
444 
445 //TODO struct srp_addr next_memb;
446 
448 
450 
452  unsigned int nodeid,
453  const void *msg,
454  unsigned int msg_len,
455  int endian_conversion_required);
456 
458  enum totem_configuration_type configuration_type,
459  const unsigned int *member_list, size_t member_list_entries,
460  const unsigned int *left_list, size_t left_list_entries,
461  const unsigned int *joined_list, size_t joined_list_entries,
462  const struct memb_ring_id *ring_id);
463 
465 
467  int waiting_trans_ack);
468 
470  struct memb_ring_id *memb_ring_id,
471  unsigned int nodeid);
472 
474  const struct memb_ring_id *memb_ring_id,
475  unsigned int nodeid);
476 
478 
480 
481  unsigned long long token_ring_id_seq;
482 
483  unsigned int last_released;
484 
485  unsigned int set_aru;
486 
488 
490 
492 
493  unsigned int my_last_seq;
494 
495  struct timeval tv_old;
496 
498 
500 
501  unsigned int use_heartbeat;
502 
503  unsigned int my_trc;
504 
505  unsigned int my_pbl;
506 
507  unsigned int my_cbl;
508 
509  uint64_t pause_timestamp;
510 
512 
514 
516 
518 
520 
522 
523  int flushing;
524 
527  char commit_token_storage[40000];
528 };
529 
531  int count;
532  int (*handler_functions[6]) (
533  struct totemsrp_instance *instance,
534  const void *msg,
535  size_t msg_len,
536  int endian_conversion_needed);
537 };
538 
557 };
558 
559 const char* gather_state_from_desc [] = {
560  [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
561  [TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING",
562  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
563  [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
564  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
565  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
566  [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
567  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
568  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
569  [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
570  [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
571  [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
572  [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
573  [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
574  [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
575  [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
576 };
577 
578 /*
579  * forward decls
580  */
581 static int message_handler_orf_token (
582  struct totemsrp_instance *instance,
583  const void *msg,
584  size_t msg_len,
585  int endian_conversion_needed);
586 
587 static int message_handler_mcast (
588  struct totemsrp_instance *instance,
589  const void *msg,
590  size_t msg_len,
591  int endian_conversion_needed);
592 
593 static int message_handler_memb_merge_detect (
594  struct totemsrp_instance *instance,
595  const void *msg,
596  size_t msg_len,
597  int endian_conversion_needed);
598 
599 static int message_handler_memb_join (
600  struct totemsrp_instance *instance,
601  const void *msg,
602  size_t msg_len,
603  int endian_conversion_needed);
604 
605 static int message_handler_memb_commit_token (
606  struct totemsrp_instance *instance,
607  const void *msg,
608  size_t msg_len,
609  int endian_conversion_needed);
610 
611 static int message_handler_token_hold_cancel (
612  struct totemsrp_instance *instance,
613  const void *msg,
614  size_t msg_len,
615  int endian_conversion_needed);
616 
617 static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
618 
619 static void srp_addr_to_nodeid (
620  struct totemsrp_instance *instance,
621  unsigned int *nodeid_out,
622  struct srp_addr *srp_addr_in,
623  unsigned int entries);
624 
625 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
626 
627 static void memb_leave_message_send (struct totemsrp_instance *instance);
628 
629 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
630 static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
631 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
632 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
633  int fcc_mcasts_allowed);
634 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
635 
636 static void memb_ring_id_set (struct totemsrp_instance *instance,
637  const struct memb_ring_id *ring_id);
638 static void target_set_completed (void *context);
639 static void memb_state_commit_token_update (struct totemsrp_instance *instance);
640 static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
641 static int memb_state_commit_token_send (struct totemsrp_instance *instance);
642 static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
643 static void memb_state_commit_token_create (struct totemsrp_instance *instance);
644 static int token_hold_cancel_send (struct totemsrp_instance *instance);
645 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
646 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
647 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
648 static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
649 static void memb_merge_detect_endian_convert (
650  const struct memb_merge_detect *in,
651  struct memb_merge_detect *out);
652 static struct srp_addr srp_addr_endian_convert (struct srp_addr in);
653 static void timer_function_orf_token_timeout (void *data);
654 static void timer_function_orf_token_warning (void *data);
655 static void timer_function_pause_timeout (void *data);
656 static void timer_function_heartbeat_timeout (void *data);
657 static void timer_function_token_retransmit_timeout (void *data);
658 static void timer_function_token_hold_retransmit_timeout (void *data);
659 static void timer_function_merge_detect_timeout (void *data);
660 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
661 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
662 static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
663 
664 void main_deliver_fn (
665  void *context,
666  const void *msg,
667  unsigned int msg_len,
668  const struct sockaddr_storage *system_from);
669 
671  void *context,
672  const struct totem_ip_address *iface_address,
673  unsigned int iface_no);
674 
676  6,
677  {
678  message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
679  message_handler_mcast, /* MESSAGE_TYPE_MCAST */
680  message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
681  message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
682  message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
683  message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
684  }
685 };
686 
/*
 * Log a formatted message through the instance's totemsrp_log_printf
 * callback, tagging it with the instance's subsystem id and the calling
 * function, file and line.
 *
 * A variable named `instance` must be in scope at the call site.
 *
 * The expansion is a single do { } while (0) statement WITHOUT a trailing
 * semicolon: the caller's own `;` terminates it, so the macro can be used
 * as the body of an unbraced if/else.
 */
#define log_printf(level, format, args...)		\
do {							\
	instance->totemsrp_log_printf (			\
		level, instance->totemsrp_subsys_id,	\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0)
/*
 * Log a formatted message followed by ": <error text> (<err_num>)".
 * The error text is obtained from qb_strerror_r() for err_num.
 *
 * A variable named `instance` must be in scope at the call site.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...)					\
do {											\
	char _error_str[LOGSYS_MAX_PERROR_MSG_LEN];					\
	const char *_error_ptr =							\
		qb_strerror_r(err_num, _error_str, sizeof(_error_str));			\
	instance->totemsrp_log_printf (							\
		level, instance->totemsrp_subsys_id,					\
		__FUNCTION__, __FILE__, __LINE__,					\
		fmt ": %s (%d)\n", ##args, _error_ptr, err_num);			\
} while (0)
703 
704 static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
705 {
706  if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
707  return gather_state_from_desc[gsfrom];
708  }
709  else {
710  return "UNKNOWN";
711  }
712 }
713 
714 static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
715 {
716  memset (instance, 0, sizeof (struct totemsrp_instance));
717 
718  qb_list_init (&instance->token_callback_received_listhead);
719 
720  qb_list_init (&instance->token_callback_sent_listhead);
721 
722  instance->my_received_flg = 1;
723 
724  instance->my_token_seq = SEQNO_START_TOKEN - 1;
725 
727 
728  instance->set_aru = -1;
729 
730  instance->my_aru = SEQNO_START_MSG;
731 
733 
735 
736  instance->orf_token_discard = 0;
737 
738  instance->originated_orf_token = 0;
739 
740  instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
741 
742  instance->waiting_trans_ack = 1;
743 }
744 
745 static int pause_flush (struct totemsrp_instance *instance)
746 {
747  uint64_t now_msec;
748  uint64_t timestamp_msec;
749  int res = 0;
750 
751  now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
752  timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
753 
754  if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
756  "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
757  /*
758  * -1 indicates an error from recvmsg
759  */
760  do {
762  } while (res == -1);
763  }
764  return (res);
765 }
766 
767 static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
768 {
769  struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
770  uint32_t time_now;
771  unsigned long long nano_secs = qb_util_nano_current_get ();
772 
773  time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
774 
776  /* incr latest token the index */
777  if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
778  instance->stats.latest_token = 0;
779  else
780  instance->stats.latest_token++;
781 
782  if (instance->stats.earliest_token == instance->stats.latest_token) {
783  /* we have filled up the array, start overwriting */
784  if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
785  instance->stats.earliest_token = 0;
786  else
787  instance->stats.earliest_token++;
788 
789  instance->stats.token[instance->stats.earliest_token].rx = 0;
790  instance->stats.token[instance->stats.earliest_token].tx = 0;
791  instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
792  }
793 
794  instance->stats.token[instance->stats.latest_token].rx = time_now;
795  instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
796  } else {
797  instance->stats.token[instance->stats.latest_token].tx = time_now;
798  }
799  return 0;
800 }
801 
802 static void totempg_mtu_changed(void *context, int net_mtu)
803 {
804  struct totemsrp_instance *instance = context;
805 
806  instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
807 
809  "Net MTU changed to %d, new value is %d",
810  net_mtu, instance->totem_config->net_mtu);
811 }
812 
813 /*
814  * Exported interfaces
815  */
817  qb_loop_t *poll_handle,
818  void **srp_context,
819  struct totem_config *totem_config,
820  totempg_stats_t *stats,
821 
822  void (*deliver_fn) (
823  unsigned int nodeid,
824  const void *msg,
825  unsigned int msg_len,
826  int endian_conversion_required),
827 
828  void (*confchg_fn) (
829  enum totem_configuration_type configuration_type,
830  const unsigned int *member_list, size_t member_list_entries,
831  const unsigned int *left_list, size_t left_list_entries,
832  const unsigned int *joined_list, size_t joined_list_entries,
833  const struct memb_ring_id *ring_id),
834  void (*waiting_trans_ack_cb_fn) (
835  int waiting_trans_ack))
836 {
837  struct totemsrp_instance *instance;
838  int res;
839 
840  instance = malloc (sizeof (struct totemsrp_instance));
841  if (instance == NULL) {
842  goto error_exit;
843  }
844 
845  totemsrp_instance_initialize (instance);
846 
847  instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
848  instance->totemsrp_waiting_trans_ack_cb_fn (1);
849 
850  stats->srp = &instance->stats;
851  instance->stats.latest_token = 0;
852  instance->stats.earliest_token = 0;
853 
854  instance->totem_config = totem_config;
855 
856  /*
857  * Configure logging
858  */
867 
868  /*
869  * Configure totem store and load functions
870  */
873 
874  /*
875  * Initialize local variables for totemsrp
876  */
878 
879  /*
880  * Display totem configuration
881  */
883  "Token Timeout (%d ms) retransmit timeout (%d ms)",
886  uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100;
888  "Token warning every %d ms (%d%% of Token Timeout)",
889  token_warning_ms, totem_config->token_warning);
890  if (token_warning_ms < totem_config->token_retransmit_timeout)
892  "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
893  "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
894  token_warning_ms, totem_config->token_retransmit_timeout);
895  } else {
897  "Token warnings disabled");
898  }
900  "token hold (%d ms) retransmits before loss (%d retrans)",
903  "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
907 
910  "downcheck (%d ms) fail to recv const (%d msgs)",
913  "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
914 
916  "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
918 
920  "missed count const (%d messages)",
922 
924  "send threads (%d threads)", totem_config->threads);
925 
927  "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
929  "max_network_delay (%d ms)", totem_config->max_network_delay);
930 
931 
932  cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
933  sizeof (struct message_item), instance->threaded_mode_enabled);
934 
935  sq_init (&instance->regular_sort_queue,
936  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
937 
938  sq_init (&instance->recovery_sort_queue,
939  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
940 
941  instance->totemsrp_poll_handle = poll_handle;
942 
943  instance->totemsrp_deliver_fn = deliver_fn;
944 
945  instance->totemsrp_confchg_fn = confchg_fn;
946  instance->use_heartbeat = 1;
947 
948  timer_function_pause_timeout (instance);
949 
952  "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
953  instance->use_heartbeat = 0;
954  }
955 
956  if (instance->use_heartbeat) {
957  instance->heartbeat_timeout
960 
961  if (instance->heartbeat_timeout >= totem_config->token_timeout) {
963  "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
964  instance->heartbeat_timeout,
967  "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
969  "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
970  instance->use_heartbeat = 0;
971  }
972  else {
974  "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
975  }
976  }
977 
978  res = totemnet_initialize (
979  poll_handle,
980  &instance->totemnet_context,
981  totem_config,
982  stats->srp,
983  instance,
986  totempg_mtu_changed,
987  target_set_completed);
988  if (res == -1) {
989  goto error_exit;
990  }
991 
992  instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
993 
994  /*
995  * Must have net_mtu adjusted by totemnet_initialize first
996  */
997  cs_queue_init (&instance->new_message_queue,
999  sizeof (struct message_item), instance->threaded_mode_enabled);
1000 
1001  cs_queue_init (&instance->new_message_queue_trans,
1003  sizeof (struct message_item), instance->threaded_mode_enabled);
1004 
1006  &instance->token_recv_event_handle,
1008  0,
1009  token_event_stats_collector,
1010  instance);
1012  &instance->token_sent_event_handle,
1014  0,
1015  token_event_stats_collector,
1016  instance);
1017  *srp_context = instance;
1018  return (0);
1019 
1020 error_exit:
1021  return (-1);
1022 }
1023 
1025  void *srp_context)
1026 {
1027  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1028 
1029  memb_leave_message_send (instance);
1030  totemnet_finalize (instance->totemnet_context);
1031  cs_queue_free (&instance->new_message_queue);
1032  cs_queue_free (&instance->new_message_queue_trans);
1033  cs_queue_free (&instance->retrans_message_queue);
1034  sq_free (&instance->regular_sort_queue);
1035  sq_free (&instance->recovery_sort_queue);
1036  free (instance);
1037 }
1038 
1039 /*
1040  * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1041  * with interfaces_size number of items. iface_count is final number of interfaces filled by this
1042  * function.
1043  *
1044  * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1045  * and if interface was not found, -1 is returned.
1046  */
1048  void *srp_context,
1049  unsigned int nodeid,
1050  unsigned int *interface_id,
1051  struct totem_ip_address *interfaces,
1052  unsigned int interfaces_size,
1053  char ***status,
1054  unsigned int *iface_count)
1055 {
1056  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1057  struct totem_ip_address *iface_ptr = interfaces;
1058  int res = 0;
1059  int i,n;
1060  int num_ifs = 0;
1061 
1062  memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1063  *iface_count = INTERFACE_MAX;
1064 
1065  for (i=0; i<INTERFACE_MAX; i++) {
1066  for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1067  if (instance->totem_config->interfaces[i].configured &&
1068  instance->totem_config->interfaces[i].member_list[n].nodeid == nodeid) {
1069  memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1070  interface_id[num_ifs] = i;
1071  iface_ptr++;
1072  if (++num_ifs > interfaces_size) {
1073  res = -2;
1074  break;
1075  }
1076  }
1077  }
1078  }
1079 
1080  totemnet_ifaces_get(instance->totemnet_context, status, iface_count);
1081  *iface_count = num_ifs;
1082  return (res);
1083 }
1084 
1086  void *srp_context,
1087  const char *cipher_type,
1088  const char *hash_type)
1089 {
1090  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1091  int res;
1092 
1093  res = totemnet_crypto_set(instance->totemnet_context, cipher_type, hash_type);
1094 
1095  return (res);
1096 }
1097 
1098 
1100  void *srp_context)
1101 {
1102  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1103  unsigned int res;
1104 
1105  res = instance->my_id.nodeid;
1106 
1107  return (res);
1108 }
1109 
1111  void *srp_context)
1112 {
1113  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1114  int res;
1115 
1116  res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1117 
1118  return (res);
1119 }
1120 
1121 
1122 /*
1123  * Set operations for use by the membership algorithm
1124  */
1125 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1126 {
1127  if (a->nodeid == b->nodeid) {
1128  return 1;
1129  }
1130  return 0;
1131 }
1132 
1133 static void srp_addr_to_nodeid (
1134  struct totemsrp_instance *instance,
1135  unsigned int *nodeid_out,
1136  struct srp_addr *srp_addr_in,
1137  unsigned int entries)
1138 {
1139  unsigned int i;
1140 
1141  for (i = 0; i < entries; i++) {
1142  nodeid_out[i] = srp_addr_in[i].nodeid;
1143  }
1144 }
1145 
1146 static struct srp_addr srp_addr_endian_convert (struct srp_addr in)
1147 {
1148  struct srp_addr res;
1149 
1150  res.nodeid = swab32 (in.nodeid);
1151 
1152  return (res);
1153 }
1154 
1155 static void memb_consensus_reset (struct totemsrp_instance *instance)
1156 {
1157  instance->consensus_list_entries = 0;
1158 }
1159 
1160 static void memb_set_subtract (
1161  struct srp_addr *out_list, int *out_list_entries,
1162  struct srp_addr *one_list, int one_list_entries,
1163  struct srp_addr *two_list, int two_list_entries)
1164 {
1165  int found = 0;
1166  int i;
1167  int j;
1168 
1169  *out_list_entries = 0;
1170 
1171  for (i = 0; i < one_list_entries; i++) {
1172  for (j = 0; j < two_list_entries; j++) {
1173  if (srp_addr_equal (&one_list[i], &two_list[j])) {
1174  found = 1;
1175  break;
1176  }
1177  }
1178  if (found == 0) {
1179  out_list[*out_list_entries] = one_list[i];
1180  *out_list_entries = *out_list_entries + 1;
1181  }
1182  found = 0;
1183  }
1184 }
1185 
1186 /*
1187  * Set consensus for a specific processor
1188  */
1189 static void memb_consensus_set (
1190  struct totemsrp_instance *instance,
1191  const struct srp_addr *addr)
1192 {
1193  int found = 0;
1194  int i;
1195 
1196  for (i = 0; i < instance->consensus_list_entries; i++) {
1197  if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1198  found = 1;
1199  break; /* found entry */
1200  }
1201  }
1202  instance->consensus_list[i].addr = *addr;
1203  instance->consensus_list[i].set = 1;
1204  if (found == 0) {
1205  instance->consensus_list_entries++;
1206  }
1207  return;
1208 }
1209 
1210 /*
1211  * Is consensus set for a specific processor
1212  */
1213 static int memb_consensus_isset (
1214  struct totemsrp_instance *instance,
1215  const struct srp_addr *addr)
1216 {
1217  int i;
1218 
1219  for (i = 0; i < instance->consensus_list_entries; i++) {
1220  if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1221  return (instance->consensus_list[i].set);
1222  }
1223  }
1224  return (0);
1225 }
1226 
1227 /*
1228  * Is consensus agreed upon based upon consensus database
1229  */
1230 static int memb_consensus_agreed (
1231  struct totemsrp_instance *instance)
1232 {
1233  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1234  int token_memb_entries = 0;
1235  int agreed = 1;
1236  int i;
1237 
1238  memb_set_subtract (token_memb, &token_memb_entries,
1239  instance->my_proc_list, instance->my_proc_list_entries,
1240  instance->my_failed_list, instance->my_failed_list_entries);
1241 
1242  for (i = 0; i < token_memb_entries; i++) {
1243  if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1244  agreed = 0;
1245  break;
1246  }
1247  }
1248 
1249  if (agreed && instance->failed_to_recv == 1) {
1250  /*
1251  * Both nodes agreed on our failure. We don't care how many proc list items left because we
1252  * will create single ring anyway.
1253  */
1254 
1255  return (agreed);
1256  }
1257 
1258  assert (token_memb_entries >= 1);
1259 
1260  return (agreed);
1261 }
1262 
1263 static void memb_consensus_notset (
1264  struct totemsrp_instance *instance,
1265  struct srp_addr *no_consensus_list,
1266  int *no_consensus_list_entries,
1267  struct srp_addr *comparison_list,
1268  int comparison_list_entries)
1269 {
1270  int i;
1271 
1272  *no_consensus_list_entries = 0;
1273 
1274  for (i = 0; i < instance->my_proc_list_entries; i++) {
1275  if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1276  no_consensus_list[*no_consensus_list_entries] = instance->my_proc_list[i];
1277  *no_consensus_list_entries = *no_consensus_list_entries + 1;
1278  }
1279  }
1280 }
1281 
1282 /*
1283  * Is set1 equal to set2 Entries can be in different orders
1284  */
1285 static int memb_set_equal (
1286  struct srp_addr *set1, int set1_entries,
1287  struct srp_addr *set2, int set2_entries)
1288 {
1289  int i;
1290  int j;
1291 
1292  int found = 0;
1293 
1294  if (set1_entries != set2_entries) {
1295  return (0);
1296  }
1297  for (i = 0; i < set2_entries; i++) {
1298  for (j = 0; j < set1_entries; j++) {
1299  if (srp_addr_equal (&set1[j], &set2[i])) {
1300  found = 1;
1301  break;
1302  }
1303  }
1304  if (found == 0) {
1305  return (0);
1306  }
1307  found = 0;
1308  }
1309  return (1);
1310 }
1311 
1312 /*
1313  * Is subset fully contained in fullset
1314  */
1315 static int memb_set_subset (
1316  const struct srp_addr *subset, int subset_entries,
1317  const struct srp_addr *fullset, int fullset_entries)
1318 {
1319  int i;
1320  int j;
1321  int found = 0;
1322 
1323  if (subset_entries > fullset_entries) {
1324  return (0);
1325  }
1326  for (i = 0; i < subset_entries; i++) {
1327  for (j = 0; j < fullset_entries; j++) {
1328  if (srp_addr_equal (&subset[i], &fullset[j])) {
1329  found = 1;
1330  }
1331  }
1332  if (found == 0) {
1333  return (0);
1334  }
1335  found = 0;
1336  }
1337  return (1);
1338 }
1339 /*
1340  * merge subset into fullset taking care not to add duplicates
1341  */
1342 static void memb_set_merge (
1343  const struct srp_addr *subset, int subset_entries,
1344  struct srp_addr *fullset, int *fullset_entries)
1345 {
1346  int found = 0;
1347  int i;
1348  int j;
1349 
1350  for (i = 0; i < subset_entries; i++) {
1351  for (j = 0; j < *fullset_entries; j++) {
1352  if (srp_addr_equal (&fullset[j], &subset[i])) {
1353  found = 1;
1354  break;
1355  }
1356  }
1357  if (found == 0) {
1358  fullset[*fullset_entries] = subset[i];
1359  *fullset_entries = *fullset_entries + 1;
1360  }
1361  found = 0;
1362  }
1363  return;
1364 }
1365 
1366 static void memb_set_and_with_ring_id (
1367  struct srp_addr *set1,
1368  struct memb_ring_id *set1_ring_ids,
1369  int set1_entries,
1370  struct srp_addr *set2,
1371  int set2_entries,
1372  struct memb_ring_id *old_ring_id,
1373  struct srp_addr *and,
1374  int *and_entries)
1375 {
1376  int i;
1377  int j;
1378  int found = 0;
1379 
1380  *and_entries = 0;
1381 
1382  for (i = 0; i < set2_entries; i++) {
1383  for (j = 0; j < set1_entries; j++) {
1384  if (srp_addr_equal (&set1[j], &set2[i])) {
1385  if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1386  found = 1;
1387  }
1388  break;
1389  }
1390  }
1391  if (found) {
1392  and[*and_entries] = set1[j];
1393  *and_entries = *and_entries + 1;
1394  }
1395  found = 0;
1396  }
1397  return;
1398 }
1399 
1400 static void memb_set_log(
1401  struct totemsrp_instance *instance,
1402  int level,
1403  const char *string,
1404  struct srp_addr *list,
1405  int list_entries)
1406 {
1407  char int_buf[32];
1408  char list_str[512];
1409  int i;
1410 
1411  memset(list_str, 0, sizeof(list_str));
1412 
1413  for (i = 0; i < list_entries; i++) {
1414  if (i == 0) {
1415  snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid);
1416  } else {
1417  snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid);
1418  }
1419 
1420  if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1421  break ;
1422  }
1423  strcat(list_str, int_buf);
1424  }
1425 
1426  log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1427 }
1428 
1429 static void my_leave_memb_clear(
1430  struct totemsrp_instance *instance)
1431 {
1432  memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1433  instance->my_leave_memb_entries = 0;
1434 }
1435 
1436 static unsigned int my_leave_memb_match(
1437  struct totemsrp_instance *instance,
1438  unsigned int nodeid)
1439 {
1440  int i;
1441  unsigned int ret = 0;
1442 
1443  for (i = 0; i < instance->my_leave_memb_entries; i++){
1444  if (instance->my_leave_memb_list[i] == nodeid){
1445  ret = nodeid;
1446  break;
1447  }
1448  }
1449  return ret;
1450 }
1451 
/*
 * Remember nodeid in instance->my_leave_memb_list.
 *
 * Nothing is stored when nodeid is already in the list. A new nodeid is
 * appended only while my_leave_memb_entries is below PROCESSOR_COUNT_MAX - 1.
 *
 * NOTE(review): this listing skips source line 1470, so the beginning of the
 * statement in the else branch is not visible here - presumably a log call
 * reporting that the nodeid could not be stored; confirm against upstream.
 */
1452 static void my_leave_memb_set(
1453  struct totemsrp_instance *instance,
1454  unsigned int nodeid)
1455 {
1456  int i, found = 0;
 /* Look for an existing entry first so duplicates are never stored */
1457  for (i = 0; i < instance->my_leave_memb_entries; i++){
1458  if (instance->my_leave_memb_list[i] == nodeid){
1459  found = 1;
1460  break;
1461  }
1462  }
1463  if (found == 1) {
1464  return;
1465  }
1466  if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1467  instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1468  instance->my_leave_memb_entries++;
1469  } else {
1471  "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid);
1472  }
1473 }
1474 
1475 
1476 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1477 {
1478  assert (instance != NULL);
1479  return totemnet_buffer_alloc (instance->totemnet_context);
1480 }
1481 
1482 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1483 {
1484  assert (instance != NULL);
1485  totemnet_buffer_release (instance->totemnet_context, ptr);
1486 }
1487 
/*
 * Re-arm the token retransmit timer.
 *
 * A timer is deleted through qb_loop_timer_del() and a new one is added
 * that calls timer_function_token_retransmit_timeout with instance as its
 * argument. The duration is totem_config->token_retransmit_timeout
 * multiplied by QB_TIME_NS_IN_MSEC. A failing qb_loop_timer_add() is only
 * logged.
 *
 * NOTE(review): this listing skips source line 1493, which holds the second
 * argument of qb_loop_timer_del - presumably
 * instance->timer_orf_token_retransmit_timeout; confirm against upstream.
 */
1488 static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1489 {
1490  int32_t res;
1491 
1492  qb_loop_timer_del (instance->totemsrp_poll_handle,
1494  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1495  QB_LOOP_MED,
1496  instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1497  (void *)instance,
1498  timer_function_token_retransmit_timeout,
1499  &instance->timer_orf_token_retransmit_timeout);
1500  if (res != 0) {
1501  log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1502  }
1503 
1504 }
1505 
/*
 * Arm the merge detect timer unless one is already outstanding.
 *
 * When my_merge_detect_timeout_outstanding is 0, a timer is added that calls
 * timer_function_merge_detect_timeout with instance as its argument after
 * totem_config->merge_timeout multiplied by QB_TIME_NS_IN_MSEC. A failing
 * qb_loop_timer_add() is only logged.
 *
 * NOTE(review): this listing skips source line 1521 at the end of the if
 * body - presumably where the outstanding flag is raised; confirm against
 * upstream.
 */
1506 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1507 {
1508  int32_t res;
1509 
1510  if (instance->my_merge_detect_timeout_outstanding == 0) {
1511  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1512  QB_LOOP_MED,
1513  instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1514  (void *)instance,
1515  timer_function_merge_detect_timeout,
1516  &instance->timer_merge_detect_timeout);
1517  if (res != 0) {
1518  log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1519  }
1520 
1522  }
1523 }
1524 
/*
 * Delete the merge detect timer (timer_merge_detect_timeout).
 *
 * NOTE(review): this listing skips source line 1528 - presumably where the
 * outstanding flag is cleared again; confirm against upstream.
 */
1525 static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1526 {
1527  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1529 }
1530 
1531 /*
1532  * ring_state_* is used to save and restore the sort queue
1533  * state when a recovery operation fails (and enters gather)
1534  */
/*
 * Save the current ring state once.
 *
 * Only acts while old_ring_state_saved is 0: the flag is raised, my_ring_id
 * is copied to my_old_ring_id and my_aru is stored in old_ring_state_aru.
 * Further calls do nothing until the flag is cleared again.
 *
 * NOTE(review): this listing skips source lines 1542-1543 - the message
 * below mentions "high seq received", so presumably that value is saved
 * there and the log call starts there; confirm against upstream.
 */
1535 static void old_ring_state_save (struct totemsrp_instance *instance)
1536 {
1537  if (instance->old_ring_state_saved == 0) {
1538  instance->old_ring_state_saved = 1;
1539  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1540  sizeof (struct memb_ring_id));
1541  instance->old_ring_state_aru = instance->my_aru;
1544  "Saving state aru %x high seq received %x",
1545  instance->my_aru, instance->my_high_seq_received);
1546  }
1547 }
1548 
/*
 * Restore the ring state captured earlier: my_aru is set back to
 * old_ring_state_aru.
 *
 * NOTE(review): this listing skips source lines 1552-1553 - presumably the
 * high sequence number is restored and the log call starts there; confirm
 * against upstream.
 */
1549 static void old_ring_state_restore (struct totemsrp_instance *instance)
1550 {
1551  instance->my_aru = instance->old_ring_state_aru;
1554  "Restoring instance->my_aru %x my high seq received %x",
1555  instance->my_aru, instance->my_high_seq_received);
1556 }
1557 
/*
 * Clear old_ring_state_saved so that the next old_ring_state_save() call
 * captures the ring state again.
 *
 * NOTE(review): this listing skips source line 1560 - presumably the start
 * of the log call for the message below; confirm against upstream.
 */
1558 static void old_ring_state_reset (struct totemsrp_instance *instance)
1559 {
1561  "Resetting old ring state");
1562  instance->old_ring_state_saved = 0;
1563 }
1564 
1565 static void reset_pause_timeout (struct totemsrp_instance *instance)
1566 {
1567  int32_t res;
1568 
1569  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1570  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1571  QB_LOOP_MED,
1572  instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1573  (void *)instance,
1574  timer_function_pause_timeout,
1575  &instance->timer_pause_timeout);
1576  if (res != 0) {
1577  log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1578  }
1579 }
1580 
1581 static void reset_token_warning (struct totemsrp_instance *instance) {
1582  int32_t res;
1583 
1584  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1585  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1586  QB_LOOP_MED,
1587  instance->totem_config->token_warning * instance->totem_config->token_timeout / 100 * QB_TIME_NS_IN_MSEC,
1588  (void *)instance,
1589  timer_function_orf_token_warning,
1590  &instance->timer_orf_token_warning);
1591  if (res != 0) {
1592  log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1593  }
1594 }
1595 
1596 static void reset_token_timeout (struct totemsrp_instance *instance) {
1597  int32_t res;
1598 
1599  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1600  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1601  QB_LOOP_MED,
1602  instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1603  (void *)instance,
1604  timer_function_orf_token_timeout,
1605  &instance->timer_orf_token_timeout);
1606  if (res != 0) {
1607  log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1608  }
1609 
1610  if (instance->totem_config->token_warning)
1611  reset_token_warning(instance);
1612 }
1613 
1614 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1615  int32_t res;
1616 
1617  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1618  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1619  QB_LOOP_MED,
1620  instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1621  (void *)instance,
1622  timer_function_heartbeat_timeout,
1623  &instance->timer_heartbeat_timeout);
1624  if (res != 0) {
1625  log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1626  }
1627 }
1628 
1629 
1630 static void cancel_token_warning (struct totemsrp_instance *instance) {
1631  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1632 }
1633 
1634 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1635  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1636 
1637  if (instance->totem_config->token_warning)
1638  cancel_token_warning(instance);
1639 }
1640 
1641 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1642  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1643 }
1644 
1645 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1646 {
1647  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1648 }
1649 
1650 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1651 {
1652  int32_t res;
1653 
1654  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1655  QB_LOOP_MED,
1656  instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1657  (void *)instance,
1658  timer_function_token_hold_retransmit_timeout,
1659  &instance->timer_orf_token_hold_retransmit_timeout);
1660  if (res != 0) {
1661  log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1662  }
1663 }
1664 
/*
 * Delete the token hold retransmit timer.
 *
 * NOTE(review): this listing skips source line 1668, which holds the second
 * argument of qb_loop_timer_del - presumably
 * instance->timer_orf_token_hold_retransmit_timeout; confirm against
 * upstream.
 */
1665 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1666 {
1667  qb_loop_timer_del (instance->totemsrp_poll_handle,
1669 }
1670 
1671 static void memb_state_consensus_timeout_expired (
1672  struct totemsrp_instance *instance)
1673 {
1674  struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1675  int no_consensus_list_entries;
1676 
1677  instance->stats.consensus_timeouts++;
1678  if (memb_consensus_agreed (instance)) {
1679  memb_consensus_reset (instance);
1680 
1681  memb_consensus_set (instance, &instance->my_id);
1682 
1683  reset_token_timeout (instance); // REVIEWED
1684  } else {
1685  memb_consensus_notset (
1686  instance,
1687  no_consensus_list,
1688  &no_consensus_list_entries,
1689  instance->my_proc_list,
1690  instance->my_proc_list_entries);
1691 
1692  memb_set_merge (no_consensus_list, no_consensus_list_entries,
1693  instance->my_failed_list, &instance->my_failed_list_entries);
1694  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1695  }
1696 }
1697 
1698 static void memb_join_message_send (struct totemsrp_instance *instance);
1699 
1700 static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1701 
1702 /*
1703  * Timers used for various states of the membership algorithm
1704  */
1705 static void timer_function_pause_timeout (void *data)
1706 {
1707  struct totemsrp_instance *instance = data;
1708 
1709  instance->pause_timestamp = qb_util_nano_current_get ();
1710  reset_pause_timeout (instance);
1711 }
1712 
1713 static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1714 {
1715  old_ring_state_restore (instance);
1716  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1717  instance->stats.recovery_token_lost++;
1718 }
1719 
/*
 * Token warning timer callback: data is the totemsrp instance.
 *
 * While totem_config->token_warning is non-zero, the difference between the
 * current time (qb_util_nano_current_get() divided by QB_TIME_NS_IN_MSEC)
 * and the rx value of the latest token statistics entry is computed and the
 * warning timer is re-armed. Otherwise the warning timer is cancelled.
 *
 * NOTE(review): this listing skips source line 1729 - presumably the start
 * of the log call for the message below; confirm against upstream.
 * NOTE(review): the message uses %d for an (unsigned int) argument.
 */
1720 static void timer_function_orf_token_warning (void *data)
1721 {
1722  struct totemsrp_instance *instance = data;
1723  uint64_t tv_diff;
1724 
1725  /* need to protect against the case where token_warning is set to 0 dynamically */
1726  if (instance->totem_config->token_warning) {
1727  tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1728  instance->stats.token[instance->stats.latest_token].rx;
1730  "Token has not been received in %d ms ", (unsigned int) tv_diff);
1731  reset_token_warning(instance);
1732  } else {
1733  cancel_token_warning(instance);
1734  }
1735 }
1736 
/*
 * Token loss timer callback: data is the totemsrp instance.
 *
 * Dispatches on instance->memb_state. Every visible branch ends up calling
 * memb_state_gather_enter() (the RECOVERY branch through
 * memb_recovery_state_token_loss()) and increments the matching
 * stats.*_token_lost counter, except RECOVERY, whose counter is bumped
 * inside memb_recovery_state_token_loss(). The GATHER branch also runs
 * memb_state_consensus_timeout_expired() first; the RECOVERY branch sets
 * orf_token_discard afterwards.
 *
 * NOTE(review): this listing skips several source lines (1742-1743, 1745,
 * 1747, 1753, 1761, 1768) - presumably the first case label and the starts
 * of the log calls; confirm against upstream.
 */
1737 static void timer_function_orf_token_timeout (void *data)
1738 {
1739  struct totemsrp_instance *instance = data;
1740 
1741  switch (instance->memb_state) {
1744  "The token was lost in the OPERATIONAL state.");
1746  "A processor failed, forming new configuration.");
1748  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1749  instance->stats.operational_token_lost++;
1750  break;
1751 
1752  case MEMB_STATE_GATHER:
1754  "The consensus timeout expired.");
1755  memb_state_consensus_timeout_expired (instance);
1756  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1757  instance->stats.gather_token_lost++;
1758  break;
1759 
1760  case MEMB_STATE_COMMIT:
1762  "The token was lost in the COMMIT state.");
1763  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1764  instance->stats.commit_token_lost++;
1765  break;
1766 
1767  case MEMB_STATE_RECOVERY:
1769  "The token was lost in the RECOVERY state.");
1770  memb_recovery_state_token_loss (instance);
1771  instance->orf_token_discard = 1;
1772  break;
1773  }
1774 }
1775 
/*
 * Heartbeat timer callback: data is the totemsrp instance.
 *
 * Forwards to timer_function_orf_token_timeout(), so a heartbeat expiry is
 * handled exactly like a token loss.
 *
 * NOTE(review): this listing skips source line 1779 - presumably the start
 * of the log call for the message below; confirm against upstream.
 */
1776 static void timer_function_heartbeat_timeout (void *data)
1777 {
1778  struct totemsrp_instance *instance = data;
1780  "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1781  timer_function_orf_token_timeout(data);
1782 }
1783 
/*
 * Join timer callback: data is the totemsrp instance.
 *
 * In the GATHER and COMMIT states a join message is sent and the join timer
 * is re-armed with this function as its callback, after
 * totem_config->join_timeout multiplied by QB_TIME_NS_IN_MSEC. A failing
 * qb_loop_timer_add() is only logged. Reaching the first branch (RECOVERY
 * and the label(s) before it) trips an assert.
 *
 * NOTE(review): this listing skips source line 1790 - presumably another
 * case label sharing the assert branch; confirm against upstream.
 */
1784 static void memb_timer_function_state_gather (void *data)
1785 {
1786  struct totemsrp_instance *instance = data;
1787  int32_t res;
1788 
1789  switch (instance->memb_state) {
1791  case MEMB_STATE_RECOVERY:
1792  assert (0); /* this should never happen */
1793  break;
1794  case MEMB_STATE_GATHER:
1795  case MEMB_STATE_COMMIT:
1796  memb_join_message_send (instance);
1797 
1798  /*
1799  * Restart the join timeout
1800  */
1801  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1802 
1803  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1804  QB_LOOP_MED,
1805  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1806  (void *)instance,
1807  memb_timer_function_state_gather,
1808  &instance->memb_timer_state_gather_join_timeout);
1809 
1810  if (res != 0) {
1811  log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1812  }
1813  break;
1814  }
1815 }
1816 
/*
 * Consensus timer callback: data is the totemsrp instance, which is passed
 * straight on to memb_state_consensus_timeout_expired().
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	memb_state_consensus_timeout_expired ((struct totemsrp_instance *)data);
}
1822 
/*
 * Move messages from the recovery sort queue to the regular sort queue.
 *
 * Sequence numbers SEQNO_START_MSG + 1 up to my_aru are visited. An item is
 * unwrapped by skipping one struct mcast header, and it is added to the
 * regular sort queue only when its ring id is byte-wise equal to
 * my_old_ring_id and the regular queue does not already hold that sequence
 * number.
 *
 * NOTE(review): this listing skips several source lines (1833, 1853, 1870,
 * 1887, 1891) - presumably the starts of log calls, the condition that
 * opens the "encapsulated" branch and the statement inside the
 * sq_lt_compare block; confirm against upstream.
 */
1823 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1824 {
1825  unsigned int i;
1826  struct sort_queue_item *recovery_message_item;
1827  struct sort_queue_item regular_message_item;
1828  unsigned int range = 0;
1829  int res;
1830  void *ptr;
1831  struct mcast *mcast;
1832 
1834  "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1835 
1836  range = instance->my_aru - SEQNO_START_MSG;
1837  /*
1838  * Move messages from recovery to regular sort queue
1839  */
1840 // todo should i be initialized to 0 or 1 ?
1841  for (i = 1; i <= range; i++) {
1842  res = sq_item_get (&instance->recovery_sort_queue,
1843  i + SEQNO_START_MSG, &ptr);
 /* A non-zero result skips this sequence number */
1844  if (res != 0) {
1845  continue;
1846  }
1847  recovery_message_item = ptr;
1848 
1849  /*
1850  * Convert recovery message into regular message
1851  */
1852  mcast = recovery_message_item->mcast;
1854  /*
1855  * Message is a recovery message encapsulated
1856  * in a new ring message
1857  */
1858  regular_message_item.mcast =
1859  (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1860  regular_message_item.msg_len =
1861  recovery_message_item->msg_len - sizeof (struct mcast);
1862  mcast = regular_message_item.mcast;
1863  } else {
1864  /*
1865  * TODO this case shouldn't happen
1866  */
1867  continue;
1868  }
1869 
1871  "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ,
1872  (uint64_t)mcast->seq);
1873 
1874  /*
1875  * Only add this message to the regular sort
1876  * queue if it was originated with the same ring
1877  * id as the previous ring
1878  */
1879  if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1880  sizeof (struct memb_ring_id)) == 0) {
1881 
1882  res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1883  if (res == 0) {
1884  sq_item_add (&instance->regular_sort_queue,
1885  &regular_message_item, mcast->seq);
1886  if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1888  }
1889  }
1890  } else {
1892  "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq);
1893  }
1894  }
1895 }
1896 
1897 /*
1898  * Change states in the state machine of the membership algorithm
1899  */
/*
 * Enter the OPERATIONAL membership state.
 *
 * In order: the consensus database and the saved old ring state are reset,
 * recovery messages are moved to the regular sort queue and delivered, the
 * left and joined lists are computed, the new membership is installed, the
 * recovery sort queue is copied over the regular one, my_proc_list is
 * rebuilt from the new membership, the already handled part of the regular
 * sort queue is freed and released, the joined/left/failed node summaries
 * are formatted, and finally memb_state is set to MEMB_STATE_OPERATIONAL,
 * the pause timer is re-armed and my_ring_id is saved as my_old_ring_id.
 *
 * NOTE(review): this listing skips several source lines (1923, 1960, 1980,
 * 2089, 2091, 2099) - presumably the starts of the log calls and of the two
 * configuration change callbacks whose argument lists are still visible;
 * confirm against upstream.
 */
1900 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1901 {
1902  struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1903  int joined_list_entries = 0;
1904  unsigned int aru_save;
1905  unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1906  unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1907  unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1908  unsigned int left_list[PROCESSOR_COUNT_MAX];
1909  unsigned int i;
1910  unsigned int res;
1911  char left_node_msg[1024];
1912  char joined_node_msg[1024];
1913  char failed_node_msg[1024];
1914 
1915  instance->originated_orf_token = 0;
1916 
1917  memb_consensus_reset (instance);
1918 
1919  old_ring_state_reset (instance);
1920 
1921  deliver_messages_from_recovery_to_regular (instance);
1922 
1924  "Delivering to app %x to %x",
1925  instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1926 
 /* my_aru is swapped to the old ring value for delivery and put back below */
1927  aru_save = instance->my_aru;
1928  instance->my_aru = instance->old_ring_state_aru;
1929 
1930  messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1931 
1932  /*
1933  * Calculate joined and left list
1934  */
1935  memb_set_subtract (instance->my_left_memb_list,
1936  &instance->my_left_memb_entries,
1937  instance->my_memb_list, instance->my_memb_entries,
1938  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1939 
1940  memb_set_subtract (joined_list, &joined_list_entries,
1941  instance->my_new_memb_list, instance->my_new_memb_entries,
1942  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1943 
1944  /*
1945  * Install new membership
1946  */
1947  instance->my_memb_entries = instance->my_new_memb_entries;
1948  memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1949  sizeof (struct srp_addr) * instance->my_memb_entries);
1950  instance->last_released = 0;
1951  instance->my_set_retrans_flg = 0;
1952 
1953  /*
1954  * Deliver transitional configuration to application
1955  */
1956  srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1957  instance->my_left_memb_entries);
1958  srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1959  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1961  trans_memb_list_totemip, instance->my_trans_memb_entries,
1962  left_list, instance->my_left_memb_entries,
1963  0, 0, &instance->my_ring_id);
1964  instance->waiting_trans_ack = 1;
1965  instance->totemsrp_waiting_trans_ack_cb_fn (1);
1966 
1967 // TODO we need to filter to ensure we only deliver those
1968 // messages which are part of instance->my_deliver_memb
1969  messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1970 
1971  instance->my_aru = aru_save;
1972 
1973  /*
1974  * Deliver regular configuration to application
1975  */
1976  srp_addr_to_nodeid (instance, new_memb_list_totemip,
1977  instance->my_new_memb_list, instance->my_new_memb_entries);
1978  srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
1979  joined_list_entries);
1981  new_memb_list_totemip, instance->my_new_memb_entries,
1982  0, 0,
1983  joined_list_totemip, joined_list_entries, &instance->my_ring_id);
1984 
1985  /*
1986  * The recovery sort queue now becomes the regular
1987  * sort queue. It is necessary to copy the state
1988  * into the regular sort queue.
1989  */
1990  sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
1991  instance->my_last_aru = SEQNO_START_MSG;
1992 
1993  /* When making my_proc_list smaller, ensure that the
1994  * now non-used entries are zero-ed out. There are some suspect
1995  * assert's that assume that there is always 2 entries in the list.
1996  * These fail when my_proc_list is reduced to 1 entry (and the
1997  * valid [0] entry is the same as the 'unused' [1] entry).
1998  */
1999  memset(instance->my_proc_list, 0,
2000  sizeof (struct srp_addr) * instance->my_proc_list_entries);
2001 
2002  instance->my_proc_list_entries = instance->my_new_memb_entries;
 /* my_memb_entries was set equal to my_new_memb_entries above */
2003  memcpy (instance->my_proc_list, instance->my_new_memb_list,
2004  sizeof (struct srp_addr) * instance->my_memb_entries);
2005 
2006  instance->my_failed_list_entries = 0;
2007  /*
2008  * TODO Not exactly to spec
2009  *
2010  * At the entry to this function all messages without a gap are
2011  * delivered.
2012  *
2013  * This code throw away messages from the last gap in the sort queue
2014  * to my_high_seq_received
2015  *
2016  * What should really happen is we should deliver all messages up to
2017  * a gap, then deliver the transitional configuration, then deliver
2018  * the messages between the first gap and my_high_seq_received, then
2019  * deliver a regular configuration, then deliver the regular
2020  * configuration
2021  *
2022  * Unfortunately totempg doesn't appear to like this operating mode
2023  * which needs more inspection
2024  */
 /*
  * Walk i down from my_high_seq_received until sq_item_get() returns 0
  * or i reaches 0.
  */
2025  i = instance->my_high_seq_received + 1;
2026  do {
2027  void *ptr;
2028 
2029  i -= 1;
2030  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2031  if (i == 0) {
2032  break;
2033  }
2034  } while (res);
2035 
2036  instance->my_high_delivered = i;
2037 
 /* Free the message buffers of all items up to my_high_delivered */
2038  for (i = 0; i <= instance->my_high_delivered; i++) {
2039  void *ptr;
2040 
2041  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2042  if (res == 0) {
2043  struct sort_queue_item *regular_message;
2044 
2045  regular_message = ptr;
2046  free (regular_message->mcast);
2047  }
2048  }
2049  sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2050  instance->last_released = instance->my_high_delivered;
2051 
 /*
  * NOTE(review): snprintf returns the length that would have been
  * written, so sptr can grow past sizeof(joined_node_msg) when the list
  * is long; the next call would then get an out-of-range pointer and a
  * wrapped size. The same pattern is used for left_node_msg and
  * failed_node_msg below - confirm the node counts keep this bounded.
  */
2052  if (joined_list_entries) {
2053  int sptr = 0;
2054  sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2055  for (i=0; i< joined_list_entries; i++) {
2056  sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " " CS_PRI_NODE_ID, joined_list_totemip[i]);
2057  }
2058  }
2059  else {
2060  joined_node_msg[0] = '\0';
2061  }
2062 
2063  if (instance->my_left_memb_entries) {
2064  int sptr = 0;
2065  int sptr2 = 0;
2066  sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2067  for (i=0; i< instance->my_left_memb_entries; i++) {
2068  sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " " CS_PRI_NODE_ID, left_list[i]);
2069  }
 /* Left nodes for which my_leave_memb_match() returns 0 are listed as failed */
2070  for (i=0; i< instance->my_left_memb_entries; i++) {
2071  if (my_leave_memb_match(instance, left_list[i]) == 0) {
2072  if (sptr2 == 0) {
2073  sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2074  }
 /*
  * NOTE(review): the size below is taken from left_node_msg although
  * the target is failed_node_msg; both are declared with 1024 bytes.
  */
2075  sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " " CS_PRI_NODE_ID, left_list[i]);
2076  }
2077  }
2078  if (sptr2 == 0) {
2079  failed_node_msg[0] = '\0';
2080  }
2081  }
2082  else {
2083  left_node_msg[0] = '\0';
2084  failed_node_msg[0] = '\0';
2085  }
2086 
2087  my_leave_memb_clear(instance);
2088 
2090  "entering OPERATIONAL state.");
2092  "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s",
2093  instance->my_ring_id.rep,
2094  (uint64_t)instance->my_ring_id.seq,
2095  joined_node_msg,
2096  left_node_msg);
2097 
2098  if (strlen(failed_node_msg)) {
2100  "Failed to receive the leave message.%s",
2101  failed_node_msg);
2102  }
2103 
2104  instance->memb_state = MEMB_STATE_OPERATIONAL;
2105 
2106  instance->stats.operational_entered++;
2107  instance->stats.continuous_gather = 0;
2108 
2109  instance->my_received_flg = 1;
2110 
2111  reset_pause_timeout (instance);
2112 
2113  /*
2114  * Save ring id information from this configuration to determine
2115  * which processors are transitioning from old regular configuration
2116  * in to new regular configuration on the next configuration change
2117  */
2118  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2119  sizeof (struct memb_ring_id));
2120 
2121  return;
2122 }
2123 
/*
 * Enter the GATHER membership state.
 *
 * gather_from names the reason for entering gather; it is logged and decides
 * whether stats.continuous_gather is incremented.
 *
 * In order: token discarding is switched on, this processor is merged into
 * my_proc_list, a join message is sent, the join and consensus timers are
 * restarted, the token retransmit, token loss and merge detect timers are
 * cancelled, the consensus database is reset with only this processor set,
 * and memb_state becomes MEMB_STATE_GATHER.
 *
 * NOTE(review): this listing skips source lines 2159 and 2182 - presumably
 * the second argument of the consensus qb_loop_timer_del call and the start
 * of the log call; confirm against upstream.
 */
2124 static void memb_state_gather_enter (
2125  struct totemsrp_instance *instance,
2126  enum gather_state_from gather_from)
2127 {
2128  int32_t res;
2129 
2130  instance->orf_token_discard = 1;
2131 
2132  instance->originated_orf_token = 0;
2133 
2134  memb_set_merge (
2135  &instance->my_id, 1,
2136  instance->my_proc_list, &instance->my_proc_list_entries);
2137 
2138  memb_join_message_send (instance);
2139 
2140  /*
2141  * Restart the join timeout
2142  */
2143  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2144 
2145  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2146  QB_LOOP_MED,
2147  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2148  (void *)instance,
2149  memb_timer_function_state_gather,
2150  &instance->memb_timer_state_gather_join_timeout);
2151  if (res != 0) {
2152  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2153  }
2154 
2155  /*
2156  * Restart the consensus timeout
2157  */
2158  qb_loop_timer_del (instance->totemsrp_poll_handle,
2160 
2161  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2162  QB_LOOP_MED,
2163  instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2164  (void *)instance,
2165  memb_timer_function_gather_consensus_timeout,
2166  &instance->memb_timer_state_gather_consensus_timeout);
2167  if (res != 0) {
2168  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2169  }
2170 
2171  /*
2172  * Cancel the token loss and token retransmission timeouts
2173  */
2174  cancel_token_retransmit_timeout (instance); // REVIEWED
2175  cancel_token_timeout (instance); // REVIEWED
2176  cancel_merge_detect_timeout (instance);
2177 
2178  memb_consensus_reset (instance);
2179 
2180  memb_consensus_set (instance, &instance->my_id);
2181 
2183  "entering GATHER state from %d(%s).",
2184  gather_from, gsfrom_to_msg(gather_from));
2185 
2186  instance->memb_state = MEMB_STATE_GATHER;
2187  instance->stats.gather_entered++;
2188 
2189  if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) {
2190  /*
2191  * State 3 means gather, so we are continuously gathering.
2192  */
2193  instance->stats.continuous_gather++;
2194  }
2195 
2196  return;
2197 }
2198 
2199 static void timer_function_token_retransmit_timeout (void *data);
2200 
/*
 * Callback whose context is the totemsrp instance; it sends the commit
 * token through memb_state_commit_token_send().
 */
static void target_set_completed (
	void *context)
{
	struct totemsrp_instance *instance = context;

	memb_state_commit_token_send (instance);
}
2209 
/*
 * Enter the COMMIT membership state.
 *
 * In order: the old ring state is saved, the commit token is updated and
 * its target set, the join and consensus timers are deleted, the ring id of
 * the commit token is installed and stored through memb_ring_id_store,
 * memb_state becomes MEMB_STATE_COMMIT, the token retransmit and token loss
 * timers are re-armed and the flow control counters are cleared.
 *
 * NOTE(review): this listing skips source lines 2221, 2225 and 2233 -
 * presumably statements following each qb_loop_timer_del call and the start
 * of the log call; confirm against upstream.
 */
2210 static void memb_state_commit_enter (
2211  struct totemsrp_instance *instance)
2212 {
2213  old_ring_state_save (instance);
2214 
2215  memb_state_commit_token_update (instance);
2216 
2217  memb_state_commit_token_target_set (instance);
2218 
2219  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2220 
2222 
2223  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2224 
2226 
2227  memb_ring_id_set (instance, &instance->commit_token->ring_id);
2228 
2229  instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2230 
2231  instance->token_ring_id_seq = instance->my_ring_id.seq;
2232 
2234  "entering COMMIT state.");
2235 
2236  instance->memb_state = MEMB_STATE_COMMIT;
2237  reset_token_retransmit_timeout (instance); // REVIEWED
2238  reset_token_timeout (instance); // REVIEWED
2239 
2240  instance->stats.commit_entered++;
2241  instance->stats.continuous_gather = 0;
2242 
2243  /*
2244  * reset all flow control variables since we are starting a new ring
2245  */
2246  instance->my_trc = 0;
2247  instance->my_pbl = 0;
2248  instance->my_cbl = 0;
2249  /*
2250  * commit token sent after callback that token target has been set
2251  */
2252 }
2253 
/*
 * Enter the RECOVERY membership state using the supplied commit token.
 *
 * Visible steps: reinitialise the recovery sort queue and the retransmit
 * message queue, forward the commit token via
 * memb_state_commit_token_send_recovery, build the transitional membership
 * from the per-member ring ids in the token, and - if any transitional
 * member reported received_flg == 0 - copy the old-ring messages above the
 * lowest reported aru from the regular sort queue into
 * retrans_message_queue. Finally the sequence counters are reset, the token
 * timers restarted and memb_state is set to MEMB_STATE_RECOVERY.
 *
 * NOTE(review): the embedded listing numbers skip in many places (2256,
 * 2270, 2289, 2312, 2316, 2318, 2322, 2383, 2388, 2405-2407, 2409,
 * 2417-2418, 2421, 2426, 2433). The commit_token parameter line, the
 * sort_queue_item declaration and the heads of several calls are therefore
 * missing from this extract - confirm against upstream before relying on it.
 */
2254 static void memb_state_recovery_enter (
2255  struct totemsrp_instance *instance,
 /* NOTE(review): listing line 2256 (the commit_token parameter used below) is missing */
2257 {
2258  int i;
2259  int local_received_flg = 1;
2260  unsigned int low_ring_aru;
2261  unsigned int range = 0;
2262  unsigned int messages_originated = 0;
2263  const struct srp_addr *addr;
2264  struct memb_commit_token_memb_entry *memb_list;
2265  struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2266 
 /*
  * The token is followed by addr_entries srp_addr records and then by the
  * same number of per-member entries.
  */
2267  addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2268  memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2269 
2271  "entering RECOVERY state.");
2272 
2273  instance->orf_token_discard = 0;
2274 
2275  instance->my_high_ring_delivered = 0;
2276 
2277  sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2278  cs_queue_reinit (&instance->retrans_message_queue);
2279 
2280  low_ring_aru = instance->old_ring_state_high_seq_received;
2281 
2282  memb_state_commit_token_send_recovery (instance, commit_token);
2283 
2284  instance->my_token_seq = SEQNO_START_TOKEN - 1;
2285 
2286  /*
2287  * Build regular configuration
2288  */
2290  instance->totemnet_context,
2291  commit_token->addr_entries);
2292 
2293  /*
2294  * Build transitional configuration
2295  */
2296  for (i = 0; i < instance->my_new_memb_entries; i++) {
2297  memcpy (&my_new_memb_ring_id_list[i],
2298  &memb_list[i].ring_id,
2299  sizeof (struct memb_ring_id));
2300  }
2301  memb_set_and_with_ring_id (
2302  instance->my_new_memb_list,
2303  my_new_memb_ring_id_list,
2304  instance->my_new_memb_entries,
2305  instance->my_memb_list,
2306  instance->my_memb_entries,
2307  &instance->my_old_ring_id,
2308  instance->my_trans_memb_list,
2309  &instance->my_trans_memb_entries);
2310 
2311  for (i = 0; i < instance->my_trans_memb_entries; i++) {
2313  "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid);
2314  }
2315  for (i = 0; i < instance->my_new_memb_entries; i++) {
2317  "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid);
2319  "previous ringid (" CS_PRI_RING_ID ")",
2320  memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2321 
2323  "aru %x high delivered %x received flag %d",
2324  memb_list[i].aru,
2325  memb_list[i].high_delivered,
2326  memb_list[i].received_flg);
2327 
2328  // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2329  }
2330  /*
2331  * Determine if any received flag is false
2332  */
2333  for (i = 0; i < commit_token->addr_entries; i++) {
2334  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2335  instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2336 
2337  memb_list[i].received_flg == 0) {
2338  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2339  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2340  sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2341  local_received_flg = 0;
2342  break;
2343  }
2344  }
2345  if (local_received_flg == 1) {
2346  goto no_originate;
2347  } /* Else originate messages if we should */
2348 
2349  /*
2350  * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2351  */
2352  for (i = 0; i < commit_token->addr_entries; i++) {
2353  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2354  instance->my_deliver_memb_list,
2355  instance->my_deliver_memb_entries) &&
2356 
2357  memcmp (&instance->my_old_ring_id,
2358  &memb_list[i].ring_id,
2359  sizeof (struct memb_ring_id)) == 0) {
2360 
2361  if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2362 
2363  low_ring_aru = memb_list[i].aru;
2364  }
2365  if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2366  instance->my_high_ring_delivered = memb_list[i].high_delivered;
2367  }
2368  }
2369  }
2370 
2371  /*
2372  * Copy all old ring messages to instance->retrans_message_queue
2373  */
2374  range = instance->old_ring_state_high_seq_received - low_ring_aru;
2375  if (range == 0) {
2376  /*
2377  * No messages to copy
2378  */
2379  goto no_originate;
2380  }
2381  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2382 
2384  "copying all old ring messages from %x-%x.",
2385  low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2386 
 /* Sequence numbers absent from the regular sort queue are skipped */
2387  for (i = 1; i <= range; i++) {
 /* NOTE(review): listing line 2388 (declaration of sort_queue_item) is missing */
2389  struct message_item message_item;
2390  void *ptr;
2391  int res;
2392 
2393  res = sq_item_get (&instance->regular_sort_queue,
2394  low_ring_aru + i, &ptr);
2395  if (res != 0) {
2396  continue;
2397  }
2398  sort_queue_item = ptr;
2399  messages_originated++;
2400  memset (&message_item, 0, sizeof (struct message_item));
2401  // TODO LEAK
2402  message_item.mcast = totemsrp_buffer_alloc (instance);
2403  assert (message_item.mcast);
2404  memset(message_item.mcast, 0, sizeof (struct mcast));
 /* NOTE(review): listing lines 2405-2407 and 2409 (further header setup) are missing */
2408  message_item.mcast->system_from = instance->my_id;
2410 
2411  message_item.mcast->header.nodeid = instance->my_id.nodeid;
2412  assert (message_item.mcast->header.nodeid);
2413  memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2414  sizeof (struct memb_ring_id));
2415  message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2416  memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
 /* NOTE(review): listing lines 2417-2418 (remaining memcpy arguments) are missing */
2419  cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2420  }
2422  "Originated %d messages in RECOVERY.", messages_originated);
2423  goto originated;
2424 
2425 no_originate:
2427  "Did not need to originate any messages in recovery.");
2428 
2429 originated:
2430  instance->my_aru = SEQNO_START_MSG;
2431  instance->my_aru_count = 0;
2432  instance->my_seq_unchanged = 0;
 /* NOTE(review): listing line 2433 is missing here */
2434  instance->my_install_seq = SEQNO_START_MSG;
2435  instance->last_released = SEQNO_START_MSG;
2436 
2437  reset_token_timeout (instance); // REVIEWED
2438  reset_token_retransmit_timeout (instance); // REVIEWED
2439 
2440  instance->memb_state = MEMB_STATE_RECOVERY;
2441  instance->stats.recovery_entered++;
2442  instance->stats.continuous_gather = 0;
2443 
2444  return;
2445 }
2446 
2447 void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2448 {
2449  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2450 
2451  token_hold_cancel_send (instance);
2452 
2453  return;
2454 }
2455 
/*
 * NOTE(review): the line carrying this function's return type and name
 * (listing line 2456) is missing from this extract; presumably this is
 * totemsrp_mcast - confirm against upstream.
 *
 * Queue a message for multicast. The iov_len fragments in iovec are
 * gathered into a freshly allocated buffer behind a struct mcast header and
 * the resulting message_item is appended to new_message_queue_trans (while
 * waiting_trans_ack is set) or to new_message_queue otherwise.
 *
 * Returns 0 on success, -1 when the chosen queue is full or no buffer could
 * be allocated. The guarantee argument is not referenced in the visible
 * lines.
 */
2457  void *srp_context,
2458  struct iovec *iovec,
2459  unsigned int iov_len,
2460  int guarantee)
2461 {
2462  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2463  int i;
2464  struct message_item message_item;
2465  char *addr;
2466  unsigned int addr_idx;
2467  struct cs_queue *queue_use;
2468 
2469  if (instance->waiting_trans_ack) {
2470  queue_use = &instance->new_message_queue_trans;
2471  } else {
2472  queue_use = &instance->new_message_queue;
2473  }
2474 
2475  if (cs_queue_is_full (queue_use)) {
2476  log_printf (instance->totemsrp_log_level_debug, "queue full");
2477  return (-1);
2478  }
2479 
2480  memset (&message_item, 0, sizeof (struct message_item));
2481 
2482  /*
2483  * Allocate pending item
2484  */
2485  message_item.mcast = totemsrp_buffer_alloc (instance);
2486  if (message_item.mcast == 0) {
2487  goto error_mcast;
2488  }
2489 
2490  /*
2491  * Set mcast header
2492  */
2493  memset(message_item.mcast, 0, sizeof (struct mcast));
 /* NOTE(review): listing lines 2494-2497 (further header setup) are missing */
2498 
2499  message_item.mcast->header.nodeid = instance->my_id.nodeid;
2500  assert (message_item.mcast->header.nodeid);
2501 
 /* NOTE(review): listing line 2502 is missing here */
2503  message_item.mcast->system_from = instance->my_id;
2504 
 /*
  * Payload fragments are laid out back to back after the header.
  * NOTE(review): no check is visible here that the summed fragment
  * lengths fit the allocated buffer - presumably bounded by callers;
  * verify.
  */
2505  addr = (char *)message_item.mcast;
2506  addr_idx = sizeof (struct mcast);
2507  for (i = 0; i < iov_len; i++) {
2508  memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2509  addr_idx += iovec[i].iov_len;
2510  }
2511 
 /* msg_len covers the header plus all payload bytes */
2512  message_item.msg_len = addr_idx;
2513 
2514  log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2515  instance->stats.mcast_tx++;
2516  cs_queue_item_add (queue_use, &message_item);
2517 
2518  return (0);
2519 
2520 error_mcast:
2521  return (-1);
2522 }
2523 
2524 /*
2525  * Determine if there is room to queue a new message
2526  */
2527 int totemsrp_avail (void *srp_context)
2528 {
2529  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2530  int avail;
2531  struct cs_queue *queue_use;
2532 
2533  if (instance->waiting_trans_ack) {
2534  queue_use = &instance->new_message_queue_trans;
2535  } else {
2536  queue_use = &instance->new_message_queue;
2537  }
2538  cs_queue_avail (queue_use, &avail);
2539 
2540  return (avail);
2541 }
2542 
2543 /*
2544  * ORF Token Management
2545  */
2546 /*
2547  * Recast message to mcast group if it is available
2548  */
/*
 * Look up message seq in the active sort queue (the recovery queue while in
 * MEMB_STATE_RECOVERY, the regular queue otherwise) so it can be multicast
 * again.
 *
 * Returns -1 when seq is outside the queue's range or no item is stored for
 * it, 0 otherwise.
 *
 * NOTE(review): listing lines 2553, 2581 and 2583-2584 are missing from this
 * extract - the sort_queue_item declaration and most of the send call are
 * not shown; confirm against upstream.
 */
2549 static int orf_token_remcast (
2550  struct totemsrp_instance *instance,
2551  int seq)
2552 {
 /* NOTE(review): listing line 2553 (declaration of sort_queue_item) is missing */
2554  int res;
2555  void *ptr;
2556 
2557  struct sq *sort_queue;
2558 
2559  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2560  sort_queue = &instance->recovery_sort_queue;
2561  } else {
2562  sort_queue = &instance->regular_sort_queue;
2563  }
2564 
2565  res = sq_in_range (sort_queue, seq);
2566  if (res == 0) {
2567  log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2568  return (-1);
2569  }
2570 
2571  /*
2572  * Get RTR item at seq, if not available, return
2573  */
2574  res = sq_item_get (sort_queue, seq, &ptr);
2575  if (res != 0) {
2576  return -1;
2577  }
2578 
2579  sort_queue_item = ptr;
2580 
 /* NOTE(review): head (2581) and trailing arguments (2583-2584) of this call are missing */
2582  instance->totemnet_context,
2585 
2586  return (0);
2587 }
2588 
2589 
2590 /*
2591  * Free all freeable messages from ring
2592  */
/*
 * Release messages from the regular sort queue that are no longer needed.
 *
 * The release point starts at token_aru and is lowered to my_last_aru
 * and/or my_high_delivered when sq_lt_compare reports those as smaller
 * (presumably a wrap-aware "less than" on sequence numbers - confirm).
 * Every sequence number in (last_released, release_to] has its buffer
 * released (when an item is present) and its queue slot freed, after which
 * last_released is advanced.
 *
 * NOTE(review): listing line 2642 (head of the final log call) is missing
 * from this extract.
 */
2593 static void messages_free (
2594  struct totemsrp_instance *instance,
2595  unsigned int token_aru)
2596 {
2597  struct sort_queue_item *regular_message;
2598  unsigned int i;
2599  int res;
2600  int log_release = 0;
2601  unsigned int release_to;
2602  unsigned int range = 0;
2603 
2604  release_to = token_aru;
2605  if (sq_lt_compare (instance->my_last_aru, release_to)) {
2606  release_to = instance->my_last_aru;
2607  }
2608  if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2609  release_to = instance->my_high_delivered;
2610  }
2611 
2612  /*
2613  * Ensure we dont try release before an already released point
2614  */
2615  if (sq_lt_compare (release_to, instance->last_released)) {
2616  return;
2617  }
2618 
2619  range = release_to - instance->last_released;
2620  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2621 
2622  /*
2623  * Release retransmit list items if group aru indicates they are transmitted
2624  */
2625  for (i = 1; i <= range; i++) {
2626  void *ptr;
2627 
2628  res = sq_item_get (&instance->regular_sort_queue,
2629  instance->last_released + i, &ptr);
2630  if (res == 0) {
2631  regular_message = ptr;
2632  totemsrp_buffer_release (instance, regular_message->mcast);
2633  }
 /* The queue slot is released even when no item was found for it */
2634  sq_items_release (&instance->regular_sort_queue,
2635  instance->last_released + i);
2636 
2637  log_release = 1;
2638  }
2639  instance->last_released += range;
2640 
2641  if (log_release) {
2643  "releasing messages up to and including %x", release_to);
2644  }
2645 }
2646 
2647 static void update_aru (
2648  struct totemsrp_instance *instance)
2649 {
2650  unsigned int i;
2651  int res;
2652  struct sq *sort_queue;
2653  unsigned int range;
2654  unsigned int my_aru_saved = 0;
2655 
2656  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2657  sort_queue = &instance->recovery_sort_queue;
2658  } else {
2659  sort_queue = &instance->regular_sort_queue;
2660  }
2661 
2662  range = instance->my_high_seq_received - instance->my_aru;
2663 
2664  my_aru_saved = instance->my_aru;
2665  for (i = 1; i <= range; i++) {
2666 
2667  void *ptr;
2668 
2669  res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2670  /*
2671  * If hole, stop updating aru
2672  */
2673  if (res != 0) {
2674  break;
2675  }
2676  }
2677  instance->my_aru += i - 1;
2678 }
2679 
2680 /*
2681  * Multicasts pending messages onto the ring (requires orf_token possession)
2682  */
/*
 * Multicast up to fcc_mcasts_allowed pending messages while holding the
 * token.
 *
 * Messages come from retrans_message_queue in MEMB_STATE_RECOVERY,
 * otherwise from new_message_queue_trans (while waiting_trans_ack is set)
 * or new_message_queue. Each message takes the next token sequence number
 * (token->seq is incremented), is added to the matching sort queue and
 * removed from the pending queue; my_high_seq_received follows token->seq.
 * update_aru is called once at the end.
 *
 * Returns the number of messages processed in this call.
 *
 * NOTE(review): listing lines 2691, 2722-2723, 2725, 2734 and 2736-2737 are
 * missing from this extract - the sort_queue_item declaration, its
 * population, the assignment of mcast and most of the send call are not
 * shown; confirm against upstream.
 */
2683 static int orf_token_mcast (
2684  struct totemsrp_instance *instance,
2685  struct orf_token *token,
2686  int fcc_mcasts_allowed)
2687 {
2688  struct message_item *message_item = 0;
2689  struct cs_queue *mcast_queue;
2690  struct sq *sort_queue;
 /* NOTE(review): listing line 2691 (declaration of sort_queue_item) is missing */
2692  struct mcast *mcast;
2693  unsigned int fcc_mcast_current;
2694 
2695  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2696  mcast_queue = &instance->retrans_message_queue;
2697  sort_queue = &instance->recovery_sort_queue;
2698  reset_token_retransmit_timeout (instance); // REVIEWED
2699  } else {
2700  if (instance->waiting_trans_ack) {
2701  mcast_queue = &instance->new_message_queue_trans;
2702  } else {
2703  mcast_queue = &instance->new_message_queue;
2704  }
2705 
2706  sort_queue = &instance->regular_sort_queue;
2707  }
2708 
2709  for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2710  if (cs_queue_is_empty (mcast_queue)) {
2711  break;
2712  }
2713  message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2714 
2715  message_item->mcast->seq = ++token->seq;
2716  message_item->mcast->this_seqno = instance->global_seqno++;
2717 
2718  /*
2719  * Build IO vector
2720  */
2721  memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
 /* NOTE(review): listing lines 2722-2723 and 2725 are missing here */
2724 
2726 
2727  memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2728 
2729  /*
2730  * Add message to retransmit queue
2731  */
2732  sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2733 
 /* NOTE(review): head (2734) and trailing arguments (2736-2737) of this call are missing */
2735  instance->totemnet_context,
2738 
2739  /*
2740  * Delete item from pending queue
2741  */
2742  cs_queue_item_remove (mcast_queue);
2743 
2744  /*
2745  * If messages mcasted, deliver any new messages to totempg
2746  */
2747  instance->my_high_seq_received = token->seq;
2748  }
2749 
2750  update_aru (instance);
2751 
2752  /*
2753  * Return 1 if more messages are available for single node clusters
2754  */
2755  return (fcc_mcast_current);
2756 }
2757 
2758 /*
2759  * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2760  * Modifies orf_token's rtr list to include retransmits required by this process
2761  */
/*
 * Service and extend the retransmit (rtr) list carried by the token.
 *
 * First pass: for rtr entries whose ring id equals my_ring_id, try to
 * re-multicast the message through orf_token_remcast, up to *fcc_allowed
 * times; successfully resent entries are removed from the list by shifting
 * the tail down. *fcc_allowed is reduced by the number resent.
 *
 * Second pass: for sequence numbers between my_aru and orf_token->seq that
 * are missing locally and whose miss count has reached
 * totem_config->miss_count_const, add an entry to the rtr list unless one
 * is already there.
 *
 * Returns the number of messages re-multicast (fcc_remcast_current).
 *
 * NOTE(review): listing lines 2786, 2793, 2819, 2840, 2883 and 2886 are
 * missing from this extract - the heads of the log calls, the statement
 * before the assert at 2820, the header of the second for loop and parts of
 * the list-append code are not shown; confirm against upstream.
 */
2762 static int orf_token_rtr (
2763  struct totemsrp_instance *instance,
2764  struct orf_token *orf_token,
2765  unsigned int *fcc_allowed)
2766 {
2767  unsigned int res;
2768  unsigned int i, j;
2769  unsigned int found;
2770  struct sq *sort_queue;
2771  struct rtr_item *rtr_list;
2772  unsigned int range = 0;
2773  char retransmit_msg[1024];
2774  char value[64];
2775 
2776  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2777  sort_queue = &instance->recovery_sort_queue;
2778  } else {
2779  sort_queue = &instance->regular_sort_queue;
2780  }
2781 
2782  rtr_list = &orf_token->rtr_list[0];
2783 
 /*
  * NOTE(review): retransmit_msg is 1024 bytes and every entry appends up
  * to 9 characters with an unbounded strcat; this is only safe if
  * rtr_list_entries is bounded (roughly 110) - confirm the bound is
  * validated before the token reaches this point.
  */
2784  strcpy (retransmit_msg, "Retransmit List: ");
2785  if (orf_token->rtr_list_entries) {
2787  "Retransmit List %d", orf_token->rtr_list_entries);
2788  for (i = 0; i < orf_token->rtr_list_entries; i++) {
2789  sprintf (value, "%x ", rtr_list[i].seq);
2790  strcat (retransmit_msg, value);
2791  }
2792  strcat (retransmit_msg, "");
2794  "%s", retransmit_msg);
2795  }
2796 
2797  /*
2798  * Retransmit messages on orf_token's RTR list from RTR queue
2799  */
2800  for (instance->fcc_remcast_current = 0, i = 0;
2801  instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2802 
2803  /*
2804  * If this retransmit request isn't from this configuration,
2805  * try next rtr entry
2806  */
2807  if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2808  sizeof (struct memb_ring_id)) != 0) {
2809 
2810  i += 1;
2811  continue;
2812  }
2813 
2814  res = orf_token_remcast (instance, rtr_list[i].seq);
2815  if (res == 0) {
2816  /*
2817  * Multicasted message, so no need to copy to new retransmit list
2818  */
 /* NOTE(review): listing line 2819 is missing here */
2820  assert (orf_token->rtr_list_entries >= 0);
 /* i is not advanced: the next entry has been shifted into slot i */
2821  memmove (&rtr_list[i], &rtr_list[i + 1],
2822  sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2823 
2824  instance->stats.mcast_retx++;
2825  instance->fcc_remcast_current++;
2826  } else {
2827  i += 1;
2828  }
2829  }
2830  *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2831 
2832  /*
2833  * Add messages to retransmit to RTR list
2834  * but only retry if there is room in the retransmit list
2835  */
2836 
2837  range = orf_token->seq - instance->my_aru;
2838  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2839 
 /* NOTE(review): listing line 2840 (first line of this for statement) is missing */
2841  (i <= range); i++) {
2842 
2843  /*
2844  * Ensure message is within the sort queue range
2845  */
2846  res = sq_in_range (sort_queue, instance->my_aru + i);
2847  if (res == 0) {
2848  break;
2849  }
2850 
2851  /*
2852  * Find if a message is missing from this processor
2853  */
2854  res = sq_item_inuse (sort_queue, instance->my_aru + i);
2855  if (res == 0) {
2856  /*
2857  * Determine how many times we have missed receiving
2858  * this sequence number. sq_item_miss_count increments
2859  * a counter for the sequence number. The miss count
2860  * will be returned and compared. This allows time for
2861  * delayed multicast messages to be received before
2862  * declaring the message is missing and requesting a
2863  * retransmit.
2864  */
2865  res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2866  if (res < instance->totem_config->miss_count_const) {
2867  continue;
2868  }
2869 
2870  /*
2871  * Determine if missing message is already in retransmit list
2872  */
2873  found = 0;
2874  for (j = 0; j < orf_token->rtr_list_entries; j++) {
2875  if (instance->my_aru + i == rtr_list[j].seq) {
2876  found = 1;
2877  }
2878  }
2879  if (found == 0) {
2880  /*
2881  * Missing message not found in current retransmit list so add it
2882  */
 /* NOTE(review): listing lines 2883 and 2886 are missing around the next two lines */
2884  &instance->my_ring_id, sizeof (struct memb_ring_id));
2885  rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2887  }
2888  }
2889  }
2890  return (instance->fcc_remcast_current);
2891 }
2892 
/*
 * Resend the token copy saved in orf_token_retransmit (with length
 * orf_token_retransmit_size).
 *
 * NOTE(review): listing line 2895 (head of the send call) is missing from
 * this extract, so the function actually called is not shown.
 */
2893 static void token_retransmit (struct totemsrp_instance *instance)
2894 {
2896  instance->orf_token_retransmit,
2897  instance->orf_token_retransmit_size);
2898 }
2899 
2900 /*
2901  * If no mcast or token has been received within the token
2902  * retransmit period, retransmit the regular token to the
2903  * next processor
2904  */
/*
 * Timer callback for the token retransmit timeout; data is the
 * totemsrp_instance.
 *
 * Nothing is done in MEMB_STATE_GATHER. In the visible lines
 * MEMB_STATE_COMMIT falls through to MEMB_STATE_RECOVERY, where the saved
 * token is retransmitted and the retransmit timer is re-armed.
 *
 * NOTE(review): listing line 2913 is missing between the COMMIT and
 * RECOVERY labels (presumably another case label) - confirm against
 * upstream.
 */
2905 static void timer_function_token_retransmit_timeout (void *data)
2906 {
2907  struct totemsrp_instance *instance = data;
2908 
2909  switch (instance->memb_state) {
2910  case MEMB_STATE_GATHER:
2911  break;
2912  case MEMB_STATE_COMMIT:
2914  case MEMB_STATE_RECOVERY:
2915  token_retransmit (instance);
2916  reset_token_retransmit_timeout (instance); // REVIEWED
2917  break;
2918  }
2919 }
2920 
/*
 * Timer callback for the token-hold retransmit timeout; data is the
 * totemsrp_instance.
 *
 * Nothing is done in MEMB_STATE_GATHER or MEMB_STATE_COMMIT. In
 * MEMB_STATE_RECOVERY the saved token is retransmitted; unlike
 * timer_function_token_retransmit_timeout, no timer is re-armed here.
 *
 * NOTE(review): listing line 2930 is missing just before the RECOVERY label
 * (presumably another case label) - confirm against upstream.
 */
2921 static void timer_function_token_hold_retransmit_timeout (void *data)
2922 {
2923  struct totemsrp_instance *instance = data;
2924 
2925  switch (instance->memb_state) {
2926  case MEMB_STATE_GATHER:
2927  break;
2928  case MEMB_STATE_COMMIT:
2929  break;
2931  case MEMB_STATE_RECOVERY:
2932  token_retransmit (instance);
2933  break;
2934  }
2935 }
2936 
/*
 * Timer callback for the merge-detect timeout; data is the
 * totemsrp_instance.
 *
 * In the first (not shown) state, a merge-detect message is transmitted
 * when this node is the ring representative (my_ring_id.rep equals
 * my_id.nodeid). GATHER, COMMIT and RECOVERY do nothing.
 *
 * NOTE(review): listing lines 2941 and 2944 are missing - a statement
 * before the switch and the case label guarding the first branch are not
 * shown; confirm against upstream.
 */
2937 static void timer_function_merge_detect_timeout(void *data)
2938 {
2939  struct totemsrp_instance *instance = data;
2940 
2942 
2943  switch (instance->memb_state) {
 /* NOTE(review): listing line 2944 (case label for this branch) is missing */
2945  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2946  memb_merge_detect_transmit (instance);
2947  }
2948  break;
2949  case MEMB_STATE_GATHER:
2950  case MEMB_STATE_COMMIT:
2951  case MEMB_STATE_RECOVERY:
2952  break;
2953  }
2954 }
2955 
2956 /*
2957  * Send orf_token to next member (requires orf_token)
2958  */
/*
 * Stamp the token with this node's id, save a copy for retransmission and,
 * when forward_token is non-zero, send it on.
 *
 * The token size is sizeof (struct orf_token) plus one struct rtr_item per
 * rtr_list entry. The copy is stored in orf_token_retransmit together with
 * its size. When forward_token is 0 the function returns 0 after saving the
 * copy, without sending.
 *
 * Returns res, which is initialised to 0.
 *
 * NOTE(review): listing line 2979 (head of the send call) is missing, so it
 * cannot be seen here whether res is assigned from that call - confirm
 * against upstream.
 */
2959 static int token_send (
2960  struct totemsrp_instance *instance,
2961  struct orf_token *orf_token,
2962  int forward_token)
2963 {
2964  int res = 0;
2965  unsigned int orf_token_size;
2966 
2967  orf_token_size = sizeof (struct orf_token) +
2968  (orf_token->rtr_list_entries * sizeof (struct rtr_item));
2969 
2970  orf_token->header.nodeid = instance->my_id.nodeid;
2971  memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
2972  instance->orf_token_retransmit_size = orf_token_size;
2973  assert (orf_token->header.nodeid);
2974 
2975  if (forward_token == 0) {
2976  return (0);
2977  }
2978 
2980  orf_token,
2981  orf_token_size);
2982 
2983  return (res);
2984 }
2985 
/*
 * Send a token-hold-cancel message if the token is currently held.
 *
 * When my_token_held is 0 nothing is sent. Otherwise my_token_held is
 * cleared, a token_hold_cancel message carrying my_ring_id is built, the
 * token_hold_cancel_tx statistic is incremented and the message is sent.
 *
 * Always returns 0.
 *
 * NOTE(review): listing lines 2988, 3001-3005 and 3012 are missing - the
 * declaration of token_hold_cancel, its header setup and the head of the
 * send call are not shown; confirm against upstream.
 */
2986 static int token_hold_cancel_send (struct totemsrp_instance *instance)
2987 {
 /* NOTE(review): listing line 2988 (declaration of token_hold_cancel) is missing */
2989 
2990  /*
2991  * Only cancel if the token is currently held
2992  */
2993  if (instance->my_token_held == 0) {
2994  return (0);
2995  }
2996  instance->my_token_held = 0;
2997 
2998  /*
2999  * Build message
3000  */
 /* NOTE(review): listing lines 3001-3005 (header setup) are missing */
3006  memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3007  sizeof (struct memb_ring_id));
3008  assert (token_hold_cancel.header.nodeid);
3009 
3010  instance->stats.token_hold_cancel_tx++;
3011 
3013  sizeof (struct token_hold_cancel));
3014 
3015  return (0);
3016 }
3017 
/*
 * Build a fresh token for the current ring and send it through token_send
 * with forwarding enabled.
 *
 * retrans_flg (and the mirrored my_set_retrans_flg) is 0 when
 * retrans_message_queue is empty and 1 otherwise; the unconditional
 * assignment of 1 just above the queue check is always overwritten by it.
 * aru, fcc and backlog start at 0 and the ring id is my_ring_id.
 *
 * Returns the result of token_send.
 *
 * NOTE(review): listing lines 3023-3026, 3029-3030, 3044 and 3051 are
 * missing - header setup and further field initialisation are not shown;
 * confirm against upstream.
 */
3018 static int orf_token_send_initial (struct totemsrp_instance *instance)
3019 {
3020  struct orf_token orf_token;
3021  int res;
3022 
 /* NOTE(review): listing lines 3023-3026 are missing here */
3027  orf_token.header.nodeid = instance->my_id.nodeid;
3028  assert (orf_token.header.nodeid);
 /* NOTE(review): listing lines 3029-3030 are missing here */
3031  orf_token.retrans_flg = 1;
3032  instance->my_set_retrans_flg = 1;
3033  instance->stats.orf_token_tx++;
3034 
3035  if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3036  orf_token.retrans_flg = 0;
3037  instance->my_set_retrans_flg = 0;
3038  } else {
3039  orf_token.retrans_flg = 1;
3040  instance->my_set_retrans_flg = 1;
3041  }
3042 
3043  orf_token.aru = 0;
3045  orf_token.aru_addr = instance->my_id.nodeid;
3046 
3047  memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3048  orf_token.fcc = 0;
3049  orf_token.backlog = 0;
3050 
3052 
3053  res = token_send (instance, &orf_token, 1);
3054 
3055  return (res);
3056 }
3057 
3058 static void memb_state_commit_token_update (
3059  struct totemsrp_instance *instance)
3060 {
3061  struct srp_addr *addr;
3062  struct memb_commit_token_memb_entry *memb_list;
3063  unsigned int high_aru;
3064  unsigned int i;
3065 
3066  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3067  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3068 
3069  memcpy (instance->my_new_memb_list, addr,
3070  sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3071 
3072  instance->my_new_memb_entries = instance->commit_token->addr_entries;
3073 
3074  memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3075  &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3076 
3077  memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3078  /*
3079  * TODO high delivered is really instance->my_aru, but with safe this
3080  * could change?
3081  */
3082  instance->my_received_flg =
3083  (instance->my_aru == instance->my_high_seq_received);
3084 
3085  memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3086 
3087  memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3088  /*
3089  * find high aru up to current memb_index for all matching ring ids
3090  * if any ring id matching memb_index has aru less then high aru set
3091  * received flag for that entry to false
3092  */
3093  high_aru = memb_list[instance->commit_token->memb_index].aru;
3094  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3095  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3096  &memb_list[i].ring_id,
3097  sizeof (struct memb_ring_id)) == 0) {
3098 
3099  if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3100  high_aru = memb_list[i].aru;
3101  }
3102  }
3103  }
3104 
3105  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3106  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3107  &memb_list[i].ring_id,
3108  sizeof (struct memb_ring_id)) == 0) {
3109 
3110  if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3111  memb_list[i].received_flg = 0;
3112  if (i == instance->commit_token->memb_index) {
3113  instance->my_received_flg = 0;
3114  }
3115  }
3116  }
3117  }
3118 
3119  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3120  instance->commit_token->memb_index += 1;
3121  assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3122  assert (instance->commit_token->header.nodeid);
3123 }
3124 
/*
 * Select the next recipient of the commit token: the node id stored at
 * position (memb_index modulo addr_entries) of the address list that
 * follows the token, so the position wraps to the first entry after the
 * last one.
 *
 * NOTE(review): listing line 3133 (head of the call that receives this node
 * id) is missing from this extract - confirm against upstream.
 */
3125 static void memb_state_commit_token_target_set (
3126  struct totemsrp_instance *instance)
3127 {
3128  struct srp_addr *addr;
3129 
3130  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3131 
3132  /* Totemnet just looks at the node id */
3134  instance->totemnet_context,
3135  addr[instance->commit_token->memb_index %
3136  instance->commit_token->addr_entries].nodeid);
3137 }
3138 
/*
 * Forward the caller-supplied commit token.
 *
 * Increments token_seq, stamps the header with this node's id, saves a copy
 * of the token (header plus one srp_addr and one member entry per
 * addr_entries) in orf_token_retransmit for retransmission, counts the
 * transmission, sends the token and re-arms the token retransmit timer.
 *
 * Always returns 0.
 *
 * NOTE(review): listing line 3158 (head of the send call) is missing from
 * this extract - confirm against upstream.
 */
3139 static int memb_state_commit_token_send_recovery (
3140  struct totemsrp_instance *instance,
3141  struct memb_commit_token *commit_token)
3142 {
3143  unsigned int commit_token_size;
3144 
3145  commit_token->token_seq++;
3146  commit_token->header.nodeid = instance->my_id.nodeid;
3147  commit_token_size = sizeof (struct memb_commit_token) +
3148  ((sizeof (struct srp_addr) +
3149  sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3150  /*
3151  * Make a copy for retransmission if necessary
3152  */
3153  memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3154  instance->orf_token_retransmit_size = commit_token_size;
3155 
3156  instance->stats.memb_commit_token_tx++;
3157 
3159  commit_token,
3160  commit_token_size);
3161 
3162  /*
3163  * Request retransmission of the commit token in case it is lost
3164  */
3165  reset_token_retransmit_timeout (instance);
3166  return (0);
3167 }
3168 
/*
 * Forward the instance's own commit token (instance->commit_token).
 *
 * Same sequence as memb_state_commit_token_send_recovery, but operating on
 * the token stored in the instance: increment token_seq, stamp the header,
 * save a copy in orf_token_retransmit, count the transmission, send and
 * re-arm the token retransmit timer.
 *
 * Always returns 0.
 *
 * NOTE(review): listing line 3187 (head of the send call) is missing from
 * this extract - confirm against upstream.
 */
3169 static int memb_state_commit_token_send (
3170  struct totemsrp_instance *instance)
3171 {
3172  unsigned int commit_token_size;
3173 
3174  instance->commit_token->token_seq++;
3175  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3176  commit_token_size = sizeof (struct memb_commit_token) +
3177  ((sizeof (struct srp_addr) +
3178  sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3179  /*
3180  * Make a copy for retransmission if necessary
3181  */
3182  memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3183  instance->orf_token_retransmit_size = commit_token_size;
3184 
3185  instance->stats.memb_commit_token_tx++;
3186 
3188  instance->commit_token,
3189  commit_token_size);
3190 
3191  /*
3192  * Request retransmission of the commit token in case it is lost
3193  */
3194  reset_token_retransmit_timeout (instance);
3195  return (0);
3196 }
3197 
3198 
3199 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3200 {
3201  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3202  int token_memb_entries = 0;
3203  int i;
3204  unsigned int lowest_nodeid;
3205 
3206  memb_set_subtract (token_memb, &token_memb_entries,
3207  instance->my_proc_list, instance->my_proc_list_entries,
3208  instance->my_failed_list, instance->my_failed_list_entries);
3209 
3210  /*
3211  * find representative by searching for smallest identifier
3212  */
3213  assert(token_memb_entries > 0);
3214 
3215  lowest_nodeid = token_memb[0].nodeid;
3216  for (i = 1; i < token_memb_entries; i++) {
3217  if (lowest_nodeid > token_memb[i].nodeid) {
3218  lowest_nodeid = token_memb[i].nodeid;
3219  }
3220  }
3221  return (lowest_nodeid == instance->my_id.nodeid);
3222 }
3223 
3224 static int srp_addr_compare (const void *a, const void *b)
3225 {
3226  const struct srp_addr *srp_a = (const struct srp_addr *)a;
3227  const struct srp_addr *srp_b = (const struct srp_addr *)b;
3228 
3229  if (srp_a->nodeid < srp_b->nodeid) {
3230  return -1;
3231  } else if (srp_a->nodeid > srp_b->nodeid) {
3232  return 1;
3233  } else {
3234  return 0;
3235  }
3236 }
3237 
/*
 * Build a new commit token in instance->commit_token.
 *
 * The membership is my_proc_list minus my_failed_list, sorted by node id
 * with srp_addr_compare. The token header is zeroed and filled in, the ring
 * id is set to (my_id.nodeid, token_ring_id_seq + 4), memb_index starts at
 * 0, and the sorted addresses followed by zeroed per-member entries are
 * written after the token.
 *
 * NOTE(review): listing lines 3246 and 3255-3256 are missing - the head of
 * the log call and two header assignments are not shown; confirm against
 * upstream.
 */
3238 static void memb_state_commit_token_create (
3239  struct totemsrp_instance *instance)
3240 {
3241  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3242  struct srp_addr *addr;
3243  struct memb_commit_token_memb_entry *memb_list;
3244  int token_memb_entries = 0;
3245 
3247  "Creating commit token because I am the rep.");
3248 
3249  memb_set_subtract (token_memb, &token_memb_entries,
3250  instance->my_proc_list, instance->my_proc_list_entries,
3251  instance->my_failed_list, instance->my_failed_list_entries);
3252 
3253  memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3254  instance->commit_token->header.magic = TOTEM_MH_MAGIC;
 /* NOTE(review): listing lines 3255-3256 are missing here */
3257  instance->commit_token->header.encapsulated = 0;
3258  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3259  assert (instance->commit_token->header.nodeid);
3260 
3261  instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3262  instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3263 
3264  /*
3265  * This qsort is necessary to ensure the commit token traverses
3266  * the ring in the proper order
3267  */
3268  qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3269  srp_addr_compare);
3270 
3271  instance->commit_token->memb_index = 0;
3272  instance->commit_token->addr_entries = token_memb_entries;
3273 
 /* Addresses come first after the token, the per-member entries follow them */
3274  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3275  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3276 
3277  memcpy (addr, token_memb,
3278  token_memb_entries * sizeof (struct srp_addr));
3279  memset (memb_list, 0,
3280  sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3281 }
3282 
/*
 * Build and send a membership join message.
 *
 * The message is a struct memb_join followed by my_proc_list and then
 * my_failed_list. If the computed length exceeds the 40000-byte stack
 * buffer the function returns without sending. When
 * totem_config->send_join_timeout is non-zero, the send is delayed by a
 * random usleep of less than send_join_timeout * 1000 microseconds.
 *
 * NOTE(review): listing lines 3291-3294, 3302, 3309-3310 and 3340 are
 * missing - header setup, the head of the log call, two field assignments
 * and the head of the send call are not shown; confirm against upstream.
 */
3283 static void memb_join_message_send (struct totemsrp_instance *instance)
3284 {
3285  char memb_join_data[40000];
3286  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3287  char *addr;
3288  unsigned int addr_idx;
3289  size_t msg_len;
3290 
 /* NOTE(review): listing lines 3291-3294 (header setup) are missing */
3295  memb_join->header.nodeid = instance->my_id.nodeid;
3296  assert (memb_join->header.nodeid);
3297 
3298  msg_len = sizeof(struct memb_join) +
3299  ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3300 
3301  if (msg_len > sizeof(memb_join_data)) {
3303  "memb_join_message too long. Ignoring message.");
3304 
3305  return ;
3306  }
3307 
3308  memb_join->ring_seq = instance->my_ring_id.seq;
 /* NOTE(review): listing lines 3309-3310 are missing here */
3311  memb_join->system_from = instance->my_id;
3312 
3313  /*
3314  * This mess adds the joined and failed processor lists into the join
3315  * message
3316  */
3317  addr = (char *)memb_join;
3318  addr_idx = sizeof (struct memb_join);
3319  memcpy (&addr[addr_idx],
3320  instance->my_proc_list,
3321  instance->my_proc_list_entries *
3322  sizeof (struct srp_addr));
3323  addr_idx +=
3324  instance->my_proc_list_entries *
3325  sizeof (struct srp_addr);
3326  memcpy (&addr[addr_idx],
3327  instance->my_failed_list,
3328  instance->my_failed_list_entries *
3329  sizeof (struct srp_addr));
3330  addr_idx +=
3331  instance->my_failed_list_entries *
3332  sizeof (struct srp_addr);
3333 
 /* This sleep blocks the calling thread */
3334  if (instance->totem_config->send_join_timeout) {
3335  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3336  }
3337 
3338  instance->stats.memb_join_tx++;
3339 
3341  instance->totemnet_context,
3342  memb_join,
3343  addr_idx);
3344 }
3345 
/*
 * Build and send a join message announcing that this node is leaving.
 *
 * This node is merged into my_failed_list, and the active list is
 * my_proc_list with this node removed. The message is a struct memb_join
 * followed by the active list and then my_failed_list. If the computed
 * length exceeds the 40000-byte stack buffer the function returns without
 * sending. When totem_config->send_join_timeout is non-zero, the send is
 * delayed by a random usleep of less than send_join_timeout * 1000
 * microseconds.
 *
 * NOTE(review): listing lines 3356, 3375, 3381-3385, 3389 and 3420 are
 * missing - the heads of the log calls, the header setup, one field
 * assignment and the head of the send call are not shown; confirm against
 * upstream.
 */
3346 static void memb_leave_message_send (struct totemsrp_instance *instance)
3347 {
3348  char memb_join_data[40000];
3349  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3350  char *addr;
3351  unsigned int addr_idx;
3352  int active_memb_entries;
3353  struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3354  size_t msg_len;
3355 
3357  "sending join/leave message");
3358 
3359  /*
3360  * add us to the failed list, and remove us from
3361  * the members list
3362  */
3363  memb_set_merge(
3364  &instance->my_id, 1,
3365  instance->my_failed_list, &instance->my_failed_list_entries);
3366 
3367  memb_set_subtract (active_memb, &active_memb_entries,
3368  instance->my_proc_list, instance->my_proc_list_entries,
3369  &instance->my_id, 1);
3370 
3371  msg_len = sizeof(struct memb_join) +
3372  ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3373 
3374  if (msg_len > sizeof(memb_join_data)) {
3376  "memb_leave message too long. Ignoring message.");
3377 
3378  return ;
3379  }
3380 
 /* NOTE(review): listing lines 3381-3385 (header setup) are missing */
3386 
3387  memb_join->ring_seq = instance->my_ring_id.seq;
3388  memb_join->proc_list_entries = active_memb_entries;
 /* NOTE(review): listing line 3389 is missing here */
3390  memb_join->system_from = instance->my_id;
3391 
3392  // TODO: CC Maybe use the actual join send routine.
3393  /*
3394  * This mess adds the joined and failed processor lists into the join
3395  * message
3396  */
3397  addr = (char *)memb_join;
3398  addr_idx = sizeof (struct memb_join);
3399  memcpy (&addr[addr_idx],
3400  active_memb,
3401  active_memb_entries *
3402  sizeof (struct srp_addr));
3403  addr_idx +=
3404  active_memb_entries *
3405  sizeof (struct srp_addr);
3406  memcpy (&addr[addr_idx],
3407  instance->my_failed_list,
3408  instance->my_failed_list_entries *
3409  sizeof (struct srp_addr));
3410  addr_idx +=
3411  instance->my_failed_list_entries *
3412  sizeof (struct srp_addr);
3413 
3414 
3415  if (instance->totem_config->send_join_timeout) {
3416  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3417  }
3418  instance->stats.memb_join_tx++;
3419 
3421  instance->totemnet_context,
3422  memb_join,
3423  addr_idx);
3424 }
3425 
/*
 * Fill in a memb_merge_detect message with this node's id and current ring
 * id, assert that its header nodeid is non-zero, and bump the
 * stats.memb_merge_detect_tx counter.
 *
 * NOTE(review): the listing skips the local declaration, the header
 * initialisation and the opening of the send call - confirm against the
 * original file.
 */
3426 static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3427 {
3429 
3435  memb_merge_detect.system_from = instance->my_id;
3436  memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3437  sizeof (struct memb_ring_id));
3438  assert (memb_merge_detect.header.nodeid);
3439 
3440  instance->stats.memb_merge_detect_tx++;
3443  sizeof (struct memb_merge_detect));
3444 }
3445 
3446 static void memb_ring_id_set (
3447  struct totemsrp_instance *instance,
3448  const struct memb_ring_id *ring_id)
3449 {
3450 
3451  memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3452 }
3453 
/*
 * Token callback registration.
 *
 * Allocates a token_callback_instance, stores callback_fn, data, the
 * callback type and the delete flag in it, links it onto either the
 * "received" or the "sent" callback list of the instance, and returns the
 * new handle through *handle_out. token_hold_cancel_send() is called first.
 *
 * Returns 0 on success, -1 when the allocation fails (in which case
 * *handle_out is not written).
 *
 * NOTE(review): the listing skips the line carrying the function name
 * (presumably totemsrp_callback_token_create), the "type" parameter and
 * the two case labels of the switch - confirm against the original file.
 */
3455  void *srp_context,
3456  void **handle_out,
3458  int delete,
3459  int (*callback_fn) (enum totem_callback_token_type type, const void *),
3460  const void *data)
3461 {
3462  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3463  struct token_callback_instance *callback_handle;
3464 
3465  token_hold_cancel_send (instance);
3466 
3467  callback_handle = malloc (sizeof (struct token_callback_instance));
3468  if (callback_handle == 0) {
3469  return (-1);
3470  }
3471  *handle_out = (void *)callback_handle;
3472  qb_list_init (&callback_handle->list);
3473  callback_handle->callback_fn = callback_fn;
3474  callback_handle->data = (void *) data;
3475  callback_handle->callback_type = type;
3476  callback_handle->delete = delete;
3477  switch (type) {
3479  qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3480  break;
3482  qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3483  break;
3484  }
3485 
3486  return (0);
3487 }
3488 
3489 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3490 {
3491  struct token_callback_instance *h;
3492 
3493  if (*handle_out) {
3494  h = (struct token_callback_instance *)*handle_out;
3495  qb_list_del (&h->list);
3496  free (h);
3497  h = NULL;
3498  *handle_out = 0;
3499  }
3500 }
3501 
/*
 * Walk one of the instance's token callback lists, selected by type.
 *
 * For each entry: when del is 1 the entry is unlinked first; afterwards an
 * entry whose result was -1 with del == 1 is linked back onto the list so
 * it is tried again, and any other entry with del set is freed. An
 * unexpected type trips assert (0).
 *
 * NOTE(review): the listing skips the "type" parameter, the declaration of
 * token_callback_instance, the case labels, and the lines that set del and
 * invoke the callback to produce res - confirm against the original file.
 */
3502 static void token_callbacks_execute (
3503  struct totemsrp_instance *instance,
3505 {
3506  struct qb_list_head *list, *tmp_iter;
3507  struct qb_list_head *callback_listhead = 0;
3509  int res;
3510  int del;
3511 
3512  switch (type) {
3514  callback_listhead = &instance->token_callback_received_listhead;
3515  break;
3517  callback_listhead = &instance->token_callback_sent_listhead;
3518  break;
3519  default:
3520  assert (0);
3521  }
3522 
/* The _safe iterator is needed because entries are unlinked while walking. */
3523  qb_list_for_each_safe(list, tmp_iter, callback_listhead) {
3524  token_callback_instance = qb_list_entry (list, struct token_callback_instance, list);
3526  if (del == 1) {
3527  qb_list_del (list);
3528  }
3529 
3533  /*
3534  * This callback failed to execute, try it again on the next token
3535  */
3536  if (res == -1 && del == 1) {
3537  qb_list_add (list, callback_listhead);
3538  } else if (del) {
3539  free (token_callback_instance);
3540  }
3541  }
3542 }
3543 
3544 /*
3545  * Flow control functions
3546  */
3547 static unsigned int backlog_get (struct totemsrp_instance *instance)
3548 {
3549  unsigned int backlog = 0;
3550  struct cs_queue *queue_use = NULL;
3551 
3552  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3553  if (instance->waiting_trans_ack) {
3554  queue_use = &instance->new_message_queue_trans;
3555  } else {
3556  queue_use = &instance->new_message_queue;
3557  }
3558  } else
3559  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3560  queue_use = &instance->retrans_message_queue;
3561  }
3562 
3563  if (queue_use != NULL) {
3564  backlog = cs_queue_used (queue_use);
3565  }
3566 
3567  instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3568  return (backlog);
3569 }
3570 
3571 static int fcc_calculate (
3572  struct totemsrp_instance *instance,
3573  struct orf_token *token)
3574 {
3575  unsigned int transmits_allowed;
3576  unsigned int backlog_calc;
3577 
3578  transmits_allowed = instance->totem_config->max_messages;
3579 
3580  if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3581  transmits_allowed = instance->totem_config->window_size - token->fcc;
3582  }
3583 
3584  instance->my_cbl = backlog_get (instance);
3585 
3586  /*
3587  * Only do backlog calculation if there is a backlog otherwise
3588  * we would result in div by zero
3589  */
3590  if (token->backlog + instance->my_cbl - instance->my_pbl) {
3591  backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3592  (token->backlog + instance->my_cbl - instance->my_pbl);
3593  if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3594  transmits_allowed = backlog_calc;
3595  }
3596  }
3597 
3598  return (transmits_allowed);
3599 }
3600 
3601 /*
3602  * don't overflow the RTR sort queue
3603  */
3604 static void fcc_rtr_limit (
3605  struct totemsrp_instance *instance,
3606  struct orf_token *token,
3607  unsigned int *transmits_allowed)
3608 {
3609  int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3610  check -= (*transmits_allowed + instance->totem_config->window_size);
3611  assert (check >= 0);
3612  if (sq_lt_compare (instance->last_released +
3613  QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3614  instance->totem_config->window_size,
3615 
3616  token->seq)) {
3617 
3618  *transmits_allowed = 0;
3619  }
3620 }
3621 
3622 static void fcc_token_update (
3623  struct totemsrp_instance *instance,
3624  struct orf_token *token,
3625  unsigned int msgs_transmitted)
3626 {
3627  token->fcc += msgs_transmitted - instance->my_trc;
3628  token->backlog += instance->my_cbl - instance->my_pbl;
3629  instance->my_trc = msgs_transmitted;
3630  instance->my_pbl = instance->my_cbl;
3631 }
3632 
3633 /*
3634  * Sanity checkers
3635  */
3636 static int check_orf_token_sanity(
3637  const struct totemsrp_instance *instance,
3638  const void *msg,
3639  size_t msg_len,
3640  int endian_conversion_needed)
3641 {
3642  int rtr_entries;
3643  const struct orf_token *token = (const struct orf_token *)msg;
3644  size_t required_len;
3645 
3646  if (msg_len < sizeof(struct orf_token)) {
3648  "Received orf_token message is too short... ignoring.");
3649 
3650  return (-1);
3651  }
3652 
3653  if (endian_conversion_needed) {
3654  rtr_entries = swab32(token->rtr_list_entries);
3655  } else {
3656  rtr_entries = token->rtr_list_entries;
3657  }
3658 
3659  required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3660  if (msg_len < required_len) {
3662  "Received orf_token message is too short... ignoring.");
3663 
3664  return (-1);
3665  }
3666 
3667  return (0);
3668 }
3669 
/*
 * Length check for a received mcast message.
 *
 * Returns -1 when msg_len is smaller than struct mcast, 0 otherwise.
 * msg and endian_conversion_needed are not used by the visible code.
 *
 * NOTE(review): the listing skips the line that opens the logging call for
 * the string below - confirm against the original file.
 */
3670 static int check_mcast_sanity(
3671  struct totemsrp_instance *instance,
3672  const void *msg,
3673  size_t msg_len,
3674  int endian_conversion_needed)
3675 {
3676 
3677  if (msg_len < sizeof(struct mcast)) {
3679  "Received mcast message is too short... ignoring.");
3680 
3681  return (-1);
3682  }
3683 
3684  return (0);
3685 }
3686 
/*
 * Length check for a received memb_merge_detect message.
 *
 * Returns -1 when msg_len is smaller than struct memb_merge_detect,
 * 0 otherwise. msg and endian_conversion_needed are not used by the
 * visible code.
 *
 * NOTE(review): the listing skips the line that opens the logging call for
 * the string below - confirm against the original file.
 */
3687 static int check_memb_merge_detect_sanity(
3688  struct totemsrp_instance *instance,
3689  const void *msg,
3690  size_t msg_len,
3691  int endian_conversion_needed)
3692 {
3693 
3694  if (msg_len < sizeof(struct memb_merge_detect)) {
3696  "Received memb_merge_detect message is too short... ignoring.");
3697 
3698  return (-1);
3699  }
3700 
3701  return (0);
3702 }
3703 
3704 static int check_memb_join_sanity(
3705  struct totemsrp_instance *instance,
3706  const void *msg,
3707  size_t msg_len,
3708  int endian_conversion_needed)
3709 {
3710  const struct memb_join *mj_msg = (const struct memb_join *)msg;
3711  unsigned int proc_list_entries;
3712  unsigned int failed_list_entries;
3713  size_t required_len;
3714 
3715  if (msg_len < sizeof(struct memb_join)) {
3717  "Received memb_join message is too short... ignoring.");
3718 
3719  return (-1);
3720  }
3721 
3724 
3725  if (endian_conversion_needed) {
3728  }
3729 
3730  required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3731  if (msg_len < required_len) {
3733  "Received memb_join message is too short... ignoring.");
3734 
3735  return (-1);
3736  }
3737 
3738  return (0);
3739 }
3740 
3741 static int check_memb_commit_token_sanity(
3742  struct totemsrp_instance *instance,
3743  const void *msg,
3744  size_t msg_len,
3745  int endian_conversion_needed)
3746 {
3747  const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3748  unsigned int addr_entries;
3749  size_t required_len;
3750 
3751  if (msg_len < sizeof(struct memb_commit_token)) {
3753  "Received memb_commit_token message is too short... ignoring.");
3754 
3755  return (0);
3756  }
3757 
3758  addr_entries= mct_msg->addr_entries;
3759  if (endian_conversion_needed) {
3761  }
3762 
3763  required_len = sizeof(struct memb_commit_token) +
3764  (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3765  if (msg_len < required_len) {
3767  "Received memb_commit_token message is too short... ignoring.");
3768 
3769  return (-1);
3770  }
3771 
3772  return (0);
3773 }
3774 
/*
 * Length check for a received token_hold_cancel message.
 *
 * Returns -1 when msg_len is smaller than struct token_hold_cancel,
 * 0 otherwise. msg and endian_conversion_needed are not used by the
 * visible code.
 *
 * NOTE(review): the listing skips the line that opens the logging call for
 * the string below - confirm against the original file.
 */
3775 static int check_token_hold_cancel_sanity(
3776  struct totemsrp_instance *instance,
3777  const void *msg,
3778  size_t msg_len,
3779  int endian_conversion_needed)
3780 {
3781 
3782  if (msg_len < sizeof(struct token_hold_cancel)) {
3784  "Received token_hold_cancel message is too short... ignoring.");
3785 
3786  return (-1);
3787  }
3788 
3789  return (0);
3790 }
3791 
3792 /*
3793  * Message Handlers
3794  */
3795 
/*
 * Timestamp (from qb_util_nano_current_get) of the previous measurement
 * taken by the GIVEINFO timing code in the ORF token handler; in the
 * visible part of this file it is only touched inside #ifdef GIVEINFO.
 */
3796 unsigned long long int tv_old;
3797 /*
3798  * message handler called when TOKEN message type received
3799  */
/*
 * Parameters: instance is the protocol instance, msg/msg_len the raw token
 * as received, endian_conversion_needed is non-zero when the token must be
 * byte swapped first. Every visible path returns 0.
 *
 * Outline of the visible code:
 *  - drop the token when check_orf_token_sanity() fails or
 *    orf_token_discard is set;
 *  - copy the (possibly endian converted) token into local storage;
 *  - track unchanged sequence numbers for merge detection and decide
 *    whether the token is held (my_token_held / forward_token);
 *  - run the TOKEN_RECEIVED callbacks;
 *  - in every state but COMMIT: discard tokens from another ring or with
 *    an old token_seq, retransmit and multicast within the flow-control
 *    limits, update aru bookkeeping, enter gather on "FAILED TO RECEIVE",
 *    otherwise advance recovery state, send the token and run the
 *    TOKEN_SENT callbacks;
 *  - finally reset or cancel the heartbeat timeout.
 *
 * NOTE(review): the rtr_list memcpy below always copies
 * RETRANSMIT_ENTRIES_MAX items from msg, while the sanity check visible in
 * this file only guarantees msg_len covers rtr_list_entries items - confirm
 * the receive buffer is always large enough for the fixed-size read.
 *
 * NOTE(review): the listing skips several lines (openers of logging calls,
 * one case label before messages_free, and the statements between the two
 * "flushing" assignments and before token_send) - confirm against the
 * original file.
 */
3800 static int message_handler_orf_token (
3801  struct totemsrp_instance *instance,
3802  const void *msg,
3803  size_t msg_len,
3804  int endian_conversion_needed)
3805 {
3806  char token_storage[1500];
3807  char token_convert[1500];
3808  struct orf_token *token = NULL;
3809  int forward_token;
3810  unsigned int transmits_allowed;
3811  unsigned int mcasted_retransmit;
3812  unsigned int mcasted_regular;
3813  unsigned int last_aru;
3814 
3815 #ifdef GIVEINFO
3816  unsigned long long tv_current;
3817  unsigned long long tv_diff;
3818 
3819  tv_current = qb_util_nano_current_get ();
3820  tv_diff = tv_current - tv_old;
3821  tv_old = tv_current;
3822 
3824  "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3825 #endif
3826 
3827  if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3828  return (0);
3829  }
3830 
3831  if (instance->orf_token_discard) {
3832  return (0);
3833  }
3834 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3835  if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3836  return (0);
3837  }
3838 #endif
3839 
3840  if (endian_conversion_needed) {
3841  orf_token_endian_convert ((struct orf_token *)msg,
3842  (struct orf_token *)token_convert);
3843  msg = (struct orf_token *)token_convert;
3844  }
3845 
3846  /*
3847  * Make copy of token and retransmit list in case we have
3848  * to flush incoming messages from the kernel queue
3849  */
3850  token = (struct orf_token *)token_storage;
3851  memcpy (token, msg, sizeof (struct orf_token));
3852  memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3853  sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3854 
3855 
3856  /*
3857  * Handle merge detection timeout
3858  */
3859  if (token->seq == instance->my_last_seq) {
3860  start_merge_detect_timeout (instance);
3861  instance->my_seq_unchanged += 1;
3862  } else {
3863  cancel_merge_detect_timeout (instance);
3864  cancel_token_hold_retransmit_timeout (instance);
3865  instance->my_seq_unchanged = 0;
3866  }
3867 
3868  instance->my_last_seq = token->seq;
3869 
3870 #ifdef TEST_RECOVERY_MSG_COUNT
3871  if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3872  return (0);
3873  }
3874 #endif
3875  instance->flushing = 1;
3877  instance->flushing = 0;
3878 
3879  /*
3880  * Determine if we should hold (in reality drop) the token
3881  */
/* The ring rep needs one more unchanged rotation ('>') than other nodes ('>='). */
3882  instance->my_token_held = 0;
3883  if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3884  instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3885  instance->my_token_held = 1;
3886  } else {
3887  if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3888  instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3889  instance->my_token_held = 1;
3890  }
3891  }
3892 
3893  /*
3894  * Hold onto token when there is no activity on ring and
3895  * this processor is the ring rep
3896  */
3897  forward_token = 1;
3898  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3899  if (instance->my_token_held) {
3900  forward_token = 0;
3901  }
3902  }
3903 
3904  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
3905 
3906  switch (instance->memb_state) {
3907  case MEMB_STATE_COMMIT:
3908  /* Discard token */
3909  break;
3910 
3912  messages_free (instance, token->aru);
3913  /*
3914  * Do NOT add break, this case should also execute code in gather case.
3915  */
3916 
3917  case MEMB_STATE_GATHER:
3918  /*
3919  * DO NOT add break, we use different free mechanism in recovery state
3920  */
3921 
3922  case MEMB_STATE_RECOVERY:
3923  /*
3924  * Discard tokens from another configuration
3925  */
3926  if (memcmp (&token->ring_id, &instance->my_ring_id,
3927  sizeof (struct memb_ring_id)) != 0) {
3928 
3929  if ((forward_token)
3930  && instance->use_heartbeat) {
3931  reset_heartbeat_timeout(instance);
3932  }
3933  else {
3934  cancel_heartbeat_timeout(instance);
3935  }
3936 
3937  return (0); /* discard token */
3938  }
3939 
3940  /*
3941  * Discard retransmitted tokens
3942  */
3943  if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
3944  return (0); /* discard token */
3945  }
3946  last_aru = instance->my_last_aru;
3947  instance->my_last_aru = token->aru;
3948 
3949  transmits_allowed = fcc_calculate (instance, token);
3950  mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3951 
/* Pending or just-served retransmit requests override a held token. */
3952  if (instance->my_token_held == 1 &&
3953  (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
3954  instance->my_token_held = 0;
3955  forward_token = 1;
3956  }
3957 
3958  fcc_rtr_limit (instance, token, &transmits_allowed);
3959  mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3960 /*
3961 if (mcasted_regular) {
3962 printf ("mcasted regular %d\n", mcasted_regular);
3963 printf ("token seq %d\n", token->seq);
3964 }
3965 */
3966  fcc_token_update (instance, token, mcasted_retransmit +
3967  mcasted_regular);
3968 
3969  if (sq_lt_compare (instance->my_aru, token->aru) ||
3970  instance->my_id.nodeid == token->aru_addr ||
3971  token->aru_addr == 0) {
3972 
3973  token->aru = instance->my_aru;
3974  if (token->aru == token->seq) {
3975  token->aru_addr = 0;
3976  } else {
3977  token->aru_addr = instance->my_id.nodeid;
3978  }
3979  }
3980  if (token->aru == last_aru && token->aru_addr != 0) {
3981  instance->my_aru_count += 1;
3982  } else {
3983  instance->my_aru_count = 0;
3984  }
3985 
3986  /*
3987  * We really don't follow specification there. In specification, OTHER nodes
3988  * detect failure of one node (based on aru_count) and my_id IS NEVER added
3989  * to failed list (so node never mark itself as failed)
3990  */
3991  if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
3992  token->aru_addr == instance->my_id.nodeid) {
3993 
3995  "FAILED TO RECEIVE");
3996 
3997  instance->failed_to_recv = 1;
3998 
3999  memb_set_merge (&instance->my_id, 1,
4000  instance->my_failed_list,
4001  &instance->my_failed_list_entries);
4002 
4003  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4004  } else {
4005  instance->my_token_seq = token->token_seq;
4006  token->token_seq += 1;
4007 
4008  if (instance->memb_state == MEMB_STATE_RECOVERY) {
4009  /*
4010  * instance->my_aru == instance->my_high_seq_received means this processor
4011  * has recovered all messages it can recover
4012  * (ie: its retrans queue is empty)
4013  */
4014  if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4015 
4016  if (token->retrans_flg == 0) {
4017  token->retrans_flg = 1;
4018  instance->my_set_retrans_flg = 1;
4019  }
4020  } else
4021  if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4022  token->retrans_flg = 0;
4023  instance->my_set_retrans_flg = 0;
4024  }
4026  "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4027  token->retrans_flg, instance->my_set_retrans_flg,
4028  cs_queue_is_empty (&instance->retrans_message_queue),
4029  instance->my_retrans_flg_count, token->aru);
4030  if (token->retrans_flg == 0) {
4031  instance->my_retrans_flg_count += 1;
4032  } else {
4033  instance->my_retrans_flg_count = 0;
4034  }
4035  if (instance->my_retrans_flg_count == 2) {
4036  instance->my_install_seq = token->seq;
4037  }
4039  "install seq %x aru %x high seq received %x",
4040  instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4041  if (instance->my_retrans_flg_count >= 2 &&
4042  instance->my_received_flg == 0 &&
4043  sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4044  instance->my_received_flg = 1;
4045  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4046  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4047  sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4048  }
4049  if (instance->my_retrans_flg_count >= 3 &&
4050  sq_lte_compare (instance->my_install_seq, token->aru)) {
4051  instance->my_rotation_counter += 1;
4052  } else {
4053  instance->my_rotation_counter = 0;
4054  }
4055  if (instance->my_rotation_counter == 2) {
4057  "retrans flag count %x token aru %x install seq %x aru %x %x",
4058  instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4059  instance->my_aru, token->seq);
4060 
4061  memb_state_operational_enter (instance);
4062  instance->my_rotation_counter = 0;
4063  instance->my_retrans_flg_count = 0;
4064  }
4065  }
4066 
4068  token_send (instance, token, forward_token);
4069 
4070 #ifdef GIVEINFO
4071  tv_current = qb_util_nano_current_get ();
4072  tv_diff = tv_current - tv_old;
4073  tv_old = tv_current;
4075  "I held %0.4f ms",
4076  ((float)tv_diff) / 1000000.0);
4077 #endif
4078  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4079  messages_deliver_to_app (instance, 0,
4080  instance->my_high_seq_received);
4081  }
4082 
4083  /*
4084  * Deliver messages after token has been transmitted
4085  * to improve performance
4086  */
4087  reset_token_timeout (instance); // REVIEWED
4088  reset_token_retransmit_timeout (instance); // REVIEWED
4089  if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4090  instance->my_token_held == 1) {
4091 
4092  start_token_hold_retransmit_timeout (instance);
4093  }
4094 
4095  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4096  }
4097  break;
4098  }
4099 
4100  if ((forward_token)
4101  && instance->use_heartbeat) {
4102  reset_heartbeat_timeout(instance);
4103  }
4104  else {
4105  cancel_heartbeat_timeout(instance);
4106  }
4107 
4108  return (0);
4109 }
4110 
/*
 * Deliver messages from the regular sort queue to the application, in
 * sequence order, starting just above my_high_delivered and going up to
 * end_point.
 *
 * skip == 0: delivery stops at the first missing sequence number.
 * skip != 0: missing sequence numbers are stepped over, and messages whose
 *            sender is not in my_deliver_memb_list are not delivered.
 *
 * my_high_delivered is advanced for every sequence number that is consumed
 * (delivered or stepped over). Each delivered message is passed to
 * totemsrp_deliver_fn with the payload that follows the struct mcast
 * header and a flag saying whether the payload needs endian conversion.
 *
 * NOTE(review): the listing skips the lines that open the two logging
 * calls - confirm against the original file.
 */
4111 static void messages_deliver_to_app (
4112  struct totemsrp_instance *instance,
4113  int skip,
4114  unsigned int end_point)
4115 {
4116  struct sort_queue_item *sort_queue_item_p;
4117  unsigned int i;
4118  int res;
4119  struct mcast *mcast_in;
4120  struct mcast mcast_header;
4121  unsigned int range = 0;
4122  int endian_conversion_required;
4123  unsigned int my_high_delivered_stored = 0;
4124  struct srp_addr aligned_system_from;
4125 
4126  range = end_point - instance->my_high_delivered;
4127 
4128  if (range) {
4130  "Delivering %x to %x", instance->my_high_delivered,
4131  end_point);
4132  }
4133  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
/* Loop offsets are relative to this snapshot; my_high_delivered moves as we go. */
4134  my_high_delivered_stored = instance->my_high_delivered;
4135 
4136  /*
4137  * Deliver messages in order from rtr queue to pending delivery queue
4138  */
4139  for (i = 1; i <= range; i++) {
4140 
4141  void *ptr = 0;
4142 
4143  /*
4144  * If out of range of sort queue, stop assembly
4145  */
4146  res = sq_in_range (&instance->regular_sort_queue,
4147  my_high_delivered_stored + i);
4148  if (res == 0) {
4149  break;
4150  }
4151 
4152  res = sq_item_get (&instance->regular_sort_queue,
4153  my_high_delivered_stored + i, &ptr);
4154  /*
4155  * If hole, stop assembly
4156  */
4157  if (res != 0 && skip == 0) {
4158  break;
4159  }
4160 
4161  instance->my_high_delivered = my_high_delivered_stored + i;
4162 
4163  if (res != 0) {
4164  continue;
4165 
4166  }
4167 
4168  sort_queue_item_p = ptr;
4169 
4170  mcast_in = sort_queue_item_p->mcast;
4171  assert (mcast_in != (struct mcast *)0xdeadbeef);
4172 
/* A stored header whose magic differs from TOTEM_MH_MAGIC is treated as foreign-endian. */
4173  endian_conversion_required = 0;
4174  if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4175  endian_conversion_required = 1;
4176  mcast_endian_convert (mcast_in, &mcast_header);
4177  } else {
4178  memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4179  }
4180 
4181  aligned_system_from = mcast_header.system_from;
4182 
4183  /*
4184  * Skip messages not originated in instance->my_deliver_memb
4185  */
4186  if (skip &&
4187  memb_set_subset (&aligned_system_from,
4188  1,
4189  instance->my_deliver_memb_list,
4190  instance->my_deliver_memb_entries) == 0) {
4191 
4192  instance->my_high_delivered = my_high_delivered_stored + i;
4193 
4194  continue;
4195  }
4196 
4197  /*
4198  * Message found
4199  */
4201  "Delivering MCAST message with seq %x to pending delivery queue",
4202  mcast_header.seq);
4203 
4204  /*
4205  * Message is locally originated multicast
4206  */
4207  instance->totemsrp_deliver_fn (
4208  mcast_header.header.nodeid,
4209  ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4210  sort_queue_item_p->msg_len - sizeof (struct mcast),
4211  endian_conversion_required);
4212  }
4213 }
4214 
4215 /*
4216  * recv message handler called when MCAST message type received
4217  */
/*
 * Parameters: instance is the protocol instance, msg/msg_len the raw
 * multicast as received, endian_conversion_needed is non-zero when the
 * header must be byte swapped before it is inspected.
 *
 * Returns 0, except -1 when no buffer could be allocated for storing the
 * message.
 *
 * Outline of the visible code:
 *  - drop the message when check_mcast_sanity() fails;
 *  - choose the recovery sort queue for encapsulated messages, otherwise
 *    the regular sort queue;
 *  - a message from a different ring id is never queued: depending on the
 *    membership state the sender is merged into my_proc_list and gather
 *    state is entered, or the message is counted in stats.rx_msg_dropped;
 *  - a message from this ring is copied into a freshly allocated buffer
 *    and added to the sort queue when its sequence number is in range and
 *    not already present, raising my_high_seq_received when needed;
 *  - update_aru() is called and, in operational state, messages are
 *    delivered to the application.
 *
 * NOTE(review): assert (msg_len <= FRAME_SIZE_MAX) runs on network input
 * before the explicit msg_len <= FRAME_SIZE_MAX test further down - confirm
 * an oversized frame cannot reach this point when assertions are enabled.
 *
 * NOTE(review): the listing skips the declaration of sort_queue_item, one
 * case label and the opener of a logging call - confirm against the
 * original file.
 */
4218 static int message_handler_mcast (
4219  struct totemsrp_instance *instance,
4220  const void *msg,
4221  size_t msg_len,
4222  int endian_conversion_needed)
4223 {
4225  struct sq *sort_queue;
4226  struct mcast mcast_header;
4227  struct srp_addr aligned_system_from;
4228 
4229  if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4230  return (0);
4231  }
4232 
4233  if (endian_conversion_needed) {
4234  mcast_endian_convert (msg, &mcast_header);
4235  } else {
4236  memcpy (&mcast_header, msg, sizeof (struct mcast));
4237  }
4238 
4239  if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4240  sort_queue = &instance->recovery_sort_queue;
4241  } else {
4242  sort_queue = &instance->regular_sort_queue;
4243  }
4244 
4245  assert (msg_len <= FRAME_SIZE_MAX);
4246 
4247 #ifdef TEST_DROP_MCAST_PERCENTAGE
4248  if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4249  return (0);
4250  }
4251 #endif
4252 
4253  /*
4254  * If the message is foreign execute the switch below
4255  */
4256  if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4257  sizeof (struct memb_ring_id)) != 0) {
4258 
4259  aligned_system_from = mcast_header.system_from;
4260 
4261  switch (instance->memb_state) {
4263  memb_set_merge (
4264  &aligned_system_from, 1,
4265  instance->my_proc_list, &instance->my_proc_list_entries);
4266  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4267  break;
4268 
4269  case MEMB_STATE_GATHER:
4270  if (!memb_set_subset (
4271  &aligned_system_from,
4272  1,
4273  instance->my_proc_list,
4274  instance->my_proc_list_entries)) {
4275 
4276  memb_set_merge (&aligned_system_from, 1,
4277  instance->my_proc_list, &instance->my_proc_list_entries);
4278  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4279  return (0);
4280  }
4281  break;
4282 
4283  case MEMB_STATE_COMMIT:
4284  /* discard message */
4285  instance->stats.rx_msg_dropped++;
4286  break;
4287 
4288  case MEMB_STATE_RECOVERY:
4289  /* discard message */
4290  instance->stats.rx_msg_dropped++;
4291  break;
4292  }
4293  return (0);
4294  }
4295 
4297  "Received ringid (" CS_PRI_RING_ID ") seq %x",
4298  mcast_header.ring_id.rep,
4299  (uint64_t)mcast_header.ring_id.seq,
4300  mcast_header.seq);
4301 
4302  /*
4303  * Add mcast message to rtr queue if not already in rtr queue
4304  * otherwise free io vectors
4305  */
4306  if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4307  sq_in_range (sort_queue, mcast_header.seq) &&
4308  sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4309 
4310  /*
4311  * Allocate new multicast memory block
4312  */
4313 // TODO LEAK
4314  sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4315  if (sort_queue_item.mcast == NULL) {
4316  return (-1); /* error here is corrected by the algorithm */
4317  }
4318  memcpy (sort_queue_item.mcast, msg, msg_len);
4319  sort_queue_item.msg_len = msg_len;
4320 
4321  if (sq_lt_compare (instance->my_high_seq_received,
4322  mcast_header.seq)) {
4323  instance->my_high_seq_received = mcast_header.seq;
4324  }
4325 
4326  sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4327  }
4328 
4329  update_aru (instance);
4330  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4331  messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4332  }
4333 
4334 /* TODO remove from retrans message queue for old ring in recovery state */
4335  return (0);
4336 }
4337 
/*
 * Handle a received memb_merge_detect message. Always returns 0.
 *
 * The message is dropped when check_memb_merge_detect_sanity() fails or
 * when it carries this instance's own ring id. Otherwise the sender is
 * merged into my_proc_list and gather state is entered: always in the
 * state handled by the first switch arm, and in GATHER state only when the
 * sender is not already in my_proc_list. COMMIT and RECOVERY ignore the
 * message.
 *
 * NOTE(review): the listing skips the declaration of memb_merge_detect and
 * the first case label of the switch - confirm against the original file.
 */
4338 static int message_handler_memb_merge_detect (
4339  struct totemsrp_instance *instance,
4340  const void *msg,
4341  size_t msg_len,
4342  int endian_conversion_needed)
4343 {
4345  struct srp_addr aligned_system_from;
4346 
4347  if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4348  return (0);
4349  }
4350 
4351  if (endian_conversion_needed) {
4352  memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4353  } else {
4354  memcpy (&memb_merge_detect, msg,
4355  sizeof (struct memb_merge_detect));
4356  }
4357 
4358  /*
4359  * do nothing if this is a merge detect from this configuration
4360  */
4361  if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4362  sizeof (struct memb_ring_id)) == 0) {
4363 
4364  return (0);
4365  }
4366 
4367  aligned_system_from = memb_merge_detect.system_from;
4368 
4369  /*
4370  * Execute merge operation
4371  */
4372  switch (instance->memb_state) {
4374  memb_set_merge (&aligned_system_from, 1,
4375  instance->my_proc_list, &instance->my_proc_list_entries);
4376  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4377  break;
4378 
4379  case MEMB_STATE_GATHER:
4380  if (!memb_set_subset (
4381  &aligned_system_from,
4382  1,
4383  instance->my_proc_list,
4384  instance->my_proc_list_entries)) {
4385 
4386  memb_set_merge (&aligned_system_from, 1,
4387  instance->my_proc_list, &instance->my_proc_list_entries);
4388  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4389  return (0);
4390  }
4391  break;
4392 
4393  case MEMB_STATE_COMMIT:
4394  /* do nothing in commit */
4395  break;
4396 
4397  case MEMB_STATE_RECOVERY:
4398  /* do nothing in recovery */
4399  break;
4400  }
4401  return (0);
4402 }
4403 
/*
 * Process an already validated, host-endian memb_join message.
 *
 * The proc list starts at memb_join->end_of_memb_join and the failed list
 * follows it directly after proc_list_entries addresses.
 *
 * Outline of the visible code:
 *  - log the received and the local proc/failed lists;
 *  - while flushing, discard the message (recording the leaving node for
 *    LEAVE messages) and return;
 *  - when both received lists equal the local ones, record consensus for
 *    the sender and, once consensus is agreed, create the commit token and
 *    enter commit state (after a failed-to-receive episode the local lists
 *    are first reset to contain only this node);
 *  - when the received lists are subsets of the local ones, or the sender
 *    is already in my_failed_list, nothing is merged;
 *  - otherwise merge the received information into the local lists and
 *    enter gather state;
 *  - at "out", enter gather state if that has not happened and the
 *    instance is still operational.
 *
 * NOTE(review): this listing skips many lines of the function (the
 * conditions that distinguish LEAVE from JOIN messages, openers of logging
 * calls and the entry-count arguments of several memb_set_* calls), so the
 * brace structure shown here is not complete - confirm against the
 * original file before relying on the outline above.
 */
4404 static void memb_join_process (
4405  struct totemsrp_instance *instance,
4406  const struct memb_join *memb_join)
4407 {
4408  struct srp_addr *proc_list;
4409  struct srp_addr *failed_list;
4410  int gather_entered = 0;
4411  int fail_minus_memb_entries = 0;
4412  struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4413  struct srp_addr aligned_system_from;
4414 
4415  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4416  failed_list = proc_list + memb_join->proc_list_entries;
4417  aligned_system_from = memb_join->system_from;
4418 
4419  log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4420  memb_set_log(instance, instance->totemsrp_log_level_trace,
4421  "proclist", proc_list, memb_join->proc_list_entries);
4422  memb_set_log(instance, instance->totemsrp_log_level_trace,
4423  "faillist", failed_list, memb_join->failed_list_entries);
4424  memb_set_log(instance, instance->totemsrp_log_level_trace,
4425  "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4426  memb_set_log(instance, instance->totemsrp_log_level_trace,
4427  "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4428 
4430  if (instance->flushing) {
4433  "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID,
4435  if (memb_join->failed_list_entries > 0) {
4436  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4437  }
4438  } else {
4440  "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid);
4441  }
4442  return;
4443  } else {
4446  "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4447  if (memb_join->failed_list_entries > 0) {
4448  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4449  }
4450  }
4451  }
4452 
4453  }
4454 
4455  if (memb_set_equal (proc_list,
4457  instance->my_proc_list,
4458  instance->my_proc_list_entries) &&
4459 
4460  memb_set_equal (failed_list,
4462  instance->my_failed_list,
4463  instance->my_failed_list_entries)) {
4464 
4466  memb_consensus_set (instance, &aligned_system_from);
4467  }
4468 
4469  if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4470  instance->failed_to_recv = 0;
4471  instance->my_proc_list[0] = instance->my_id;
4472  instance->my_proc_list_entries = 1;
4473  instance->my_failed_list_entries = 0;
4474 
4475  memb_state_commit_token_create (instance);
4476 
4477  memb_state_commit_enter (instance);
4478  return;
4479  }
4480  if (memb_consensus_agreed (instance) &&
4481  memb_lowest_in_config (instance)) {
4482 
4483  memb_state_commit_token_create (instance);
4484 
4485  memb_state_commit_enter (instance);
4486  } else {
4487  goto out;
4488  }
4489  } else
4490  if (memb_set_subset (proc_list,
4492  instance->my_proc_list,
4493  instance->my_proc_list_entries) &&
4494 
4495  memb_set_subset (failed_list,
4497  instance->my_failed_list,
4498  instance->my_failed_list_entries)) {
4499 
4500  goto out;
4501  } else
4502  if (memb_set_subset (&aligned_system_from, 1,
4503  instance->my_failed_list, instance->my_failed_list_entries)) {
4504 
4505  goto out;
4506  } else {
4507  memb_set_merge (proc_list,
4509  instance->my_proc_list, &instance->my_proc_list_entries);
4510 
4511  if (memb_set_subset (
4512  &instance->my_id, 1,
4513  failed_list, memb_join->failed_list_entries)) {
4514 
4515  memb_set_merge (
4516  &aligned_system_from, 1,
4517  instance->my_failed_list, &instance->my_failed_list_entries);
4518  } else {
4519  if (memb_set_subset (
4520  &aligned_system_from, 1,
4521  instance->my_memb_list,
4522  instance->my_memb_entries)) {
4523 
4524  if (memb_set_subset (
4525  &aligned_system_from, 1,
4526  instance->my_failed_list,
4527  instance->my_failed_list_entries) == 0) {
4528 
4529  memb_set_merge (failed_list,
4531  instance->my_failed_list, &instance->my_failed_list_entries);
4532  } else {
4533  memb_set_subtract (fail_minus_memb,
4534  &fail_minus_memb_entries,
4535  failed_list,
4537  instance->my_memb_list,
4538  instance->my_memb_entries);
4539 
4540  memb_set_merge (fail_minus_memb,
4541  fail_minus_memb_entries,
4542  instance->my_failed_list,
4543  &instance->my_failed_list_entries);
4544  }
4545  }
4546  }
4547  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4548  gather_entered = 1;
4549  }
4550 
4551 out:
4552  if (gather_entered == 0 &&
4553  instance->memb_state == MEMB_STATE_OPERATIONAL) {
4554 
4555  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4556  }
4557 }
4558 
4559 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4560 {
4561  int i;
4562  struct srp_addr *in_proc_list;
4563  struct srp_addr *in_failed_list;
4564  struct srp_addr *out_proc_list;
4565  struct srp_addr *out_failed_list;
4566 
4567  out->header.magic = TOTEM_MH_MAGIC;
4569  out->header.type = in->header.type;
4570  out->header.nodeid = swab32 (in->header.nodeid);
4571  out->system_from = srp_addr_endian_convert(in->system_from);
4574  out->ring_seq = swab64 (in->ring_seq);
4575 
4576  in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4577  in_failed_list = in_proc_list + out->proc_list_entries;
4578  out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4579  out_failed_list = out_proc_list + out->proc_list_entries;
4580 
4581  for (i = 0; i < out->proc_list_entries; i++) {
4582  out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4583  }
4584  for (i = 0; i < out->failed_list_entries; i++) {
4585  out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4586  }
4587 }
4588 
4589 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4590 {
4591  int i;
4592  struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4593  struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4594  struct memb_commit_token_memb_entry *in_memb_list;
4595  struct memb_commit_token_memb_entry *out_memb_list;
4596 
4597  out->header.magic = TOTEM_MH_MAGIC;
4599  out->header.type = in->header.type;
4600  out->header.nodeid = swab32 (in->header.nodeid);
4601  out->token_seq = swab32 (in->token_seq);
4602  out->ring_id.rep = swab32(in->ring_id.rep);
4603  out->ring_id.seq = swab64 (in->ring_id.seq);
4604  out->retrans_flg = swab32 (in->retrans_flg);
4605  out->memb_index = swab32 (in->memb_index);
4606  out->addr_entries = swab32 (in->addr_entries);
4607 
4608  in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4609  out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4610  for (i = 0; i < out->addr_entries; i++) {
4611  out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4612 
4613  /*
4614  * Only convert the memb entry if it has been set
4615  */
4616  if (in_memb_list[i].ring_id.rep != 0) {
4617  out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4618 
4619  out_memb_list[i].ring_id.seq =
4620  swab64 (in_memb_list[i].ring_id.seq);
4621  out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4622  out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4623  out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4624  }
4625  }
4626 }
4627 
4628 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4629 {
4630  int i;
4631 
4632  out->header.magic = TOTEM_MH_MAGIC;
4634  out->header.type = in->header.type;
4635  out->header.nodeid = swab32 (in->header.nodeid);
4636  out->seq = swab32 (in->seq);
4637  out->token_seq = swab32 (in->token_seq);
4638  out->aru = swab32 (in->aru);
4639  out->ring_id.rep = swab32(in->ring_id.rep);
4640  out->aru_addr = swab32(in->aru_addr);
4641  out->ring_id.seq = swab64 (in->ring_id.seq);
4642  out->fcc = swab32 (in->fcc);
4643  out->backlog = swab32 (in->backlog);
4644  out->retrans_flg = swab32 (in->retrans_flg);
4646  for (i = 0; i < out->rtr_list_entries; i++) {
4647  out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4648  out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4649  out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4650  }
4651 }
4652 
4653 static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4654 {
4655  out->header.magic = TOTEM_MH_MAGIC;
4657  out->header.type = in->header.type;
4658  out->header.nodeid = swab32 (in->header.nodeid);
4660 
4661  out->seq = swab32 (in->seq);
4662  out->this_seqno = swab32 (in->this_seqno);
4663  out->ring_id.rep = swab32(in->ring_id.rep);
4664  out->ring_id.seq = swab64 (in->ring_id.seq);
4665  out->node_id = swab32 (in->node_id);
4666  out->guarantee = swab32 (in->guarantee);
4667  out->system_from = srp_addr_endian_convert(in->system_from);
4668 }
4669 
4670 static void memb_merge_detect_endian_convert (
4671  const struct memb_merge_detect *in,
4672  struct memb_merge_detect *out)
4673 {
4674  out->header.magic = TOTEM_MH_MAGIC;
4676  out->header.type = in->header.type;
4677  out->header.nodeid = swab32 (in->header.nodeid);
4678  out->ring_id.rep = swab32(in->ring_id.rep);
4679  out->ring_id.seq = swab64 (in->ring_id.seq);
4680  out->system_from = srp_addr_endian_convert (in->system_from);
4681 }
4682 
4683 static int ignore_join_under_operational (
4684  struct totemsrp_instance *instance,
4685  const struct memb_join *memb_join)
4686 {
4687  struct srp_addr *proc_list;
4688  struct srp_addr *failed_list;
4689  unsigned long long ring_seq;
4690  struct srp_addr aligned_system_from;
4691 
4692  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4693  failed_list = proc_list + memb_join->proc_list_entries;
4695  aligned_system_from = memb_join->system_from;
4696 
4697  if (memb_set_subset (&instance->my_id, 1,
4698  failed_list, memb_join->failed_list_entries)) {
4699  return (1);
4700  }
4701 
4702  /*
4703  * In operational state, my_proc_list is exactly the same as
4704  * my_memb_list.
4705  */
4706  if ((memb_set_subset (&aligned_system_from, 1,
4707  instance->my_memb_list, instance->my_memb_entries)) &&
4708  (ring_seq < instance->my_ring_id.seq)) {
4709  return (1);
4710  }
4711 
4712  return (0);
4713 }
4714 
4715 static int message_handler_memb_join (
4716  struct totemsrp_instance *instance,
4717  const void *msg,
4718  size_t msg_len,
4719  int endian_conversion_needed)
4720 {
4721  const struct memb_join *memb_join;
4722  struct memb_join *memb_join_convert = alloca (msg_len);
4723  struct srp_addr aligned_system_from;
4724 
4725  if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4726  return (0);
4727  }
4728 
4729  if (endian_conversion_needed) {
4730  memb_join = memb_join_convert;
4731  memb_join_endian_convert (msg, memb_join_convert);
4732 
4733  } else {
4734  memb_join = msg;
4735  }
4736 
4737  aligned_system_from = memb_join->system_from;
4738 
4739  /*
4740  * If the process paused because it wasn't scheduled in a timely
4741  * fashion, flush the join messages because they may be queued
4742  * entries
4743  */
4744  if (pause_flush (instance)) {
4745  return (0);
4746  }
4747 
4748  if (instance->token_ring_id_seq < memb_join->ring_seq) {
4749  instance->token_ring_id_seq = memb_join->ring_seq;
4750  }
4751  switch (instance->memb_state) {
4753  if (!ignore_join_under_operational (instance, memb_join)) {
4754  memb_join_process (instance, memb_join);
4755  }
4756  break;
4757 
4758  case MEMB_STATE_GATHER:
4759  memb_join_process (instance, memb_join);
4760  break;
4761 
4762  case MEMB_STATE_COMMIT:
4763  if (memb_set_subset (&aligned_system_from,
4764  1,
4765  instance->my_new_memb_list,
4766  instance->my_new_memb_entries) &&
4767 
4768  memb_join->ring_seq >= instance->my_ring_id.seq) {
4769 
4770  memb_join_process (instance, memb_join);
4771  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4772  }
4773  break;
4774 
4775  case MEMB_STATE_RECOVERY:
4776  if (memb_set_subset (&aligned_system_from,
4777  1,
4778  instance->my_new_memb_list,
4779  instance->my_new_memb_entries) &&
4780 
4781  memb_join->ring_seq >= instance->my_ring_id.seq) {
4782 
4783  memb_join_process (instance, memb_join);
4784  memb_recovery_state_token_loss (instance);
4785  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4786  }
4787  break;
4788  }
4789  return (0);
4790 }
4791 
4792 static int message_handler_memb_commit_token (
4793  struct totemsrp_instance *instance,
4794  const void *msg,
4795  size_t msg_len,
4796  int endian_conversion_needed)
4797 {
4798  struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4800  struct srp_addr sub[PROCESSOR_COUNT_MAX];
4801  int sub_entries;
4802 
4803  struct srp_addr *addr;
4804 
4806  "got commit token");
4807 
4808  if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4809  return (0);
4810  }
4811 
4812  if (endian_conversion_needed) {
4813  memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4814  } else {
4815  memcpy (memb_commit_token_convert, msg, msg_len);
4816  }
4817  memb_commit_token = memb_commit_token_convert;
4819 
4820 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4821  if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4822  return (0);
4823  }
4824 #endif
4825  switch (instance->memb_state) {
4827  /* discard token */
4828  break;
4829 
4830  case MEMB_STATE_GATHER:
4831  memb_set_subtract (sub, &sub_entries,
4832  instance->my_proc_list, instance->my_proc_list_entries,
4833  instance->my_failed_list, instance->my_failed_list_entries);
4834 
4835  if (memb_set_equal (addr,
4837  sub,
4838  sub_entries) &&
4839 
4840  memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
4841  memcpy (instance->commit_token, memb_commit_token, msg_len);
4842  memb_state_commit_enter (instance);
4843  }
4844  break;
4845 
4846  case MEMB_STATE_COMMIT:
4847  /*
4848  * If retransmitted commit tokens are sent on this ring
4849  * filter them out and only enter recovery once the
4850  * commit token has traversed the array. This is
4851  * determined by :
4852  * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4853  */
4854  if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4856  memb_state_recovery_enter (instance, memb_commit_token);
4857  }
4858  break;
4859 
4860  case MEMB_STATE_RECOVERY:
4861  if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4862 
4863  /* Filter out duplicated tokens */
4864  if (instance->originated_orf_token) {
4865  break;
4866  }
4867 
4868  instance->originated_orf_token = 1;
4869 
4871  "Sending initial ORF token");
4872 
4873  // TODO convert instead of initiate
4874  orf_token_send_initial (instance);
4875  reset_token_timeout (instance); // REVIEWED
4876  reset_token_retransmit_timeout (instance); // REVIEWED
4877  }
4878  break;
4879  }
4880  return (0);
4881 }
4882 
4883 static int message_handler_token_hold_cancel (
4884  struct totemsrp_instance *instance,
4885  const void *msg,
4886  size_t msg_len,
4887  int endian_conversion_needed)
4888 {
4889  const struct token_hold_cancel *token_hold_cancel = msg;
4890 
4891  if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4892  return (0);
4893  }
4894 
4895  if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4896  sizeof (struct memb_ring_id)) == 0) {
4897 
4898  instance->my_seq_unchanged = 0;
4899  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4900  timer_function_token_retransmit_timeout (instance);
4901  }
4902  }
4903  return (0);
4904 }
4905 
4906 static int check_message_header_validity(
4907  void *context,
4908  const void *msg,
4909  unsigned int msg_len,
4910  const struct sockaddr_storage *system_from)
4911 {
4912  struct totemsrp_instance *instance = context;
4913  const struct totem_message_header *message_header = msg;
4914  const char *guessed_str;
4915  const char *msg_byte = msg;
4916 
4917  if (msg_len < sizeof (struct totem_message_header)) {
4919  "Message received from %s is too short... Ignoring %u.",
4920  totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4921  return (-1);
4922  }
4923 
4924  if (message_header->magic != TOTEM_MH_MAGIC &&
4925  message_header->magic != swab16(TOTEM_MH_MAGIC)) {
4926  /*
4927  * We've received ether Knet, old version of Corosync,
4928  * or something else. Do some guessing to display (hopefully)
4929  * helpful message
4930  */
4931  guessed_str = NULL;
4932 
4933  if (message_header->magic == 0xFFFF) {
4934  /*
4935  * Corosync 2.2 used header with two UINT8_MAX
4936  */
4937  guessed_str = "Corosync 2.2";
4938  } else if (message_header->magic == 0xFEFE) {
4939  /*
4940  * Corosync 2.3+ used header with two UINT8_MAX - 1
4941  */
4942  guessed_str = "Corosync 2.3+";
4943  } else if (msg_byte[0] == 0x01) {
4944  /*
4945  * Knet has stable1 with first byte of message == 1
4946  */
4947  guessed_str = "unencrypted Kronosnet";
4948  } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4949  /*
4950  * Unencrypted Corosync 1.x/OpenAIS has first byte
4951  * 0-5. Collision with Knet (but still worth the try)
4952  */
4953  guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
4954  } else {
4955  /*
4956  * Encrypted Kronosned packet has a hash at the end of
4957  * the packet and nothing specific at the beginning of the
4958  * packet (just encrypted data).
4959  * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
4960  * is in the beginning of the packet.
4961  *
4962  * So it's not possible to reliably detect ether of them.
4963  */
4964  guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
4965  }
4966 
4968  "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
4969  totemip_sa_print((struct sockaddr *)system_from),
4970  guessed_str);
4971 
4972  return (-1);
4973  }
4974 
4975  if (message_header->version != TOTEM_MH_VERSION) {
4977  "Message received from %s has unsupported version %u... Ignoring",
4978  totemip_sa_print((struct sockaddr *)system_from),
4979  message_header->version);
4980 
4981  return (-1);
4982  }
4983 
4984  return (0);
4985 }
4986 
4987 
4989  void *context,
4990  const void *msg,
4991  unsigned int msg_len,
4992  const struct sockaddr_storage *system_from)
4993 {
4994  struct totemsrp_instance *instance = context;
4995  const struct totem_message_header *message_header = msg;
4996 
4997  if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
4998  return ;
4999  }
5000 
5001  switch (message_header->type) {
5003  instance->stats.orf_token_rx++;
5004  break;
5005  case MESSAGE_TYPE_MCAST:
5006  instance->stats.mcast_rx++;
5007  break;
5009  instance->stats.memb_merge_detect_rx++;
5010  break;
5012  instance->stats.memb_join_rx++;
5013  break;
5015  instance->stats.memb_commit_token_rx++;
5016  break;
5018  instance->stats.token_hold_cancel_rx++;
5019  break;
5020  default:
5022  "Message received from %s has wrong type... ignoring %d.\n",
5023  totemip_sa_print((struct sockaddr *)system_from),
5024  (int)message_header->type);
5025 
5026  instance->stats.rx_msg_dropped++;
5027  return;
5028  }
5029  /*
5030  * Handle incoming message
5031  */
5032  totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5033  instance,
5034  msg,
5035  msg_len,
5036  message_header->magic != TOTEM_MH_MAGIC);
5037 }
5038 
5040  void *context,
5041  const struct totem_ip_address *interface_addr,
5042  unsigned short ip_port,
5043  unsigned int iface_no)
5044 {
5045  struct totemsrp_instance *instance = context;
5046  int res;
5047 
5048  totemip_copy(&instance->my_addrs[iface_no], interface_addr);
5049 
5050  res = totemnet_iface_set (
5051  instance->totemnet_context,
5052  interface_addr,
5053  ip_port,
5054  iface_no);
5055 
5056  return (res);
5057 }
5058 
5059 
5061  void *context,
5062  const struct totem_ip_address *iface_addr,
5063  unsigned int iface_no)
5064 {
5065  struct totemsrp_instance *instance = context;
5066  int num_interfaces;
5067  int i;
5068 
5069  if (!instance->my_id.nodeid) {
5070  instance->my_id.nodeid = iface_addr->nodeid;
5071  }
5072  totemip_copy (&instance->my_addrs[iface_no], iface_addr);
5073 
5074  if (instance->iface_changes++ == 0) {
5075  instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5076  /*
5077  * Increase the ring_id sequence number. This doesn't follow specification.
5078  * Solves problem with restarted leader node (node with lowest nodeid) before
5079  * rest of the cluster forms new membership and guarantees unique ring_id for
5080  * new singleton configuration.
5081  */
5082  instance->my_ring_id.seq++;
5083 
5084  instance->token_ring_id_seq = instance->my_ring_id.seq;
5085  log_printf (
5086  instance->totemsrp_log_level_debug,
5087  "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5088  instance->my_ring_id.rep,
5089  (uint64_t)instance->my_ring_id.seq);
5090 
5091  if (instance->totemsrp_service_ready_fn) {
5092  instance->totemsrp_service_ready_fn ();
5093  }
5094 
5095  }
5096 
5097  for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
5098  totemsrp_member_add (instance,
5099  &instance->totem_config->interfaces[iface_no].member_list[i],
5100  iface_no);
5101  }
5102 
5103  num_interfaces = 0;
5104  for (i = 0; i < INTERFACE_MAX; i++) {
5105  if (instance->totem_config->interfaces[i].configured) {
5106  num_interfaces++;
5107  }
5108  }
5109 
5110  if (instance->iface_changes >= num_interfaces) {
5111  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5112  }
5113 }
5114 
5116  totem_config->net_mtu -= 2 * sizeof (struct mcast);
5117 }
5118 
5120  void *context,
5121  void (*totem_service_ready) (void))
5122 {
5123  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5124 
5125  instance->totemsrp_service_ready_fn = totem_service_ready;
5126 }
5127 
5129  void *context,
5130  const struct totem_ip_address *member,
5131  int iface_no)
5132 {
5133  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5134  int res;
5135 
5136  res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5137 
5138  return (res);
5139 }
5140 
5142  void *context,
5143  const struct totem_ip_address *member,
5144  int iface_no)
5145 {
5146  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5147  int res;
5148 
5149  res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5150 
5151  return (res);
5152 }
5153 
5154 void totemsrp_threaded_mode_enable (void *context)
5155 {
5156  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5157 
5158  instance->threaded_mode_enabled = 1;
5159 }
5160 
5161 void totemsrp_trans_ack (void *context)
5162 {
5163  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5164 
5165  instance->waiting_trans_ack = 0;
5166  instance->totemsrp_waiting_trans_ack_cb_fn (0);
5167 }
5168 
5169 
5170 int totemsrp_reconfigure (void *context, struct totem_config *totem_config)
5171 {
5172  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5173  int res;
5174 
5176  return (res);
5177 }
5178 
5179 void totemsrp_stats_clear (void *context, int flags)
5180 {
5181  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5182 
5183  memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5186  }
5187 }
5188 
/*
 * Run the ORF token timeout function for the given instance right away.
 *
 * context - totemsrp instance, passed through unchanged
 */
void totemsrp_force_gather (void *context)
{
	void *srp_context = context;

	timer_function_orf_token_timeout (srp_context);
}
totem_config::token_warning
unsigned int token_warning
Definition: totem.h:176
totemsrp_stats_t::rx_msg_dropped
uint64_t rx_msg_dropped
Definition: totemstats.h:77
swab32
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
Definition: totemsrp.c:545
totem_config::max_messages
unsigned int max_messages
Definition: totem.h:212
memb_join::failed_list_entries
unsigned int failed_list_entries
Definition: totemsrp.c:216
TOTEM_CONFIGURATION_REGULAR
@ TOTEM_CONFIGURATION_REGULAR
Definition: coroapi.h:133
orf_token::seq
unsigned int seq
Definition: totemsrp.c:199
token_callback_instance
Definition: totemsrp.c:166
totemsrp_instance::my_proc_list_entries
int my_proc_list_entries
Definition: totemsrp.c:321
totemsrp_instance::token_sent_event_handle
void * token_sent_event_handle
Definition: totemsrp.c:526
message_handlers
Definition: totemsrp.c:530
TOTEM_CALLBACK_TOKEN_SENT
@ TOTEM_CALLBACK_TOKEN_SENT
Definition: coroapi.h:144
totem_config::heartbeat_failures_allowed
unsigned int heartbeat_failures_allowed
Definition: totem.h:206
totem_logging_configuration::log_level_error
int log_level_error
Definition: totem.h:109
totemsrp_instance::totemsrp_log_level_debug
int totemsrp_log_level_debug
Definition: totemsrp.c:429
rtr_item::seq
unsigned int seq
Definition: totemsrp.c:193
totemnet_mcast_flush_send
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:410
totemsrp_instance::memb_ring_id_create_or_load
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:469
totem_callback_token_type
totem_callback_token_type
The totem_callback_token_type enum.
Definition: coroapi.h:142
totemsrp_crypto_set
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition: totemsrp.c:1085
orf_token::fcc
unsigned int fcc
Definition: totemsrp.c:205
totem_message_header::type
char type
Definition: totem.h:127
totemsrp_instance::my_high_delivered
unsigned int my_high_delivered
Definition: totemsrp.c:383
main_deliver_fn
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition: totemsrp.c:4988
totemsrp_instance::my_leave_memb_entries
int my_leave_memb_entries
Definition: totemsrp.c:335
RETRANS_MESSAGE_QUEUE_SIZE_MAX
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition: totemsrp.c:94
value
uint32_t value
Definition: exec/votequorum.c:101
memb_commit_token::retrans_flg
unsigned int retrans_flg
Definition: totemsrp.c:252
totem_config::token_retransmits_before_loss_const
unsigned int token_retransmits_before_loss_const
Definition: totem.h:182
totemsrp_stats_t::token_hold_cancel_tx
uint64_t token_hold_cancel_tx
Definition: totemstats.h:66
orf_token::retrans_flg
int retrans_flg
Definition: totemsrp.c:206
totem_configuration_type
totem_configuration_type
The totem_configuration_type enum.
Definition: coroapi.h:132
token_callback_instance::list
struct qb_list_head list
Definition: totemsrp.c:167
totemsrp_stats_t::memb_commit_token_tx
uint64_t memb_commit_token_tx
Definition: totemstats.h:64
totem_config::join_timeout
unsigned int join_timeout
Definition: totem.h:184
totem_logging_configuration::log_level_trace
int log_level_trace
Definition: totem.h:113
totem_config::downcheck_timeout
unsigned int downcheck_timeout
Definition: totem.h:192
TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
Definition: totemsrp.c:549
TOKEN_SIZE_MAX
#define TOKEN_SIZE_MAX
Definition: totemsrp.c:98
totemnet_send_flush
int totemnet_send_flush(void *net_context)
Definition: totemnet.c:388
totemsrp_instance::timer_pause_timeout
qb_loop_timer_handle timer_pause_timeout
Definition: totemsrp.c:398
gather_state_from_desc
const char * gather_state_from_desc[]
Definition: totemsrp.c:559
totemsrp_stats_t::commit_token_lost
uint64_t commit_token_lost
Definition: totemstats.h:73
consensus_list_item::addr
struct srp_addr addr
Definition: totemsrp.c:161
memb_commit_token_memb_entry::aru
unsigned int aru
Definition: totemsrp.c:242
orf_token::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:203
__attribute__
struct message_item __attribute__
totemsrp_instance::totemsrp_log_level_security
int totemsrp_log_level_security
Definition: totemsrp.c:421
totemsrp_instance::last_released
unsigned int last_released
Definition: totemsrp.c:483
TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
Definition: totemsrp.c:554
totem_interface::member_count
int member_count
Definition: totem.h:88
mcast::node_id
unsigned int node_id
Definition: totemsrp.c:186
message_handlers::handler_functions
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition: totemsrp.c:532
TOTEM_TOKEN_STATS_MAX
#define TOTEM_TOKEN_STATS_MAX
Definition: totemstats.h:89
TOTEMSRP_GSFROM_MERGE_DURING_JOIN
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
Definition: totemsrp.c:551
totem_interface::member_list
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition: totem.h:95
totemsrp_instance::old_ring_state_saved
int old_ring_state_saved
Definition: totemsrp.c:487
totemsrp_instance::totemsrp_confchg_fn
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totemsrp.c:457
MEMB_STATE_GATHER
@ MEMB_STATE_GATHER
Definition: totemsrp.c:276
totemsrp_stats_t::latest_token
int latest_token
Definition: totemstats.h:88
cs_queue
Definition: cs_queue.h:44
totemsrp_finalize
void totemsrp_finalize(void *srp_context)
Definition: totemsrp.c:1024
MESSAGE_TYPE_ORF_TOKEN
@ MESSAGE_TYPE_ORF_TOKEN
Definition: totemsrp.c:144
totem_config::totem_memb_ring_id_store
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:242
mcast::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:185
totem_message_header::nodeid
unsigned int nodeid
Definition: totem.h:129
totemsrp_instance::my_last_aru
unsigned int my_last_aru
Definition: totemsrp.c:345
token_callback_instance::delete
int delete
Definition: totemsrp.c:170
memb_ring_id::rep
unsigned int rep
Definition: totem.h:148
TOTEMSRP_GSFROM_MAX
@ TOTEMSRP_GSFROM_MAX
Definition: totemsrp.c:556
totem_config::fail_to_recv_const
unsigned int fail_to_recv_const
Definition: totem.h:194
LOGSYS_LEVEL_DEBUG
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:76
totem_logging_configuration::log_level_debug
int log_level_debug
Definition: totem.h:112
addr_entries
int addr_entries
Definition: totemsrp.c:7
totemsrp_stats_t::memb_join_rx
uint64_t memb_join_rx
Definition: totemstats.h:60
totemsrp_stats_t
Definition: totemstats.h:53
backlog
unsigned int backlog
Definition: totemsrp.c:8
totemsrp_instance::my_aru
unsigned int my_aru
Definition: totemsrp.c:381
totemsrp_instance::commit_token
struct memb_commit_token * commit_token
Definition: totemsrp.c:511
type
char type
Definition: totem.h:4
TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
Definition: totemsrp.c:547
log_printf
#define log_printf(level, format, args...)
Definition: totemsrp.c:687
totemsrp.h
QUEUE_RTR_ITEMS_SIZE_MAX
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition: totemsrp.c:93
MESSAGE_TYPE_MEMB_MERGE_DETECT
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
Definition: totemsrp.c:146
MESSAGE_NOT_ENCAPSULATED
@ MESSAGE_NOT_ENCAPSULATED
Definition: totemsrp.c:154
totem_logging_configuration::log_level_notice
int log_level_notice
Definition: totem.h:111
totemnet_buffer_alloc
void * totemnet_buffer_alloc(void *net_context)
Definition: totemnet.c:351
totemsrp_instance::fcc_remcast_current
int fcc_remcast_current
Definition: totemsrp.c:293
totemsrp_stats_t::earliest_token
int earliest_token
Definition: totemstats.h:87
totemsrp_token_stats_t::rx
uint32_t rx
Definition: totemstats.h:48
totemsrp_instance::my_token_held
int my_token_held
Definition: totemsrp.c:479
totemsrp_service_ready_register
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition: totemsrp.c:5119
totemsrp_instance::my_cbl
unsigned int my_cbl
Definition: totemsrp.c:507
totemsrp_callback_token_create
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition: totemsrp.c:3454
mcast
Definition: totemsrp.c:180
totemsrp_token_stats_t::tx
uint32_t tx
Definition: totemstats.h:49
TOTEMSRP_GSFROM_GATHER_MISSING1
@ TOTEMSRP_GSFROM_GATHER_MISSING1
Definition: totemsrp.c:541
high_delivered
unsigned int high_delivered
Definition: totemsrp.c:4
totemsrp_instance::commit_token_storage
char commit_token_storage[40000]
Definition: totemsrp.c:527
MEMB_STATE_OPERATIONAL
@ MEMB_STATE_OPERATIONAL
Definition: totemsrp.c:275
memb_join::proc_list_entries
unsigned int proc_list_entries
Definition: totemsrp.c:215
totemsrp_my_family_get
int totemsrp_my_family_get(void *srp_context)
Definition: totemsrp.c:1110
totemsrp_instance::fcc_mcast_last
int fcc_mcast_last
Definition: totemsrp.c:291
totem_message_header::encapsulated
char encapsulated
Definition: totem.h:128
memb_join::end_of_memb_join
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:218
TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
Definition: totemsrp.c:550
totemsrp_instance::timer_orf_token_retransmit_timeout
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition: totemsrp.c:404
TOTEM_CALLBACK_TOKEN_RECEIVED
@ TOTEM_CALLBACK_TOKEN_RECEIVED
Definition: coroapi.h:143
totemsrp_reconfigure
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition: totemsrp.c:5170
totem_ip_address::nodeid
unsigned int nodeid
Definition: coroapi.h:112
totemnet_buffer_release
void totemnet_buffer_release(void *net_context, void *ptr)
Definition: totemnet.c:359
totemsrp_socket
Definition: totemsrp.c:175
totemsrp_stats_t::mcast_tx
uint64_t mcast_tx
Definition: totemstats.h:61
totemsrp_net_mtu_adjust
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition: totemsrp.c:5115
totempg_stats_t
Definition: totemstats.h:94
proc_list_entries
unsigned int proc_list_entries
Definition: totemsrp.c:4
MESSAGE_TYPE_MCAST
@ MESSAGE_TYPE_MCAST
Definition: totemsrp.c:145
totemsrp_instance::my_new_memb_list
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:309
totemsrp_instance::my_set_retrans_flg
int my_set_retrans_flg
Definition: totemsrp.c:357
totem_config::merge_timeout
unsigned int merge_timeout
Definition: totem.h:190
totemsrp_instance::my_deliver_memb_list
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:315
totemsrp_instance::my_trans_memb_entries
int my_trans_memb_entries
Definition: totemsrp.c:327
totemsrp_instance::my_ring_id
struct memb_ring_id my_ring_id
Definition: totemsrp.c:337
CS_PRI_NODE_ID
#define CS_PRI_NODE_ID
Definition: corotypes.h:59
message_item::msg_len
unsigned int msg_len
Definition: totemsrp.c:266
mcast::system_from
struct srp_addr system_from
Definition: totemsrp.c:182
totemsrp_stats_t::mcast_rx
uint64_t mcast_rx
Definition: totemstats.h:63
FRAME_SIZE_MAX
#define FRAME_SIZE_MAX
Definition: totem.h:52
consensus_list_item
Definition: totemsrp.c:160
totemsrp_instance::timer_orf_token_warning
qb_loop_timer_handle timer_orf_token_warning
Definition: totemsrp.c:402
swab.h
totem_config::token_timeout
unsigned int token_timeout
Definition: totem.h:174
totemsrp_instance::failed_to_recv
int failed_to_recv
Definition: totemsrp.c:284
mcast::header
struct totem_message_header header
Definition: totemsrp.c:181
totemsrp_stats_clear
void totemsrp_stats_clear(void *context, int flags)
Definition: totemsrp.c:5179
totemnet_stats_clear
void totemnet_stats_clear(void *net_context)
Definition: totemnet.c:574
totemsrp_token_stats_t::backlog_calc
int backlog_calc
Definition: totemstats.h:50
memb_join::ring_seq
unsigned long long ring_seq
Definition: totemsrp.c:217
totem_config::interfaces
struct totem_interface * interfaces
Definition: totem.h:158
totemsrp_instance::my_aru_count
int my_aru_count
Definition: totemsrp.c:341
addr
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:77
totemsrp_iface_set
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemsrp.c:5039
totemip_copy
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:123
totemsrp_stats_t::orf_token_rx
uint64_t orf_token_rx
Definition: totemstats.h:56
totemsrp_instance::threaded_mode_enabled
uint32_t threaded_mode_enabled
Definition: totemsrp.c:519
TOTEM_MH_MAGIC
#define TOTEM_MH_MAGIC
Definition: totem.h:121
memb_commit_token
Definition: totemsrp.c:248
srp_addr
Definition: totemsrp.c:104
totemnet_token_send
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:398
totemsrp_instance::my_seq_unchanged
int my_seq_unchanged
Definition: totemsrp.c:347
totemsrp_stats_t::memb_merge_detect_rx
uint64_t memb_merge_detect_rx
Definition: totemstats.h:58
sq
The sq struct.
Definition: sq.h:43
message_type
message_type
Definition: totemsrp.c:143
seq
unsigned int seq
Definition: totemsrp.c:4
totemsrp_event_signal
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition: totemsrp.c:2447
CS_PRI_RING_ID_SEQ
#define CS_PRI_RING_ID_SEQ
Definition: corotypes.h:60
totemsrp_socket::mcast
int mcast
Definition: totemsrp.c:176
totemsrp_instance::consensus_list_entries
int consensus_list_entries
Definition: totemsrp.c:297
MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX
Definition: coroapi.h:98
totemnet_reconfigure
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition: totemnet.c:560
memb_commit_token::memb_index
int memb_index
Definition: totemsrp.c:253
totem_config::token_hold_timeout
unsigned int token_hold_timeout
Definition: totem.h:180
totemsrp_instance::my_install_seq
unsigned int my_install_seq
Definition: totemsrp.c:353
message_item::mcast
struct mcast * mcast
Definition: totemsrp.c:265
totemsrp_instance::totemsrp_log_level_warning
int totemsrp_log_level_warning
Definition: totemsrp.c:425
totemsrp_instance::memb_ring_id_store
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:473
totemsrp_instance::old_ring_state_high_seq_received
unsigned int old_ring_state_high_seq_received
Definition: totemsrp.c:491
cs_queue.h
totemsrp_instance::global_seqno
int global_seqno
Definition: totemsrp.c:477
totemnet_finalize
int totemnet_finalize(void *net_context)
Definition: totemnet.c:290
totem_config::net_mtu
unsigned int net_mtu
Definition: totem.h:202
totemsrp_instance::stats
totemsrp_stats_t stats
Definition: totemsrp.c:513
totemsrp_instance::regular_sort_queue
struct sq regular_sort_queue
Definition: totemsrp.c:374
totemsrp_instance::lowest_active_if
int lowest_active_if
Definition: totemsrp.c:299
INTERFACE_MAX
#define INTERFACE_MAX
Definition: coroapi.h:88
totemsrp_stats_t::gather_entered
uint64_t gather_entered
Definition: totemstats.h:70
orf_token
Definition: totemsrp.c:197
rtr_item
Definition: totemsrp.c:191
TOTEMPG_STATS_CLEAR_TRANSPORT
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition: totemstats.h:116
totemsrp_instance::my_failed_list
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:307
TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
Definition: totemsrp.c:553
totemsrp_instance::orf_token_discard
uint32_t orf_token_discard
Definition: totemsrp.c:515
totemsrp_instance::totemsrp_deliver_fn
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition: totemsrp.c:451
orf_token::aru_addr
unsigned int aru_addr
Definition: totemsrp.c:202
totemsrp_instance::old_ring_state_aru
int old_ring_state_aru
Definition: totemsrp.c:489
totem_interface::configured
uint8_t configured
Definition: totem.h:87
aru
unsigned int aru
Definition: totemsrp.c:5
totemsrp_stats_t::mcast_retx
uint64_t mcast_retx
Definition: totemstats.h:62
gather_state_from
gather_state_from
Definition: totemsrp.c:539
MESSAGE_ENCAPSULATED
@ MESSAGE_ENCAPSULATED
Definition: totemsrp.c:153
totemsrp_instance::my_deliver_memb_entries
int my_deliver_memb_entries
Definition: totemsrp.c:331
memb_commit_token::header
struct totem_message_header header
Definition: totemsrp.c:249
token_callback_instance::callback_type
enum totem_callback_token_type callback_type
Definition: totemsrp.c:169
totemsrp_instance::my_left_memb_list
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:317
message_handlers::count
int count
Definition: totemsrp.c:531
totemsrp_instance::token_callback_received_listhead
struct qb_list_head token_callback_received_listhead
Definition: totemsrp.c:385
totemnet_ifaces_get
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition: totemnet.c:468
totemsrp_instance::my_retrans_flg_count
int my_retrans_flg_count
Definition: totemsrp.c:359
totemsrp_instance::recovery_sort_queue
struct sq recovery_sort_queue
Definition: totemsrp.c:376
totem_interface::boundto
struct totem_ip_address boundto
Definition: totem.h:83
memb_commit_token::end_of_commit_token
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:255
totemsrp_trans_ack
void totemsrp_trans_ack(void *context)
Definition: totemsrp.c:5161
totem_config::miss_count_const
unsigned int miss_count_const
Definition: totem.h:232
totemsrp_instance::my_old_ring_id
struct memb_ring_id my_old_ring_id
Definition: totemsrp.c:339
totemsrp_instance::totemsrp_service_ready_fn
void(* totemsrp_service_ready_fn)(void)
Definition: totemsrp.c:464
totemnet_member_add
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:504
totemsrp_instance::my_token_seq
unsigned int my_token_seq
Definition: totemsrp.c:393
totem_logging_configuration::log_subsys_id
int log_subsys_id
Definition: totem.h:114
totem_ip_address::family
unsigned short family
Definition: coroapi.h:113
totem_interface::mcast_addr
struct totem_ip_address mcast_addr
Definition: totem.h:84
totemsrp_force_gather
void totemsrp_force_gather(void *context)
Definition: totemsrp.c:5189
totemsrp_instance::fcc_remcast_last
int fcc_remcast_last
Definition: totemsrp.c:289
totemnet_crypto_set
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition: totemnet.c:276
MESSAGE_TYPE_MEMB_COMMIT_TOKEN
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
Definition: totemsrp.c:148
totemsrp_instance::totemsrp_waiting_trans_ack_cb_fn
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition: totemsrp.c:466
swab16
#define swab16(x)
The swab16 macro.
Definition: swab.h:39
flags
uint32_t flags
Definition: exec/votequorum.c:103
orf_token::token_seq
unsigned int token_seq
Definition: totemsrp.c:200
totem_config
Definition: totem.h:152
totemsrp_instance::my_high_seq_received
unsigned int my_high_seq_received
Definition: totemsrp.c:351
rtr_list
struct rtr_item rtr_list[0]
Definition: totemsrp.c:12
totemsrp_stats_t::token_hold_cancel_rx
uint64_t token_hold_cancel_rx
Definition: totemstats.h:67
totemsrp_instance::my_high_ring_delivered
unsigned int my_high_ring_delivered
Definition: totemsrp.c:361
orf_token::header
struct totem_message_header header
Definition: totemsrp.c:198
TOTEM_MH_VERSION
#define TOTEM_MH_VERSION
Definition: totem.h:122
totemnet.h
TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
Definition: totemsrp.c:546
TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
Definition: totemsrp.c:543
totem_ip_address
The totem_ip_address struct.
Definition: coroapi.h:111
TOTEMSRP_GSFROM_INTERFACE_CHANGE
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
Definition: totemsrp.c:555
consensus_list_item::set
int set
Definition: totemsrp.c:162
sq.h
totemip_sa_print
const char * totemip_sa_print(const struct sockaddr *sa)
Definition: totemip.c:242
CS_PRI_RING_ID
#define CS_PRI_RING_ID
Definition: corotypes.h:61
totemsrp_instance::my_memb_list
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:313
totemsrp_instance::timer_heartbeat_timeout
qb_loop_timer_handle timer_heartbeat_timeout
Definition: totemsrp.c:416
MEMB_STATE_RECOVERY
@ MEMB_STATE_RECOVERY
Definition: totemsrp.c:278
TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
Definition: totemsrp.c:542
totemsrp_instance::timer_merge_detect_timeout
qb_loop_timer_handle timer_merge_detect_timeout
Definition: totemsrp.c:408
totem_message_header
Definition: totem.h:124
memb_ring_id
The memb_ring_id struct.
Definition: coroapi.h:122
memb_commit_token::token_seq
unsigned int token_seq
Definition: totemsrp.c:250
memb_join::header
struct totem_message_header header
Definition: totemsrp.c:213
TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
Definition: totemsrp.c:544
totemsrp_stats_t::continuous_gather
uint32_t continuous_gather
Definition: totemstats.h:78
main_iface_change_fn
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition: totemsrp.c:5060
rtr_item::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:192
totemsrp_instance
Definition: totemsrp.c:281
LEAVE_DUMMY_NODEID
#define LEAVE_DUMMY_NODEID
Definition: totemsrp.c:99
TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
Definition: totemsrp.c:540
totemsrp_instance::tv_old
struct timeval tv_old
Definition: totemsrp.c:495
ring_seq
unsigned long long ring_seq
Definition: totemsrp.c:6
totemsrp_stats_t::gather_token_lost
uint64_t gather_token_lost
Definition: totemstats.h:71
totemsrp_instance::consensus_list
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:295
srp_addr::nodeid
unsigned int nodeid
Definition: totemsrp.c:105
totemsrp_instance::memb_timer_state_commit_timeout
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition: totemsrp.c:414
totemsrp_instance::my_id
struct srp_addr my_id
Definition: totemsrp.c:301
totemsrp_instance::my_pbl
unsigned int my_pbl
Definition: totemsrp.c:505
sort_queue_item::msg_len
unsigned int msg_len
Definition: totemsrp.c:271
system_from
struct srp_addr system_from
Definition: totemsrp.c:3
memb_commit_token_memb_entry::high_delivered
unsigned int high_delivered
Definition: totemsrp.c:243
totemsrp_instance::orf_token_retransmit
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition: totemsrp.c:389
totem_logging_configuration::log_level_warning
int log_level_warning
Definition: totem.h:110
totem_logging_configuration::log_level_security
int log_level_security
Definition: totem.h:106
orf_token::aru
unsigned int aru
Definition: totemsrp.c:201
totem_event_type
totem_event_type
Definition: totem.h:259
totemsrp_stats_t::token
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition: totemstats.h:90
totemsrp_instance::my_proc_list
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:305
failed_list_entries
unsigned int failed_list_entries
Definition: totemsrp.c:5
memb_merge_detect
Definition: totemsrp.c:227
totemsrp_instance::my_last_seq
unsigned int my_last_seq
Definition: totemsrp.c:493
totemnet_mcast_noflush_send
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:423
memb_commit_token_memb_entry::received_flg
unsigned int received_flg
Definition: totemsrp.c:244
totemsrp_instance::use_heartbeat
unsigned int use_heartbeat
Definition: totemsrp.c:501
totemsrp_instance::set_aru
unsigned int set_aru
Definition: totemsrp.c:485
totemsrp_stats_t::memb_merge_detect_tx
uint64_t memb_merge_detect_tx
Definition: totemstats.h:57
swab64
#define swab64(x)
The swab64 macro.
Definition: swab.h:65
memb_merge_detect::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:230
totemsrp_instance::my_rotation_counter
int my_rotation_counter
Definition: totemsrp.c:355
totemsrp_stats_t::memb_commit_token_rx
uint64_t memb_commit_token_rx
Definition: totemstats.h:65
token_hold_cancel
Definition: totemsrp.c:234
totemsrp_instance::my_trans_memb_list
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:311
totem_config::token_retransmit_timeout
unsigned int token_retransmit_timeout
Definition: totem.h:178
SEQNO_START_TOKEN
#define SEQNO_START_TOKEN
Definition: totemsrp.c:119
totemnet_iface_set
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemnet.c:455
totemsrp_instance::totemsrp_log_level_error
int totemsrp_log_level_error
Definition: totemsrp.c:423
RETRANSMIT_ENTRIES_MAX
#define RETRANSMIT_ENTRIES_MAX
Definition: totemsrp.c:97
totem_message_header::magic
unsigned short magic
Definition: totem.h:125
token_callback_instance::data
void * data
Definition: totemsrp.c:171
totemsrp_instance::my_received_flg
int my_received_flg
Definition: totemsrp.c:349
totem_message_header::version
char version
Definition: totem.h:126
totemsrp_message_handlers
struct message_handlers totemsrp_message_handlers
Definition: totemsrp.c:675
nodeid
unsigned int nodeid
Definition: coroapi.h:75
totemsrp_instance::my_trc
unsigned int my_trc
Definition: totemsrp.c:503
totemnet_token_target_set
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition: totemnet.c:481
totem_config::totem_memb_ring_id_create_or_load
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:238
mcast::seq
unsigned int seq
Definition: totemsrp.c:183
totemsrp_stats_t::operational_entered
uint64_t operational_entered
Definition: totemstats.h:68
totemsrp_instance::totem_config
struct totem_config * totem_config
Definition: totemsrp.c:499
totemsrp_stats_t::memb_join_tx
uint64_t memb_join_tx
Definition: totemstats.h:59
totemsrp_stats_t::recovery_entered
uint64_t recovery_entered
Definition: totemstats.h:74
totemsrp_instance::token_ring_id_seq
unsigned long long token_ring_id_seq
Definition: totemsrp.c:481
totemsrp_member_add
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5128
totemsrp_socket::token
int token
Definition: totemsrp.c:177
totem_config::seqno_unchanged_const
unsigned int seqno_unchanged_const
Definition: totem.h:196
memb_ring_id::seq
unsigned long long seq
Definition: coroapi.h:124
totemsrp_instance::totemsrp_log_printf
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf, 6, 7)))
Definition: totemsrp.c:435
ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:6
totemsrp_avail
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition: totemsrp.c:2527
memb_commit_token_memb_entry
Definition: totemsrp.c:240
totemsrp_stats_t::consensus_timeouts
uint64_t consensus_timeouts
Definition: totemstats.h:76
TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
Definition: totemsrp.c:552
totemsrp_instance::new_message_queue
struct cs_queue new_message_queue
Definition: totemsrp.c:368
totemsrp_stats_t::recovery_token_lost
uint64_t recovery_token_lost
Definition: totemstats.h:75
totemsrp_instance::retrans_message_queue
struct cs_queue retrans_message_queue
Definition: totemsrp.c:372
totem_config::send_join_timeout
unsigned int send_join_timeout
Definition: totem.h:186
totem_config::window_size
unsigned int window_size
Definition: totem.h:210
PROCESSOR_COUNT_MAX
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
totemsrp_stats_t::commit_entered
uint64_t commit_entered
Definition: totemstats.h:72
received_flg
unsigned int received_flg
Definition: totemsrp.c:5
config.h
orf_token::rtr_list
struct rtr_item rtr_list[0]
Definition: totemsrp.c:208
orf_token::backlog
unsigned int backlog
Definition: totemsrp.c:204
mcast::guarantee
int guarantee
Definition: totemsrp.c:187
message_item
Definition: totemsrp.c:264
totemnet_processor_count_set
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition: totemnet.c:367
MEMB_STATE_COMMIT
@ MEMB_STATE_COMMIT
Definition: totemsrp.c:277
encapsulation_type
encapsulation_type
Definition: totemsrp.c:152
logsys.h
totemsrp_instance::flushing
int flushing
Definition: totemsrp.c:523
sort_queue_item::mcast
struct mcast * mcast
Definition: totemsrp.c:270
totem_config::threads
unsigned int threads
Definition: totem.h:204
totemsrp_initialize
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition: totemsrp.c:816
rtr_list_entries
int rtr_list_entries
Definition: totemsrp.c:11
totem_logging_configuration::log_printf
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf, 6, 7)))
Definition: totem.h:99
memb_state
memb_state
Definition: totemsrp.c:274
totemnet_member_remove
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:524
totemsrp_instance::iface_changes
int iface_changes
Definition: totemsrp.c:282
totemsrp_instance::totemsrp_poll_handle
qb_loop_t * totemsrp_poll_handle
Definition: totemsrp.c:447
totemsrp_instance::totemsrp_log_level_notice
int totemsrp_log_level_notice
Definition: totemsrp.c:427
totemsrp_instance::memb_timer_state_gather_consensus_timeout
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition: totemsrp.c:412
totemsrp_instance::my_left_memb_entries
int my_left_memb_entries
Definition: totemsrp.c:333
totemsrp_instance::mcast_address
struct totem_ip_address mcast_address
Definition: totemsrp.c:449
memb_commit_token::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:251
totemsrp_instance::timer_orf_token_timeout
qb_loop_timer_handle timer_orf_token_timeout
Definition: totemsrp.c:400
totemsrp_instance::my_merge_detect_timeout_outstanding
int my_merge_detect_timeout_outstanding
Definition: totemsrp.c:343
totemsrp_instance::heartbeat_timeout
int heartbeat_timeout
Definition: totemsrp.c:363
memb_merge_detect::header
struct totem_message_header header
Definition: totemsrp.c:228
MESSAGE_TYPE_MEMB_JOIN
@ MESSAGE_TYPE_MEMB_JOIN
Definition: totemsrp.c:147
memb_join::system_from
struct srp_addr system_from
Definition: totemsrp.c:214
MESSAGE_TYPE_TOKEN_HOLD_CANCEL
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
Definition: totemsrp.c:149
totemsrp_instance::totemsrp_log_level_trace
int totemsrp_log_level_trace
Definition: totemsrp.c:431
totemnet_recv_flush
int totemnet_recv_flush(void *net_context)
Definition: totemnet.c:378
totemnet_initialize
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Definition: totemnet.c:301
totemsrp_my_nodeid_get
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition: totemsrp.c:1099
TOTEM_CONFIGURATION_TRANSITIONAL
@ TOTEM_CONFIGURATION_TRANSITIONAL
Definition: coroapi.h:134
token_hold_cancel::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:236
totemsrp_ifaces_get
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition: totemsrp.c:1047
sort_queue_item
Definition: totemsrp.c:269
TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
Definition: totemsrp.c:548
totemsrp_member_remove
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5141
totemsrp_instance::timer_orf_token_hold_retransmit_timeout
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition: totemsrp.c:406
totemsrp_instance::my_memb_entries
int my_memb_entries
Definition: totemsrp.c:329
SEQNO_START_MSG
#define SEQNO_START_MSG
Definition: totemsrp.c:118
guarantee
int guarantee
Definition: totemsrp.c:8
mcast::this_seqno
int this_seqno
Definition: totemsrp.c:184
totemsrp_instance::my_failed_list_entries
int my_failed_list_entries
Definition: totemsrp.c:323
memb_commit_token::addr_entries
int addr_entries
Definition: totemsrp.c:254
totemsrp_threaded_mode_enable
void totemsrp_threaded_mode_enable(void *context)
Definition: totemsrp.c:5154
totemsrp_instance::pause_timestamp
uint64_t pause_timestamp
Definition: totemsrp.c:509
token_callback_instance::callback_fn
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition: totemsrp.c:168
totemnet_recv_mcast_empty
int totemnet_recv_mcast_empty(void *net_context)
Definition: totemnet.c:493
totemsrp_stats_t::orf_token_tx
uint64_t orf_token_tx
Definition: totemstats.h:55
totemsrp_mcast
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition: totemsrp.c:2456
totemsrp_instance::totemsrp_subsys_id
int totemsrp_subsys_id
Definition: totemsrp.c:433
totemsrp_instance::token_recv_event_handle
void * token_recv_event_handle
Definition: totemsrp.c:525
totemsrp_instance::my_leave_memb_list
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:319
totemsrp_instance::totemnet_context
void * totemnet_context
Definition: totemsrp.c:497
totemnet_iface_check
int totemnet_iface_check(void *net_context)
Definition: totemnet.c:436
totem_config::max_network_delay
unsigned int max_network_delay
Definition: totem.h:208
memb_join
Definition: totemsrp.c:212
totemsrp_instance::my_addrs
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition: totemsrp.c:303
totemsrp_instance::new_message_queue_trans
struct cs_queue new_message_queue_trans
Definition: totemsrp.c:370
memb_merge_detect::system_from
struct srp_addr system_from
Definition: totemsrp.c:229
totemsrp_callback_token_destroy
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition: totemsrp.c:3489
memb_commit_token_memb_entry::ring_id
struct memb_ring_id ring_id
Definition: totemsrp.c:241
orf_token::rtr_list_entries
int rtr_list_entries
Definition: totemsrp.c:207
totemsrp_instance::memb_timer_state_gather_join_timeout
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition: totemsrp.c:410
totem_config::consensus_timeout
unsigned int consensus_timeout
Definition: totem.h:188
totemsrp_instance::token_callback_sent_listhead
struct qb_list_head token_callback_sent_listhead
Definition: totemsrp.c:387
totemsrp_stats_t::operational_token_lost
uint64_t operational_token_lost
Definition: totemstats.h:69
totemsrp_instance::waiting_trans_ack
uint32_t waiting_trans_ack
Definition: totemsrp.c:521
totemsrp_instance::orf_token_retransmit_size
int orf_token_retransmit_size
Definition: totemsrp.c:391
token_hold_cancel::header
struct totem_message_header header
Definition: totemsrp.c:235
totemsrp_instance::originated_orf_token
uint32_t originated_orf_token
Definition: totemsrp.c:517
totemsrp_instance::my_new_memb_entries
int my_new_memb_entries
Definition: totemsrp.c:325
tv_old
unsigned long long int tv_old
Definition: totemsrp.c:3796
totemsrp_instance::memb_state
enum memb_state memb_state
Definition: totemsrp.c:443
totem_config::totem_logging_configuration
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:200