corosync  3.0.2-dirty
exec/cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2006-2019 Red Hat, Inc.
3  *
4  * All rights reserved.
5  *
6  * Author: Christine Caulfield (ccaulfie@redhat.com)
7  * Author: Jan Friesse (jfriesse@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <config.h>
37 
38 #ifdef HAVE_ALLOCA_H
39 #include <alloca.h>
40 #endif
41 #include <sys/types.h>
42 #include <sys/socket.h>
43 #include <sys/un.h>
44 #include <sys/ioctl.h>
45 #include <netinet/in.h>
46 #include <sys/uio.h>
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <errno.h>
52 #include <time.h>
53 #include <assert.h>
54 #include <arpa/inet.h>
55 #include <sys/mman.h>
56 
57 #include <qb/qblist.h>
58 #include <qb/qbmap.h>
59 
60 #include <corosync/corotypes.h>
61 #include <qb/qbipc_common.h>
62 #include <corosync/corodefs.h>
63 #include <corosync/logsys.h>
64 #include <corosync/coroapi.h>
65 
66 #include <corosync/cpg.h>
67 #include <corosync/ipc_cpg.h>
68 
69 #ifndef MAP_ANONYMOUS
70 #define MAP_ANONYMOUS MAP_ANON
71 #endif
72 
73 #include "service.h"
74 
75 LOGSYS_DECLARE_SUBSYS ("CPG");
76 
77 #define GROUP_HASH_SIZE 32
78 
87 };
88 
89 struct zcb_mapped {
90  struct qb_list_head list;
91  void *addr;
92  size_t size;
93 };
94 /*
95  * state` exec deliver
96  * match group name, pid -> if matched deliver for YES:
97  * XXX indicates impossible state
98  *
99  * join leave mcast
100  * UNJOINED XXX XXX NO
101  * LEAVE_STARTED XXX YES(unjoined_enter) YES
102  * JOIN_STARTED YES(join_started_enter) XXX NO
103  * JOIN_COMPLETED XXX NO YES
104  *
105  * join_started_enter
106  * set JOIN_COMPLETED
107  * add entry to process_info list
108  * unjoined_enter
109  * set UNJOINED
110  * delete entry from process_info list
111  *
112  *
113  * library accept join error codes
114  * UNJOINED YES(CS_OK) set JOIN_STARTED
115  * LEAVE_STARTED NO(CS_ERR_BUSY)
116  * JOIN_STARTED NO(CS_ERR_EXIST)
117  * JOIN_COMPlETED NO(CS_ERR_EXIST)
118  *
119  * library accept leave error codes
120  * UNJOINED NO(CS_ERR_NOT_EXIST)
121  * LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
122  * JOIN_STARTED NO(CS_ERR_BUSY)
123  * JOIN_COMPLETED YES(CS_OK) set LEAVE_STARTED
124  *
125  * library accept mcast
126  * UNJOINED NO(CS_ERR_NOT_EXIST)
127  * LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
128  * JOIN_STARTED YES(CS_OK)
129  * JOIN_COMPLETED YES(CS_OK)
130  */
131 enum cpd_state {
136 };
137 
141 };
142 
143 static struct qb_list_head joinlist_messages_head;
144 
145 struct cpg_pd {
146  void *conn;
148  uint32_t pid;
150  unsigned int flags;
152  uint64_t transition_counter; /* These two are used when sending fragmented messages */
154  struct qb_list_head list;
155  struct qb_list_head iteration_instance_list_head;
156  struct qb_list_head zcb_mapped_list_head;
157 };
158 
161  struct qb_list_head list;
162  struct qb_list_head items_list_head; /* List of process_info */
163  struct qb_list_head *current_pointer;
164 };
165 
166 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
167 
168 QB_LIST_DECLARE (cpg_pd_list_head);
169 
170 static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
171 
172 static unsigned int my_member_list_entries;
173 
174 static unsigned int my_old_member_list[PROCESSOR_COUNT_MAX];
175 
176 static unsigned int my_old_member_list_entries = 0;
177 
178 static struct corosync_api_v1 *api = NULL;
179 
180 static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
181 
182 static mar_cpg_ring_id_t last_sync_ring_id;
183 
184 struct process_info {
185  unsigned int nodeid;
186  uint32_t pid;
188  struct qb_list_head list; /* on the group_info members list */
189 };
190 QB_LIST_DECLARE (process_info_list_head);
191 
193  uint32_t pid;
195 };
196 
201 };
202 
203 /*
204  * Service Interfaces required by service_message_handler struct
205  */
206 static char *cpg_exec_init_fn (struct corosync_api_v1 *);
207 
208 static int cpg_lib_init_fn (void *conn);
209 
210 static int cpg_lib_exit_fn (void *conn);
211 
212 static void message_handler_req_exec_cpg_procjoin (
213  const void *message,
214  unsigned int nodeid);
215 
216 static void message_handler_req_exec_cpg_procleave (
217  const void *message,
218  unsigned int nodeid);
219 
220 static void message_handler_req_exec_cpg_joinlist (
221  const void *message,
222  unsigned int nodeid);
223 
224 static void message_handler_req_exec_cpg_mcast (
225  const void *message,
226  unsigned int nodeid);
227 
228 static void message_handler_req_exec_cpg_partial_mcast (
229  const void *message,
230  unsigned int nodeid);
231 
232 static void message_handler_req_exec_cpg_downlist_old (
233  const void *message,
234  unsigned int nodeid);
235 
236 static void message_handler_req_exec_cpg_downlist (
237  const void *message,
238  unsigned int nodeid);
239 
240 static void exec_cpg_procjoin_endian_convert (void *msg);
241 
242 static void exec_cpg_joinlist_endian_convert (void *msg);
243 
244 static void exec_cpg_mcast_endian_convert (void *msg);
245 
246 static void exec_cpg_partial_mcast_endian_convert (void *msg);
247 
248 static void exec_cpg_downlist_endian_convert_old (void *msg);
249 
250 static void exec_cpg_downlist_endian_convert (void *msg);
251 
252 static void message_handler_req_lib_cpg_join (void *conn, const void *message);
253 
254 static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
255 
256 static void message_handler_req_lib_cpg_finalize (void *conn, const void *message);
257 
258 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
259 
260 static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
261 
262 static void message_handler_req_lib_cpg_membership (void *conn,
263  const void *message);
264 
265 static void message_handler_req_lib_cpg_local_get (void *conn,
266  const void *message);
267 
268 static void message_handler_req_lib_cpg_iteration_initialize (
269  void *conn,
270  const void *message);
271 
272 static void message_handler_req_lib_cpg_iteration_next (
273  void *conn,
274  const void *message);
275 
276 static void message_handler_req_lib_cpg_iteration_finalize (
277  void *conn,
278  const void *message);
279 
280 static void message_handler_req_lib_cpg_zc_alloc (
281  void *conn,
282  const void *message);
283 
284 static void message_handler_req_lib_cpg_zc_free (
285  void *conn,
286  const void *message);
287 
288 static void message_handler_req_lib_cpg_zc_execute (
289  void *conn,
290  const void *message);
291 
292 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
293 
294 static int cpg_exec_send_downlist(void);
295 
296 static int cpg_exec_send_joinlist(void);
297 
298 static void downlist_inform_clients (void);
299 
300 static void joinlist_inform_clients (void);
301 
302 static void joinlist_messages_delete (void);
303 
304 static void cpg_sync_init (
305  const unsigned int *trans_list,
306  size_t trans_list_entries,
307  const unsigned int *member_list,
308  size_t member_list_entries,
309  const struct memb_ring_id *ring_id);
310 
311 static int cpg_sync_process (void);
312 
313 static void cpg_sync_activate (void);
314 
315 static void cpg_sync_abort (void);
316 
317 static void do_proc_join(
318  const mar_cpg_name_t *name,
319  uint32_t pid,
320  unsigned int nodeid,
321  int reason,
322  qb_map_t *group_notify_map);
323 
324 static void do_proc_leave(
325  const mar_cpg_name_t *name,
326  uint32_t pid,
327  unsigned int nodeid,
328  int reason);
329 
330 static int notify_lib_totem_membership (
331  void *conn,
332  int member_list_entries,
333  const unsigned int *member_list);
334 
335 static inline int zcb_all_free (
336  struct cpg_pd *cpd);
337 
338 static char *cpg_print_group_name (
339  const mar_cpg_name_t *group);
340 
341 /*
342  * Library Handler Definition
343  */
344 static struct corosync_lib_handler cpg_lib_engine[] =
345 {
346  { /* 0 - MESSAGE_REQ_CPG_JOIN */
347  .lib_handler_fn = message_handler_req_lib_cpg_join,
348  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
349  },
350  { /* 1 - MESSAGE_REQ_CPG_LEAVE */
351  .lib_handler_fn = message_handler_req_lib_cpg_leave,
352  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
353  },
354  { /* 2 - MESSAGE_REQ_CPG_MCAST */
355  .lib_handler_fn = message_handler_req_lib_cpg_mcast,
356  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
357  },
358  { /* 3 - MESSAGE_REQ_CPG_MEMBERSHIP */
359  .lib_handler_fn = message_handler_req_lib_cpg_membership,
360  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
361  },
362  { /* 4 - MESSAGE_REQ_CPG_LOCAL_GET */
363  .lib_handler_fn = message_handler_req_lib_cpg_local_get,
364  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
365  },
366  { /* 5 - MESSAGE_REQ_CPG_ITERATIONINITIALIZE */
367  .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
368  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
369  },
370  { /* 6 - MESSAGE_REQ_CPG_ITERATIONNEXT */
371  .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
372  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
373  },
374  { /* 7 - MESSAGE_REQ_CPG_ITERATIONFINALIZE */
375  .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
376  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
377  },
378  { /* 8 - MESSAGE_REQ_CPG_FINALIZE */
379  .lib_handler_fn = message_handler_req_lib_cpg_finalize,
380  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
381  },
382  { /* 9 */
383  .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
384  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
385  },
386  { /* 10 */
387  .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
388  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
389  },
390  { /* 11 */
391  .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
392  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
393  },
394  { /* 12 */
395  .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
396  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
397  },
398 
399 };
400 
401 static struct corosync_exec_handler cpg_exec_engine[] =
402 {
403  { /* 0 - MESSAGE_REQ_EXEC_CPG_PROCJOIN */
404  .exec_handler_fn = message_handler_req_exec_cpg_procjoin,
405  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
406  },
407  { /* 1 - MESSAGE_REQ_EXEC_CPG_PROCLEAVE */
408  .exec_handler_fn = message_handler_req_exec_cpg_procleave,
409  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
410  },
411  { /* 2 - MESSAGE_REQ_EXEC_CPG_JOINLIST */
412  .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
413  .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
414  },
415  { /* 3 - MESSAGE_REQ_EXEC_CPG_MCAST */
416  .exec_handler_fn = message_handler_req_exec_cpg_mcast,
417  .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
418  },
419  { /* 4 - MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD */
420  .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
421  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
422  },
423  { /* 5 - MESSAGE_REQ_EXEC_CPG_DOWNLIST */
424  .exec_handler_fn = message_handler_req_exec_cpg_downlist,
425  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
426  },
427  { /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
428  .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
429  .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
430  },
431 };
432 
434  .name = "corosync cluster closed process group service v1.01",
435  .id = CPG_SERVICE,
436  .priority = 1,
437  .private_data_size = sizeof (struct cpg_pd),
438  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
439  .allow_inquorate = CS_LIB_ALLOW_INQUORATE,
440  .lib_init_fn = cpg_lib_init_fn,
441  .lib_exit_fn = cpg_lib_exit_fn,
442  .lib_engine = cpg_lib_engine,
443  .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
444  .exec_init_fn = cpg_exec_init_fn,
445  .exec_dump_fn = NULL,
446  .exec_engine = cpg_exec_engine,
447  .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
448  .sync_init = cpg_sync_init,
449  .sync_process = cpg_sync_process,
450  .sync_activate = cpg_sync_activate,
451  .sync_abort = cpg_sync_abort
452 };
453 
455 {
456  return (&cpg_service_engine);
457 }
458 
460  struct qb_ipc_request_header header __attribute__((aligned(8)));
461  mar_cpg_name_t group_name __attribute__((aligned(8)));
462  mar_uint32_t pid __attribute__((aligned(8)));
463  mar_uint32_t reason __attribute__((aligned(8)));
464 };
465 
467  struct qb_ipc_request_header header __attribute__((aligned(8)));
468  mar_cpg_name_t group_name __attribute__((aligned(8)));
469  mar_uint32_t msglen __attribute__((aligned(8)));
470  mar_uint32_t pid __attribute__((aligned(8)));
471  mar_message_source_t source __attribute__((aligned(8)));
472  mar_uint8_t message[] __attribute__((aligned(8)));
473 };
474 
476  struct qb_ipc_request_header header __attribute__((aligned(8)));
477  mar_cpg_name_t group_name __attribute__((aligned(8)));
478  mar_uint32_t msglen __attribute__((aligned(8)));
479  mar_uint32_t fraglen __attribute__((aligned(8)));
480  mar_uint32_t pid __attribute__((aligned(8)));
481  mar_uint32_t type __attribute__((aligned(8)));
482  mar_message_source_t source __attribute__((aligned(8)));
483  mar_uint8_t message[] __attribute__((aligned(8)));
484 };
485 
487  struct qb_ipc_request_header header __attribute__((aligned(8)));
488  mar_uint32_t left_nodes __attribute__((aligned(8)));
489  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
490 };
491 
493  struct qb_ipc_request_header header __attribute__((aligned(8)));
494  /* merge decisions */
495  mar_uint32_t old_members __attribute__((aligned(8)));
496  /* downlist below */
497  mar_uint32_t left_nodes __attribute__((aligned(8)));
498  mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
499 };
500 
501 struct joinlist_msg {
503  uint32_t pid;
505  struct qb_list_head list;
506 };
507 
508 static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
509 
510 /*
511  * Function print group name. It's not reentrant
512  */
513 static char *cpg_print_group_name(const mar_cpg_name_t *group)
514 {
515  static char res[CPG_MAX_NAME_LENGTH * 4 + 1];
516  int dest_pos = 0;
517  char c;
518  int i;
519 
520  for (i = 0; i < group->length; i++) {
521  c = group->value[i];
522 
523  if (c >= ' ' && c < 0x7f && c != '\\') {
524  res[dest_pos++] = c;
525  } else {
526  if (c == '\\') {
527  res[dest_pos++] = '\\';
528  res[dest_pos++] = '\\';
529  } else {
530  snprintf(res + dest_pos, sizeof(res) - dest_pos, "\\x%02X", c);
531  dest_pos += 4;
532  }
533  }
534  }
535  res[dest_pos] = 0;
536 
537  return (res);
538 }
539 
540 static void cpg_sync_init (
541  const unsigned int *trans_list,
542  size_t trans_list_entries,
543  const unsigned int *member_list,
544  size_t member_list_entries,
545  const struct memb_ring_id *ring_id)
546 {
547  int entries;
548  int i, j;
549  int found;
550 
551  my_sync_state = CPGSYNC_DOWNLIST;
552 
553  memcpy (my_member_list, member_list, member_list_entries *
554  sizeof (unsigned int));
555  my_member_list_entries = member_list_entries;
556 
557  last_sync_ring_id.nodeid = ring_id->nodeid;
558  last_sync_ring_id.seq = ring_id->seq;
559 
560  entries = 0;
561  /*
562  * Determine list of nodeids for downlist message
563  */
564  for (i = 0; i < my_old_member_list_entries; i++) {
565  found = 0;
566  for (j = 0; j < trans_list_entries; j++) {
567  if (my_old_member_list[i] == trans_list[j]) {
568  found = 1;
569  break;
570  }
571  }
572  if (found == 0) {
573  g_req_exec_cpg_downlist.nodeids[entries++] =
574  my_old_member_list[i];
575  }
576  }
577  g_req_exec_cpg_downlist.left_nodes = entries;
578 }
579 
580 static int cpg_sync_process (void)
581 {
582  int res = -1;
583 
584  if (my_sync_state == CPGSYNC_DOWNLIST) {
585  res = cpg_exec_send_downlist();
586  if (res == -1) {
587  return (-1);
588  }
589  my_sync_state = CPGSYNC_JOINLIST;
590  }
591  if (my_sync_state == CPGSYNC_JOINLIST) {
592  res = cpg_exec_send_joinlist();
593  }
594  return (res);
595 }
596 
597 static void cpg_sync_activate (void)
598 {
599  memcpy (my_old_member_list, my_member_list,
600  my_member_list_entries * sizeof (unsigned int));
601  my_old_member_list_entries = my_member_list_entries;
602 
603  downlist_inform_clients ();
604 
605  joinlist_inform_clients ();
606 
607  joinlist_messages_delete ();
608 
609  notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
610 }
611 
612 static void cpg_sync_abort (void)
613 {
614 
615  joinlist_messages_delete ();
616 }
617 
618 static int notify_lib_totem_membership (
619  void *conn,
620  int member_list_entries,
621  const unsigned int *member_list)
622 {
623  struct qb_list_head *iter;
624  char *buf;
625  int size;
627 
628  size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
629  sizeof(mar_uint32_t) * (member_list_entries);
630  buf = alloca(size);
631  if (!buf)
632  return CS_ERR_LIBRARY;
633 
634  res = (struct res_lib_cpg_totem_confchg_callback *)buf;
635  res->member_list_entries = member_list_entries;
636  res->header.size = size;
638  res->header.error = CS_OK;
639 
640  memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
641  memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
642 
643  if (conn == NULL) {
644  qb_list_for_each(iter, &cpg_pd_list_head) {
645  struct cpg_pd *cpg_pd = qb_list_entry (iter, struct cpg_pd, list);
646  api->ipc_dispatch_send (cpg_pd->conn, buf, size);
647  }
648  } else {
649  api->ipc_dispatch_send (conn, buf, size);
650  }
651 
652  return CS_OK;
653 }
654 
655 /*
656  * Helper function for notify_lib_joinlist which prepares member_list using
657  * process_info_list with removed left_list items.
658  * member_list_entries - When not NULL it contains number of member_list entries
659  * member_list - When not NULL it is used as pointer to start of preallocated
660  * array of members. Pointer is adjusted to the end of array on
661  * exit.
662  */
663 static void notify_lib_joinlist_fill_member_list(
664  const mar_cpg_name_t *group_name,
665  int left_list_entries,
666  const mar_cpg_address_t *left_list,
667  int *member_list_entries,
668  mar_cpg_address_t **member_list)
669 {
670  struct qb_list_head *iter;
671  int i;
672 
673  if (member_list_entries != NULL) {
674  *member_list_entries = 0;
675  }
676 
677  qb_list_for_each(iter, &process_info_list_head) {
678  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
679 
680  if (mar_name_compare (&pi->group, group_name) == 0) {
681  int in_left_list = 0;
682 
683  for (i = 0; i < left_list_entries; i++) {
684  if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
685  in_left_list = 1;
686  break ;
687  }
688  }
689 
690  if (!in_left_list) {
691  if (member_list_entries != NULL) {
692  (*member_list_entries)++;
693  }
694 
695  if (member_list != NULL) {
696  (*member_list)->nodeid = pi->nodeid;
697  (*member_list)->pid = pi->pid;
698  (*member_list)->reason = CPG_REASON_UNDEFINED;
699  (*member_list)++;
700  }
701  }
702  }
703  }
704 }
705 
706 static int notify_lib_joinlist(
707  const mar_cpg_name_t *group_name,
708  int joined_list_entries,
709  mar_cpg_address_t *joined_list,
710  int left_list_entries,
711  mar_cpg_address_t *left_list,
712  int id)
713 {
714  int size;
715  char *buf;
716  struct qb_list_head *iter;
717  int member_list_entries;
718  struct res_lib_cpg_confchg_callback *res;
719  mar_cpg_address_t *retgi;
720  int i;
721 
722  /*
723  * Find size of member_list (use process_info_list but remove items in left_list)
724  */
725  notify_lib_joinlist_fill_member_list(group_name, left_list_entries, left_list,
726  &member_list_entries, NULL);
727 
728  size = sizeof(struct res_lib_cpg_confchg_callback) +
729  sizeof(mar_cpg_address_t) * (member_list_entries + left_list_entries + joined_list_entries);
730  buf = alloca(size);
731  if (!buf)
732  return CS_ERR_LIBRARY;
733 
734  res = (struct res_lib_cpg_confchg_callback *)buf;
735  res->joined_list_entries = joined_list_entries;
736  res->left_list_entries = left_list_entries;
737  res->member_list_entries = member_list_entries;
738  retgi = res->member_list;
739  res->header.size = size;
740  res->header.id = id;
741  res->header.error = CS_OK;
742  memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t));
743 
744  /*
745  * Fill res->memberlist. Use process_info_list but remove items in left_list.
746  */
747  notify_lib_joinlist_fill_member_list(group_name, left_list_entries, left_list,
748  NULL, &retgi);
749 
750  /*
751  * Fill res->left_list
752  */
753  if (left_list_entries) {
754  memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
755  retgi += left_list_entries;
756  }
757 
758  if (joined_list_entries) {
759  /*
760  * Fill res->joined_list
761  */
762  memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
763  retgi += joined_list_entries;
764 
765  /*
766  * Update cpd_state for all local joined processes in group
767  */
768  for (i = 0; i < joined_list_entries; i++) {
769  if (joined_list[i].nodeid == api->totem_nodeid_get()) {
770  qb_list_for_each(iter, &cpg_pd_list_head) {
771  struct cpg_pd *cpd = qb_list_entry (iter, struct cpg_pd, list);
772  if (joined_list[i].pid == cpd->pid &&
773  mar_name_compare (&cpd->group_name, group_name) == 0) {
775  }
776  }
777  }
778  }
779  }
780 
781  /*
782  * Send notification to all ipc clients joined in group_name
783  */
784  qb_list_for_each(iter, &cpg_pd_list_head) {
785  struct cpg_pd *cpd = qb_list_entry (iter, struct cpg_pd, list);
786  if (mar_name_compare (&cpd->group_name, group_name) == 0) {
787  if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED ||
789 
790  api->ipc_dispatch_send (cpd->conn, buf, size);
791  cpd->transition_counter++;
792  }
793  }
794  }
795 
796  if (left_list_entries) {
797  /*
798  * Zero internal cpd state for all local processes leaving group
799  * (this loop is not strictly needed because left_list always either
800  * contains exactly one process running on local node or more items
801  * but none of them is running on local node)
802  */
803  for (i = 0; i < joined_list_entries; i++) {
804  if (left_list[i].nodeid == api->totem_nodeid_get() &&
805  left_list[i].reason == CONFCHG_CPG_REASON_LEAVE) {
806  qb_list_for_each(iter, &cpg_pd_list_head) {
807  struct cpg_pd *cpd = qb_list_entry (iter, struct cpg_pd, list);
808  if (left_list[i].pid == cpd->pid &&
809  mar_name_compare (&cpd->group_name, group_name) == 0) {
810  cpd->pid = 0;
811  memset (&cpd->group_name, 0, sizeof(cpd->group_name));
813  }
814  }
815  }
816  }
817  }
818 
819  /*
820  * Traverse thru cpds and send totem membership for cpd, where it is not send yet
821  */
822  qb_list_for_each(iter, &cpg_pd_list_head) {
823  struct cpg_pd *cpd = qb_list_entry (iter, struct cpg_pd, list);
824 
826  cpd->initial_totem_conf_sent = 1;
827 
828  notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
829  }
830  }
831 
832  return CS_OK;
833 }
834 
835 static void downlist_log(const char *msg, struct req_exec_cpg_downlist *dl)
836 {
837  log_printf (LOG_DEBUG,
838  "%s: members(old:%d left:%d)",
839  msg,
840  dl->old_members,
841  dl->left_nodes);
842 }
843 
844 static void downlist_inform_clients (void)
845 {
846  struct qb_list_head *iter, *tmp_iter;
847  struct process_info *left_pi;
848  qb_map_t *group_map;
849  struct cpg_name cpg_group;
850  mar_cpg_name_t group;
851  struct confchg_data{
852  struct cpg_name cpg_group;
854  int left_list_entries;
855  struct qb_list_head list;
856  } *pcd;
857  qb_map_iter_t *miter;
858  int i, size;
859 
860  downlist_log("my downlist", &g_req_exec_cpg_downlist);
861 
862  group_map = qb_skiplist_create();
863 
864  /*
865  * only the cpg groups included in left nodes should receive
866  * confchg event, so we will collect these cpg groups and
867  * relative left_lists here.
868  */
869  qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) {
870  struct process_info *pi = qb_list_entry(iter, struct process_info, list);
871 
872  left_pi = NULL;
873  for (i = 0; i < g_req_exec_cpg_downlist.left_nodes; i++) {
874 
875  if (pi->nodeid == g_req_exec_cpg_downlist.nodeids[i]) {
876  left_pi = pi;
877  break;
878  }
879  }
880 
881  if (left_pi) {
882  marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->group);
883  cpg_group.value[cpg_group.length] = 0;
884 
885  pcd = (struct confchg_data *)qb_map_get(group_map, cpg_group.value);
886  if (pcd == NULL) {
887  pcd = (struct confchg_data *)calloc(1, sizeof(struct confchg_data));
888  memcpy(&pcd->cpg_group, &cpg_group, sizeof(struct cpg_name));
889  qb_map_put(group_map, pcd->cpg_group.value, pcd);
890  }
891  size = pcd->left_list_entries;
892  pcd->left_list[size].nodeid = left_pi->nodeid;
893  pcd->left_list[size].pid = left_pi->pid;
894  pcd->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN;
895  pcd->left_list_entries++;
896  qb_list_del (&left_pi->list);
897  free (left_pi);
898  }
899  }
900 
901  /* send only one confchg event per cpg group */
902  miter = qb_map_iter_create(group_map);
903  while (qb_map_iter_next(miter, (void **)&pcd)) {
904  marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
905 
906  log_printf (LOG_DEBUG, "left_list_entries:%d", pcd->left_list_entries);
907  for (i=0; i<pcd->left_list_entries; i++) {
908  log_printf (LOG_DEBUG, "left_list[%d] group:%s, ip:%s, pid:%d",
909  i, cpg_print_group_name(&group),
910  (char*)api->totem_ifaces_print(pcd->left_list[i].nodeid),
911  pcd->left_list[i].pid);
912  }
913 
914  /* send confchg event */
915  notify_lib_joinlist(&group,
916  0, NULL,
917  pcd->left_list_entries,
918  pcd->left_list,
920 
921  free(pcd);
922  }
923  qb_map_iter_free(miter);
924  qb_map_destroy(group_map);
925 }
926 
927 /*
928  * Remove processes that might have left the group while we were suspended.
929  */
930 static void joinlist_remove_zombie_pi_entries (void)
931 {
932  struct qb_list_head *pi_iter, *tmp_iter;
933  struct qb_list_head *jl_iter;
934  struct process_info *pi;
935  struct joinlist_msg *stored_msg;
936  int found;
937 
938  qb_list_for_each_safe(pi_iter, tmp_iter, &process_info_list_head) {
939  pi = qb_list_entry (pi_iter, struct process_info, list);
940 
941  /*
942  * Ignore local node
943  */
944  if (pi->nodeid == api->totem_nodeid_get()) {
945  continue ;
946  }
947 
948  /*
949  * Try to find message in joinlist messages
950  */
951  found = 0;
952  qb_list_for_each(jl_iter, &joinlist_messages_head) {
953  stored_msg = qb_list_entry(jl_iter, struct joinlist_msg, list);
954 
955  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
956  continue ;
957  }
958 
959  if (pi->nodeid == stored_msg->sender_nodeid &&
960  pi->pid == stored_msg->pid &&
961  mar_name_compare (&pi->group, &stored_msg->group_name) == 0) {
962  found = 1;
963  break ;
964  }
965  }
966 
967  if (!found) {
968  do_proc_leave(&pi->group, pi->pid, pi->nodeid, CONFCHG_CPG_REASON_PROCDOWN);
969  }
970  }
971 }
972 
973 static void joinlist_inform_clients (void)
974 {
975  struct joinlist_msg *stored_msg;
976  struct qb_list_head *iter;
977  unsigned int i;
978  qb_map_t *group_notify_map;
979  qb_map_iter_t *miter;
980  struct join_list_confchg_data *jld;
981 
982  group_notify_map = qb_skiplist_create();
983 
984  i = 0;
985  qb_list_for_each(iter, &joinlist_messages_head) {
986  stored_msg = qb_list_entry(iter, struct joinlist_msg, list);
987 
988  log_printf (LOG_DEBUG, "joinlist_messages[%u] group:%s, ip:%s, pid:%d",
989  i++, cpg_print_group_name(&stored_msg->group_name),
990  (char*)api->totem_ifaces_print(stored_msg->sender_nodeid),
991  stored_msg->pid);
992 
993  /* Ignore our own messages */
994  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
995  continue ;
996  }
997 
998  do_proc_join (&stored_msg->group_name, stored_msg->pid, stored_msg->sender_nodeid,
999  CONFCHG_CPG_REASON_NODEUP, group_notify_map);
1000  }
1001 
1002  miter = qb_map_iter_create(group_notify_map);
1003  while (qb_map_iter_next(miter, (void **)&jld)) {
1004  notify_lib_joinlist(&jld->cpg_group,
1005  jld->join_list_entries, jld->join_list,
1006  0, NULL,
1008  free(jld);
1009  }
1010  qb_map_iter_free(miter);
1011  qb_map_destroy(group_notify_map);
1012 
1013  joinlist_remove_zombie_pi_entries ();
1014 }
1015 
1016 static void joinlist_messages_delete (void)
1017 {
1018  struct joinlist_msg *stored_msg;
1019  struct qb_list_head *iter, *tmp_iter;
1020 
1021  qb_list_for_each_safe(iter, tmp_iter, &joinlist_messages_head) {
1022  stored_msg = qb_list_entry(iter, struct joinlist_msg, list);
1023  qb_list_del (&stored_msg->list);
1024  free (stored_msg);
1025  }
1026  qb_list_init (&joinlist_messages_head);
1027 }
1028 
1029 static char *cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
1030 {
1031  qb_list_init (&joinlist_messages_head);
1032  api = corosync_api;
1033  return (NULL);
1034 }
1035 
1036 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance *cpg_iteration_instance)
1037 {
1038  struct qb_list_head *iter, *tmp_iter;
1039  struct process_info *pi;
1040 
1041  qb_list_for_each_safe(iter, tmp_iter, &(cpg_iteration_instance->items_list_head)) {
1042  pi = qb_list_entry (iter, struct process_info, list);
1043  qb_list_del (&pi->list);
1044  free (pi);
1045  }
1046 
1047  qb_list_del (&cpg_iteration_instance->list);
1048  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
1049 }
1050 
1051 static void cpg_pd_finalize (struct cpg_pd *cpd)
1052 {
1053  struct qb_list_head *iter, *tmp_iter;
1054  struct cpg_iteration_instance *cpii;
1055 
1056  zcb_all_free(cpd);
1057  qb_list_for_each_safe(iter, tmp_iter, &(cpd->iteration_instance_list_head)) {
1058  cpii = qb_list_entry (iter, struct cpg_iteration_instance, list);
1059 
1060  cpg_iteration_instance_finalize (cpii);
1061  }
1062 
1063  qb_list_del (&cpd->list);
1064 }
1065 
1066 static int cpg_lib_exit_fn (void *conn)
1067 {
1068  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1069 
1070  log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn);
1071 
1072  if (cpd->group_name.length > 0 && cpd->cpd_state != CPD_STATE_LEAVE_STARTED) {
1073  cpg_node_joinleave_send (cpd->pid, &cpd->group_name,
1075  }
1076 
1077  cpg_pd_finalize (cpd);
1078 
1079  api->ipc_refcnt_dec (conn);
1080  return (0);
1081 }
1082 
1083 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
1084 {
1086  struct iovec req_exec_cpg_iovec;
1087  int result;
1088 
1089  memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t));
1090  req_exec_cpg_procjoin.pid = pid;
1091  req_exec_cpg_procjoin.reason = reason;
1092 
1093  req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
1095 
1096  req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
1097  req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
1098 
1099  result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
1100 
1101  return (result);
1102 }
1103 
1104 /* Can byteswap join & leave messages */
1105 static void exec_cpg_procjoin_endian_convert (void *msg)
1106 {
1108 
1109  req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
1110  swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1111  req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
1112 }
1113 
1114 static void exec_cpg_joinlist_endian_convert (void *msg_v)
1115 {
1116  char *msg = msg_v;
1117  struct qb_ipc_response_header *res = (struct qb_ipc_response_header *)msg;
1118  struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(struct qb_ipc_response_header));
1119 
1120  swab_mar_int32_t (&res->size);
1121 
1122  while ((const char*)jle < msg + res->size) {
1123  jle->pid = swab32(jle->pid);
1124  swab_mar_cpg_name_t (&jle->group_name);
1125  jle++;
1126  }
1127 }
1128 
1129 static void exec_cpg_downlist_endian_convert_old (void *msg)
1130 {
1131 }
1132 
1133 static void exec_cpg_downlist_endian_convert (void *msg)
1134 {
1136  unsigned int i;
1137 
1138  req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
1139  req_exec_cpg_downlist->old_members = swab32(req_exec_cpg_downlist->old_members);
1140 
1141  for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1142  req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
1143  }
1144 }
1145 
1146 
1147 static void exec_cpg_mcast_endian_convert (void *msg)
1148 {
1149  struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
1150 
1151  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1152  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1153  req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
1154  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1155  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1156 }
1157 
1158 static void exec_cpg_partial_mcast_endian_convert (void *msg)
1159 {
1161 
1162  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1163  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1164  req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
1165  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1166  req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
1167  req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
1168  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1169 }
1170 
1171 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
1172  struct qb_list_head *iter;
1173 
1174  qb_list_for_each(iter, &process_info_list_head) {
1175  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
1176 
1177  if (pi->pid == pid && pi->nodeid == nodeid &&
1178  mar_name_compare (&pi->group, group_name) == 0) {
1179  return pi;
1180  }
1181  }
1182 
1183  return NULL;
1184 }
1185 
1186 static void do_proc_join(
1187  const mar_cpg_name_t *name,
1188  uint32_t pid,
1189  unsigned int nodeid,
1190  int reason,
1191  qb_map_t *group_notify_map)
1192 {
1193  struct process_info *pi;
1194  struct process_info *pi_entry;
1195  mar_cpg_address_t notify_info;
1196  struct qb_list_head *list;
1197  struct qb_list_head *list_to_add = NULL;
1198  int size;
1199 
1200  if (process_info_find (name, pid, nodeid) != NULL) {
1201  return ;
1202  }
1203  pi = malloc (sizeof (struct process_info));
1204  if (!pi) {
1205  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
1206  return;
1207  }
1208  pi->nodeid = nodeid;
1209  pi->pid = pid;
1210  memcpy(&pi->group, name, sizeof(*name));
1211  qb_list_init(&pi->list);
1212 
1213  /*
1214  * Insert new process in sorted order so synchronization works properly
1215  */
1216  list_to_add = &process_info_list_head;
1217  qb_list_for_each(list, &process_info_list_head) {
1218  pi_entry = qb_list_entry(list, struct process_info, list);
1219  if (pi_entry->nodeid > pi->nodeid ||
1220  (pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) {
1221 
1222  break;
1223  }
1224  list_to_add = list;
1225  }
1226  qb_list_add (&pi->list, list_to_add);
1227 
1228  notify_info.pid = pi->pid;
1229  notify_info.nodeid = nodeid;
1230  notify_info.reason = reason;
1231 
1232  if (group_notify_map == NULL) {
1233  notify_lib_joinlist(&pi->group,
1234  1, &notify_info,
1235  0, NULL,
1237  } else {
1238  struct join_list_confchg_data *jld = qb_map_get(group_notify_map, pi->group.value);
1239  if (jld == NULL) {
1240  jld = (struct join_list_confchg_data *)calloc(1, sizeof(struct join_list_confchg_data));
1241  memcpy(&jld->cpg_group, &pi->group, sizeof(mar_cpg_name_t));
1242  qb_map_put(group_notify_map, jld->cpg_group.value, jld);
1243  }
1244  size = jld->join_list_entries;
1245  jld->join_list[size].nodeid = notify_info.nodeid;
1246  jld->join_list[size].pid = notify_info.pid;
1247  jld->join_list[size].reason = notify_info.reason;
1248  jld->join_list_entries++;
1249  }
1250 }
1251 
1252 static void do_proc_leave(
1253  const mar_cpg_name_t *name,
1254  uint32_t pid,
1255  unsigned int nodeid,
1256  int reason)
1257 {
1258  struct process_info *pi;
1259  struct qb_list_head *iter, *tmp_iter;
1260  mar_cpg_address_t notify_info;
1261 
1262  notify_info.pid = pid;
1263  notify_info.nodeid = nodeid;
1264  notify_info.reason = reason;
1265 
1266  notify_lib_joinlist(name,
1267  0, NULL,
1268  1, &notify_info,
1270 
1271  qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) {
1272  pi = qb_list_entry(iter, struct process_info, list);
1273 
1274  if (pi->pid == pid && pi->nodeid == nodeid &&
1275  mar_name_compare (&pi->group, name)==0) {
1276  qb_list_del (&pi->list);
1277  free (pi);
1278  }
1279  }
1280 }
1281 
1282 static void message_handler_req_exec_cpg_downlist_old (
1283  const void *message,
1284  unsigned int nodeid)
1285 {
1286  log_printf (LOGSYS_LEVEL_WARNING, "downlist OLD from node 0x%x",
1287  nodeid);
1288 }
1289 
1290 static void message_handler_req_exec_cpg_downlist(
1291  const void *message,
1292  unsigned int nodeid)
1293 {
1294  const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1295 
1296  log_printf (LOGSYS_LEVEL_WARNING, "downlist left_list: %d received",
1297  req_exec_cpg_downlist->left_nodes);
1298 }
1299 
1300 
1301 static void message_handler_req_exec_cpg_procjoin (
1302  const void *message,
1303  unsigned int nodeid)
1304 {
1305  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1306 
1307  log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node 0x%x (%s) for pid %u",
1308  nodeid,
1309  api->totem_ifaces_print(nodeid),
1310  (unsigned int)req_exec_cpg_procjoin->pid);
1311 
1312  do_proc_join (&req_exec_cpg_procjoin->group_name,
1313  req_exec_cpg_procjoin->pid, nodeid,
1314  CONFCHG_CPG_REASON_JOIN, NULL);
1315 }
1316 
1317 static void message_handler_req_exec_cpg_procleave (
1318  const void *message,
1319  unsigned int nodeid)
1320 {
1321  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1322 
1323  log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node 0x%x (%s) for pid %u",
1324  nodeid,
1325  api->totem_ifaces_print(nodeid),
1326  (unsigned int)req_exec_cpg_procjoin->pid);
1327 
1328  do_proc_leave (&req_exec_cpg_procjoin->group_name,
1329  req_exec_cpg_procjoin->pid, nodeid,
1330  req_exec_cpg_procjoin->reason);
1331 }
1332 
1333 
1334 /* Got a proclist from another node */
1335 static void message_handler_req_exec_cpg_joinlist (
1336  const void *message_v,
1337  unsigned int nodeid)
1338 {
1339  const char *message = message_v;
1340  const struct qb_ipc_response_header *res = (const struct qb_ipc_response_header *)message;
1341  const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(struct qb_ipc_response_header));
1342  struct joinlist_msg *stored_msg;
1343 
1344  log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node 0x%x",
1345  nodeid);
1346 
1347  while ((const char*)jle < message + res->size) {
1348  stored_msg = malloc (sizeof (struct joinlist_msg));
1349  memset(stored_msg, 0, sizeof (struct joinlist_msg));
1350  stored_msg->sender_nodeid = nodeid;
1351  stored_msg->pid = jle->pid;
1352  memcpy(&stored_msg->group_name, &jle->group_name, sizeof(mar_cpg_name_t));
1353  qb_list_init (&stored_msg->list);
1354  qb_list_add (&stored_msg->list, &joinlist_messages_head);
1355  jle++;
1356  }
1357 }
1358 
1359 static void message_handler_req_exec_cpg_mcast (
1360  const void *message,
1361  unsigned int nodeid)
1362 {
1363  const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
1365  int msglen = req_exec_cpg_mcast->msglen;
1366  struct qb_list_head *iter, *pi_iter, *tmp_iter;
1367  struct cpg_pd *cpd;
1368  struct iovec iovec[2];
1369  int known_node = 0;
1370 
1372  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1373  res_lib_cpg_mcast.msglen = msglen;
1374  res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
1375  res_lib_cpg_mcast.nodeid = nodeid;
1376 
1377  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1378  sizeof(mar_cpg_name_t));
1379  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1380  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1381 
1382  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1383  iovec[1].iov_len = msglen;
1384 
1385  qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) {
1386  cpd = qb_list_entry(iter, struct cpg_pd, list);
1388  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1389 
1390  if (!known_node) {
1391  /* Try to find, if we know the node */
1392  qb_list_for_each(pi_iter, &process_info_list_head) {
1393  struct process_info *pi = qb_list_entry (pi_iter, struct process_info, list);
1394 
1395  if (pi->nodeid == nodeid &&
1396  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1397  known_node = 1;
1398  break;
1399  }
1400  }
1401  }
1402 
1403  if (!known_node) {
1404  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1405  return ;
1406  }
1407 
1408  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1409  }
1410  }
1411 }
1412 
1413 static void message_handler_req_exec_cpg_partial_mcast (
1414  const void *message,
1415  unsigned int nodeid)
1416 {
1417  const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
1419  int msglen = req_exec_cpg_mcast->fraglen;
1420  struct qb_list_head *iter, *pi_iter, *tmp_iter;
1421  struct cpg_pd *cpd;
1422  struct iovec iovec[2];
1423  int known_node = 0;
1424 
1425  log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
1426 
1428  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1429  res_lib_cpg_mcast.fraglen = msglen;
1430  res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
1431  res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
1432  res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
1433  res_lib_cpg_mcast.nodeid = nodeid;
1434 
1435  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1436  sizeof(mar_cpg_name_t));
1437  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1438  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1439 
1440  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1441  iovec[1].iov_len = msglen;
1442 
1443  qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) {
1444  cpd = qb_list_entry(iter, struct cpg_pd, list);
1445 
1447  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1448 
1449  if (!known_node) {
1450  /* Try to find, if we know the node */
1451  qb_list_for_each(pi_iter, &process_info_list_head) {
1452  struct process_info *pi = qb_list_entry (pi_iter, struct process_info, list);
1453 
1454  if (pi->nodeid == nodeid &&
1455  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1456  known_node = 1;
1457  break;
1458  }
1459  }
1460  }
1461 
1462  if (!known_node) {
1463  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1464  return ;
1465  }
1466 
1467  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1468  }
1469  }
1470 }
1471 
1472 
1473 static int cpg_exec_send_downlist(void)
1474 {
1475  struct iovec iov;
1476 
1477  g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
1478  g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
1479 
1480  g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1481 
1482  iov.iov_base = (void *)&g_req_exec_cpg_downlist;
1483  iov.iov_len = g_req_exec_cpg_downlist.header.size;
1484 
1485  return (api->totem_mcast (&iov, 1, TOTEM_AGREED));
1486 }
1487 
1488 static int cpg_exec_send_joinlist(void)
1489 {
1490  int count = 0;
1491  struct qb_list_head *iter;
1492  struct qb_ipc_response_header *res;
1493  char *buf;
1494  struct join_list_entry *jle;
1495  struct iovec req_exec_cpg_iovec;
1496 
1497  qb_list_for_each(iter, &process_info_list_head) {
1498  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
1499 
1500  if (pi->nodeid == api->totem_nodeid_get ()) {
1501  count++;
1502  }
1503  }
1504 
1505  /* Nothing to send */
1506  if (!count)
1507  return 0;
1508 
1509  buf = alloca(sizeof(struct qb_ipc_response_header) + sizeof(struct join_list_entry) * count);
1510  if (!buf) {
1511  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
1512  return -1;
1513  }
1514 
1515  jle = (struct join_list_entry *)(buf + sizeof(struct qb_ipc_response_header));
1516  res = (struct qb_ipc_response_header *)buf;
1517 
1518  qb_list_for_each(iter, &process_info_list_head) {
1519  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
1520 
1521  if (pi->nodeid == api->totem_nodeid_get ()) {
1522  memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
1523  jle->pid = pi->pid;
1524  jle++;
1525  }
1526  }
1527 
1529  res->size = sizeof(struct qb_ipc_response_header)+sizeof(struct join_list_entry) * count;
1530 
1531  req_exec_cpg_iovec.iov_base = buf;
1532  req_exec_cpg_iovec.iov_len = res->size;
1533 
1534  return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
1535 }
1536 
1537 static int cpg_lib_init_fn (void *conn)
1538 {
1539  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1540  memset (cpd, 0, sizeof(struct cpg_pd));
1541  cpd->conn = conn;
1542  qb_list_add (&cpd->list, &cpg_pd_list_head);
1543 
1544  qb_list_init (&cpd->iteration_instance_list_head);
1545  qb_list_init (&cpd->zcb_mapped_list_head);
1546 
1547  api->ipc_refcnt_inc (conn);
1548  log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p", conn, cpd);
1549  return (0);
1550 }
1551 
1552 /* Join message from the library */
1553 static void message_handler_req_lib_cpg_join (void *conn, const void *message)
1554 {
1555  const struct req_lib_cpg_join *req_lib_cpg_join = message;
1556  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1558  cs_error_t error = CS_OK;
1559  struct qb_list_head *iter;
1560 
1561  /* Test, if we don't have same pid and group name joined */
1562  qb_list_for_each(iter, &cpg_pd_list_head) {
1563  struct cpg_pd *cpd_item = qb_list_entry (iter, struct cpg_pd, list);
1564 
1565  if (cpd_item->pid == req_lib_cpg_join->pid &&
1566  mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->group_name) == 0) {
1567 
1568  /* We have same pid and group name joined -> return error */
1569  error = CS_ERR_EXIST;
1570  goto response_send;
1571  }
1572  }
1573 
1574  /*
1575  * Same check must be done in process info list, because there may be not yet delivered
1576  * leave of client.
1577  */
1578  qb_list_for_each(iter, &process_info_list_head) {
1579  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
1580 
1581  if (pi->nodeid == api->totem_nodeid_get () && pi->pid == req_lib_cpg_join->pid &&
1582  mar_name_compare(&req_lib_cpg_join->group_name, &pi->group) == 0) {
1583  /* We have same pid and group name joined -> return error */
1584  error = CS_ERR_TRY_AGAIN;
1585  goto response_send;
1586  }
1587  }
1588 
1589  if (req_lib_cpg_join->group_name.length > CPG_MAX_NAME_LENGTH) {
1590  error = CS_ERR_NAME_TOO_LONG;
1591  goto response_send;
1592  }
1593 
1594  switch (cpd->cpd_state) {
1595  case CPD_STATE_UNJOINED:
1596  error = CS_OK;
1598  cpd->pid = req_lib_cpg_join->pid;
1599  cpd->flags = req_lib_cpg_join->flags;
1600  memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
1601  sizeof (cpd->group_name));
1602 
1603  cpg_node_joinleave_send (req_lib_cpg_join->pid,
1604  &req_lib_cpg_join->group_name,
1606  break;
1608  error = CS_ERR_BUSY;
1609  break;
1611  error = CS_ERR_EXIST;
1612  break;
1614  error = CS_ERR_EXIST;
1615  break;
1616  }
1617 
1618 response_send:
1619  res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
1621  res_lib_cpg_join.header.error = error;
1622  api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
1623 }
1624 
1625 /* Leave message from the library */
1626 static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
1627 {
1629  cs_error_t error = CS_OK;
1630  struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message;
1631  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1632 
1633  log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p", conn);
1634 
1635  switch (cpd->cpd_state) {
1636  case CPD_STATE_UNJOINED:
1637  error = CS_ERR_NOT_EXIST;
1638  break;
1640  error = CS_ERR_NOT_EXIST;
1641  break;
1643  error = CS_ERR_BUSY;
1644  break;
1646  error = CS_OK;
1648  cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1649  &req_lib_cpg_leave->group_name,
1652  break;
1653  }
1654 
1655  /* send return */
1656  res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
1658  res_lib_cpg_leave.header.error = error;
1660 }
1661 
1662 /* Finalize message from library */
1663 static void message_handler_req_lib_cpg_finalize (
1664  void *conn,
1665  const void *message)
1666 {
1667  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1669  cs_error_t error = CS_OK;
1670 
1671  log_printf (LOGSYS_LEVEL_DEBUG, "cpg finalize for conn=%p", conn);
1672 
1673  /*
1674  * We will just remove cpd from list. After this call, connection will be
1675  * closed on lib side, and cpg_lib_exit_fn will be called
1676  */
1677  qb_list_del (&cpd->list);
1678  qb_list_init (&cpd->list);
1679 
1680  res_lib_cpg_finalize.header.size = sizeof (res_lib_cpg_finalize);
1682  res_lib_cpg_finalize.header.error = error;
1683 
1685  sizeof (res_lib_cpg_finalize));
1686 }
1687 
1688 static int
1689 memory_map (
1690  const char *path,
1691  size_t bytes,
1692  void **buf)
1693 {
1694  int32_t fd;
1695  void *addr;
1696  int32_t res;
1697 
1698  fd = open (path, O_RDWR, 0600);
1699 
1700  unlink (path);
1701 
1702  if (fd == -1) {
1703  return (-1);
1704  }
1705 
1706  res = ftruncate (fd, bytes);
1707  if (res == -1) {
1708  goto error_close_unlink;
1709  }
1710 
1711  addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1712  MAP_SHARED, fd, 0);
1713 
1714  if (addr == MAP_FAILED) {
1715  goto error_close_unlink;
1716  }
1717 #ifdef MADV_NOSYNC
1718  madvise(addr, bytes, MADV_NOSYNC);
1719 #endif
1720 
1721  res = close (fd);
1722  if (res) {
1723  munmap (addr, bytes);
1724  return (-1);
1725  }
1726  *buf = addr;
1727  return (0);
1728 
1729 error_close_unlink:
1730  close (fd);
1731  unlink(path);
1732  return -1;
1733 }
1734 
1735 static inline int zcb_alloc (
1736  struct cpg_pd *cpd,
1737  const char *path_to_file,
1738  size_t size,
1739  void **addr)
1740 {
1741  struct zcb_mapped *zcb_mapped;
1742  unsigned int res;
1743 
1744  zcb_mapped = malloc (sizeof (struct zcb_mapped));
1745  if (zcb_mapped == NULL) {
1746  return (-1);
1747  }
1748 
1749  res = memory_map (
1750  path_to_file,
1751  size,
1752  addr);
1753  if (res == -1) {
1754  free (zcb_mapped);
1755  return (-1);
1756  }
1757 
1758  qb_list_init (&zcb_mapped->list);
1759  zcb_mapped->addr = *addr;
1760  zcb_mapped->size = size;
1761  qb_list_add_tail (&zcb_mapped->list, &cpd->zcb_mapped_list_head);
1762  return (0);
1763 }
1764 
1765 
1766 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
1767 {
1768  unsigned int res;
1769 
1770  res = munmap (zcb_mapped->addr, zcb_mapped->size);
1771  qb_list_del (&zcb_mapped->list);
1772  free (zcb_mapped);
1773  return (res);
1774 }
1775 
1776 static inline int zcb_by_addr_free (struct cpg_pd *cpd, void *addr)
1777 {
1778  struct qb_list_head *list, *tmp_iter;
1779  struct zcb_mapped *zcb_mapped;
1780  unsigned int res = 0;
1781 
1782  qb_list_for_each_safe(list, tmp_iter, &(cpd->zcb_mapped_list_head)) {
1783  zcb_mapped = qb_list_entry (list, struct zcb_mapped, list);
1784 
1785  if (zcb_mapped->addr == addr) {
1786  res = zcb_free (zcb_mapped);
1787  break;
1788  }
1789 
1790  }
1791  return (res);
1792 }
1793 
1794 static inline int zcb_all_free (
1795  struct cpg_pd *cpd)
1796 {
1797  struct qb_list_head *list, *tmp_iter;
1798  struct zcb_mapped *zcb_mapped;
1799 
1800  qb_list_for_each_safe(list, tmp_iter, &(cpd->zcb_mapped_list_head)) {
1801  zcb_mapped = qb_list_entry (list, struct zcb_mapped, list);
1802 
1803  zcb_free (zcb_mapped);
1804  }
1805  return (0);
1806 }
1807 
1808 union u {
1809  uint64_t server_addr;
1810  void *server_ptr;
1811 };
1812 
1813 static uint64_t void2serveraddr (void *server_ptr)
1814 {
1815  union u u;
1816 
1817  u.server_ptr = server_ptr;
1818  return (u.server_addr);
1819 }
1820 
1821 static void *serveraddr2void (uint64_t server_addr)
1822 {
1823  union u u;
1824 
1826  return (u.server_ptr);
1827 };
1828 
1829 static void message_handler_req_lib_cpg_zc_alloc (
1830  void *conn,
1831  const void *message)
1832 {
1834  struct qb_ipc_response_header res_header;
1835  void *addr = NULL;
1836  struct coroipcs_zc_header *zc_header;
1837  unsigned int res;
1838  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1839 
1840  log_printf(LOGSYS_LEVEL_DEBUG, "path: %s", hdr->path_to_file);
1841 
1842  res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1843  &addr);
1844  assert(res == 0);
1845 
1846  zc_header = (struct coroipcs_zc_header *)addr;
1847  zc_header->server_address = void2serveraddr(addr);
1848 
1849  res_header.size = sizeof (struct qb_ipc_response_header);
1850  res_header.id = 0;
1851  api->ipc_response_send (conn,
1852  &res_header,
1853  res_header.size);
1854 }
1855 
1856 static void message_handler_req_lib_cpg_zc_free (
1857  void *conn,
1858  const void *message)
1859 {
1861  struct qb_ipc_response_header res_header;
1862  void *addr = NULL;
1863  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1864 
1865  log_printf(LOGSYS_LEVEL_DEBUG, " free'ing");
1866 
1867  addr = serveraddr2void (hdr->server_address);
1868 
1869  zcb_by_addr_free (cpd, addr);
1870 
1871  res_header.size = sizeof (struct qb_ipc_response_header);
1872  res_header.id = 0;
1873  api->ipc_response_send (
1874  conn, &res_header,
1875  res_header.size);
1876 }
1877 
1878 /* Fragmented mcast message from the library */
1879 static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
1880 {
1881  const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
1882  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1883  mar_cpg_name_t group_name = cpd->group_name;
1884 
1885  struct iovec req_exec_cpg_iovec[2];
1886  struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
1887  struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
1888  int msglen = req_lib_cpg_mcast->fraglen;
1889  int result;
1890  cs_error_t error = CS_ERR_NOT_EXIST;
1891 
1892  log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
1893  log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
1894 
1895  switch (cpd->cpd_state) {
1896  case CPD_STATE_UNJOINED:
1897  error = CS_ERR_NOT_EXIST;
1898  break;
1900  error = CS_ERR_NOT_EXIST;
1901  break;
1903  error = CS_OK;
1904  break;
1906  error = CS_OK;
1907  break;
1908  }
1909 
1910  res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
1911  res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
1912 
1913  if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
1915  }
1917  error = CS_ERR_INTERRUPT;
1918  }
1919 
1920  if (error == CS_OK) {
1921  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
1922  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
1924  req_exec_cpg_mcast.pid = cpd->pid;
1925  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1926  req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
1927  req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
1928  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
1929  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1930  sizeof(mar_cpg_name_t));
1931 
1932  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
1933  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
1934  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
1935  req_exec_cpg_iovec[1].iov_len = msglen;
1936 
1937  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
1938  assert(result == 0);
1939  } else {
1940  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
1941  conn, group_name.value, cpd->cpd_state, error);
1942  }
1943 
1944  res_lib_cpg_partial_send.header.error = error;
1945  api->ipc_response_send (conn, &res_lib_cpg_partial_send,
1946  sizeof (res_lib_cpg_partial_send));
1947 }
1948 
1949 /* Mcast message from the library */
1950 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
1951 {
1952  const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
1953  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1954  mar_cpg_name_t group_name = cpd->group_name;
1955 
1956  struct iovec req_exec_cpg_iovec[2];
1957  struct req_exec_cpg_mcast req_exec_cpg_mcast;
1958  int msglen = req_lib_cpg_mcast->msglen;
1959  int result;
1960  cs_error_t error = CS_ERR_NOT_EXIST;
1961 
1962  log_printf(LOGSYS_LEVEL_TRACE, "got mcast request on %p", conn);
1963 
1964  switch (cpd->cpd_state) {
1965  case CPD_STATE_UNJOINED:
1966  error = CS_ERR_NOT_EXIST;
1967  break;
1969  error = CS_ERR_NOT_EXIST;
1970  break;
1972  error = CS_OK;
1973  break;
1975  error = CS_OK;
1976  break;
1977  }
1978 
1979  if (error == CS_OK) {
1980  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
1981  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
1983  req_exec_cpg_mcast.pid = cpd->pid;
1984  req_exec_cpg_mcast.msglen = msglen;
1985  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
1986  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1987  sizeof(mar_cpg_name_t));
1988 
1989  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
1990  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
1991  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
1992  req_exec_cpg_iovec[1].iov_len = msglen;
1993 
1994  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
1995  assert(result == 0);
1996  } else {
1997  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
1998  conn, group_name.value, cpd->cpd_state, error);
1999  }
2000 }
2001 
2002 static void message_handler_req_lib_cpg_zc_execute (
2003  void *conn,
2004  const void *message)
2005 {
2007  struct qb_ipc_request_header *header;
2008  struct res_lib_cpg_mcast res_lib_cpg_mcast;
2009  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2010  struct iovec req_exec_cpg_iovec[2];
2011  struct req_exec_cpg_mcast req_exec_cpg_mcast;
2013  int result;
2014  cs_error_t error = CS_ERR_NOT_EXIST;
2015 
2016  log_printf(LOGSYS_LEVEL_TRACE, "got ZC mcast request on %p", conn);
2017 
2018  header = (struct qb_ipc_request_header *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
2019  req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)header;
2020 
2021  switch (cpd->cpd_state) {
2022  case CPD_STATE_UNJOINED:
2023  error = CS_ERR_NOT_EXIST;
2024  break;
2026  error = CS_ERR_NOT_EXIST;
2027  break;
2029  error = CS_OK;
2030  break;
2032  error = CS_OK;
2033  break;
2034  }
2035 
2036  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
2037  res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
2038  if (error == CS_OK) {
2039  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
2040  req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
2042  req_exec_cpg_mcast.pid = cpd->pid;
2043  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2044  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
2045  memcpy(&req_exec_cpg_mcast.group_name, &cpd->group_name,
2046  sizeof(mar_cpg_name_t));
2047 
2048  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
2049  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
2050  req_exec_cpg_iovec[1].iov_base = (char *)header + sizeof(struct req_lib_cpg_mcast);
2051  req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
2052 
2053  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
2054  if (result == 0) {
2055  res_lib_cpg_mcast.header.error = CS_OK;
2056  } else {
2057  res_lib_cpg_mcast.header.error = CS_ERR_TRY_AGAIN;
2058  }
2059  } else {
2060  res_lib_cpg_mcast.header.error = error;
2061  }
2062 
2063  api->ipc_response_send (conn, &res_lib_cpg_mcast,
2064  sizeof (res_lib_cpg_mcast));
2065 
2066 }
2067 
2068 static void message_handler_req_lib_cpg_membership (void *conn,
2069  const void *message)
2070 {
2072  (struct req_lib_cpg_membership_get *)message;
2073  struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
2074  struct qb_list_head *iter;
2075  int member_count = 0;
2076 
2077  res_lib_cpg_membership_get.header.id = MESSAGE_RES_CPG_MEMBERSHIP;
2078  res_lib_cpg_membership_get.header.error = CS_OK;
2079  res_lib_cpg_membership_get.header.size =
2080  sizeof (struct res_lib_cpg_membership_get);
2081 
2082  qb_list_for_each(iter, &process_info_list_head) {
2083  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
2084  if (mar_name_compare (&pi->group, &req_lib_cpg_membership_get->group_name) == 0) {
2085  res_lib_cpg_membership_get.member_list[member_count].nodeid = pi->nodeid;
2086  res_lib_cpg_membership_get.member_list[member_count].pid = pi->pid;
2087  member_count += 1;
2088  }
2089  }
2090  res_lib_cpg_membership_get.member_count = member_count;
2091 
2092  api->ipc_response_send (conn, &res_lib_cpg_membership_get,
2093  sizeof (res_lib_cpg_membership_get));
2094 }
2095 
2096 static void message_handler_req_lib_cpg_local_get (void *conn,
2097  const void *message)
2098 {
2099  struct res_lib_cpg_local_get res_lib_cpg_local_get;
2100 
2101  res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get);
2102  res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET;
2103  res_lib_cpg_local_get.header.error = CS_OK;
2104  res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
2105 
2106  api->ipc_response_send (conn, &res_lib_cpg_local_get,
2107  sizeof (res_lib_cpg_local_get));
2108 }
2109 
2110 static void message_handler_req_lib_cpg_iteration_initialize (
2111  void *conn,
2112  const void *message)
2113 {
2115  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2116  hdb_handle_t cpg_iteration_handle = 0;
2117  struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
2118  struct qb_list_head *iter, *iter2;
2119  struct cpg_iteration_instance *cpg_iteration_instance;
2120  cs_error_t error = CS_OK;
2121  int res;
2122 
2123  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration initialize");
2124 
2125  /* Because between calling this function and *next can be some operations which will
2126  * change list, we must do full copy.
2127  */
2128 
2129  /*
2130  * Create new iteration instance
2131  */
2132  res = hdb_handle_create (&cpg_iteration_handle_t_db, sizeof (struct cpg_iteration_instance),
2133  &cpg_iteration_handle);
2134 
2135  if (res != 0) {
2136  error = CS_ERR_NO_MEMORY;
2137  goto response_send;
2138  }
2139 
2140  res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (void *)&cpg_iteration_instance);
2141 
2142  if (res != 0) {
2143  error = CS_ERR_BAD_HANDLE;
2144  goto error_destroy;
2145  }
2146 
2147  qb_list_init (&cpg_iteration_instance->items_list_head);
2148  cpg_iteration_instance->handle = cpg_iteration_handle;
2149 
2150  /*
2151  * Create copy of process_info list "grouped by" group name
2152  */
2153  qb_list_for_each(iter, &process_info_list_head) {
2154  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
2155  struct process_info *new_pi;
2156 
2157  if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
2158  /*
2159  * Try to find processed group name in our list new list
2160  */
2161  int found = 0;
2162 
2163  qb_list_for_each(iter2, &(cpg_iteration_instance->items_list_head)) {
2164  struct process_info *pi2 = qb_list_entry (iter2, struct process_info, list);
2165 
2166  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2167  found = 1;
2168  break;
2169  }
2170  }
2171 
2172  if (found) {
2173  /*
2174  * We have this name in list -> don't add
2175  */
2176  continue ;
2177  }
2178  } else if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_ONE_GROUP) {
2179  /*
2180  * Test pi group name with request
2181  */
2182  if (mar_name_compare (&pi->group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2183  /*
2184  * Not same -> don't add
2185  */
2186  continue ;
2187  }
2188 
2189  new_pi = malloc (sizeof (struct process_info));
2190  if (!new_pi) {
2191  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
2192 
2193  error = CS_ERR_NO_MEMORY;
2194 
2195  goto error_put_destroy;
2196  }
2197 
2198  memcpy (new_pi, pi, sizeof (struct process_info));
2199  qb_list_init (&new_pi->list);
2200 
2201  if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_NAME_ONLY) {
2202  /*
2203  * pid and nodeid -> undefined
2204  */
2205  new_pi->pid = new_pi->nodeid = 0;
2206  }
2207 
2208  /*
2209  * We will return list "grouped" by "group name", so try to find right place to add
2210  */
2211  qb_list_for_each(iter2, &(cpg_iteration_instance->items_list_head)) {
2212  struct process_info *pi2 = qb_list_entry (iter2, struct process_info, list);
2213 
2214  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2215  break;
2216  }
2217  }
2218 
2219  qb_list_add (&new_pi->list, iter2);
2220  }
2221 
2222  /*
2223  * Now we have a full "grouped by" copy of process_info list
2224  */
2225 
2226  /*
2227  * Add instance to current cpd list
2228  */
2229  qb_list_init (&cpg_iteration_instance->list);
2230  qb_list_add (&cpg_iteration_instance->list, &cpd->iteration_instance_list_head);
2231 
2232  cpg_iteration_instance->current_pointer = &cpg_iteration_instance->items_list_head;
2233 
2234 error_put_destroy:
2235  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2236 error_destroy:
2237  if (error != CS_OK) {
2238  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2239  }
2240 
2241 response_send:
2242  res_lib_cpg_iterationinitialize.header.size = sizeof (res_lib_cpg_iterationinitialize);
2243  res_lib_cpg_iterationinitialize.header.id = MESSAGE_RES_CPG_ITERATIONINITIALIZE;
2244  res_lib_cpg_iterationinitialize.header.error = error;
2245  res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;
2246 
2247  api->ipc_response_send (conn, &res_lib_cpg_iterationinitialize,
2248  sizeof (res_lib_cpg_iterationinitialize));
2249 }
2250 
2251 static void message_handler_req_lib_cpg_iteration_next (
2252  void *conn,
2253  const void *message)
2254 {
2255  const struct req_lib_cpg_iterationnext *req_lib_cpg_iterationnext = message;
2256  struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
2257  struct cpg_iteration_instance *cpg_iteration_instance;
2258  cs_error_t error = CS_OK;
2259  int res;
2260  struct process_info *pi;
2261 
2262  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration next");
2263 
2264  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2265  req_lib_cpg_iterationnext->iteration_handle,
2266  (void *)&cpg_iteration_instance);
2267 
2268  if (res != 0) {
2269  error = CS_ERR_LIBRARY;
2270  goto error_exit;
2271  }
2272 
2273  assert (cpg_iteration_instance);
2274 
2275  cpg_iteration_instance->current_pointer = cpg_iteration_instance->current_pointer->next;
2276 
2277  if (cpg_iteration_instance->current_pointer == &cpg_iteration_instance->items_list_head) {
2278  error = CS_ERR_NO_SECTIONS;
2279  goto error_put;
2280  }
2281 
2282  pi = qb_list_entry (cpg_iteration_instance->current_pointer, struct process_info, list);
2283 
2284  /*
2285  * Copy iteration data
2286  */
2287  res_lib_cpg_iterationnext.description.nodeid = pi->nodeid;
2288  res_lib_cpg_iterationnext.description.pid = pi->pid;
2289  memcpy (&res_lib_cpg_iterationnext.description.group,
2290  &pi->group,
2291  sizeof (mar_cpg_name_t));
2292 
2293 error_put:
2294  hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2295 error_exit:
2296  res_lib_cpg_iterationnext.header.size = sizeof (res_lib_cpg_iterationnext);
2297  res_lib_cpg_iterationnext.header.id = MESSAGE_RES_CPG_ITERATIONNEXT;
2298  res_lib_cpg_iterationnext.header.error = error;
2299 
2300  api->ipc_response_send (conn, &res_lib_cpg_iterationnext,
2301  sizeof (res_lib_cpg_iterationnext));
2302 }
2303 
2304 static void message_handler_req_lib_cpg_iteration_finalize (
2305  void *conn,
2306  const void *message)
2307 {
2309  struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
2310  struct cpg_iteration_instance *cpg_iteration_instance;
2311  cs_error_t error = CS_OK;
2312  int res;
2313 
2314  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration finalize");
2315 
2316  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2317  req_lib_cpg_iterationfinalize->iteration_handle,
2318  (void *)&cpg_iteration_instance);
2319 
2320  if (res != 0) {
2321  error = CS_ERR_LIBRARY;
2322  goto error_exit;
2323  }
2324 
2325  assert (cpg_iteration_instance);
2326 
2327  cpg_iteration_instance_finalize (cpg_iteration_instance);
2328  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
2329 
2330 error_exit:
2331  res_lib_cpg_iterationfinalize.header.size = sizeof (res_lib_cpg_iterationfinalize);
2332  res_lib_cpg_iterationfinalize.header.id = MESSAGE_RES_CPG_ITERATIONFINALIZE;
2333  res_lib_cpg_iterationfinalize.header.error = error;
2334 
2335  api->ipc_response_send (conn, &res_lib_cpg_iterationfinalize,
2336  sizeof (res_lib_cpg_iterationfinalize));
2337 }
void *(* ipc_private_data_get)(void *conn)
Definition: coroapi.h:256
#define TOTEM_AGREED
Definition: coroapi.h:102
int initial_totem_conf_sent
Definition: exec/cpg.c:151
const char * name
Definition: coroapi.h:491
Definition: exec/cpg.c:1808
mar_cpg_address_t member_list[]
Definition: ipc_cpg.h:390
mar_req_coroipcc_zc_free_t struct
Definition: ipc_cpg.h:481
#define CPG_MAX_NAME_LENGTH
Definition: cpg.h:116
mar_cpg_address_t join_list[CPG_MEMBERS_MAX]
Definition: exec/cpg.c:199
mar_cpg_address_t struct
Definition: ipc_cpg.h:155
uint64_t initial_transition_counter
Definition: exec/cpg.c:153
#define LOGSYS_LEVEL_TRACE
Definition: logsys.h:77
mar_uint32_t sender_nodeid
Definition: exec/cpg.c:502
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
Definition: cpg.h:193
The req_lib_cpg_join struct.
Definition: ipc_cpg.h:251
mar_req_coroipcc_zc_alloc_t struct
Definition: ipc_cpg.h:472
The cpg_name struct.
Definition: cpg.h:120
The corosync_service_engine struct.
Definition: coroapi.h:490
int(* ipc_dispatch_iov_send)(void *conn, const struct iovec *iov, unsigned int iov_len)
Definition: coroapi.h:265
int(* ipc_response_send)(void *conn, const void *msg, size_t mlen)
Definition: coroapi.h:258
struct corosync_service_engine * cpg_get_service_engine_ver0(void)
Definition: exec/cpg.c:454
cpg_sync_state
Definition: exec/cpg.c:138
The res_lib_cpg_partial_deliver_callback struct.
Definition: ipc_cpg.h:345
The req_lib_cpg_mcast struct.
Definition: ipc_cpg.h:304
mar_cpg_name_t group
Definition: exec/cpg.c:187
The corosync_lib_handler struct.
Definition: coroapi.h:467
The res_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:375
The res_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:449
hdb_handle_t handle
Definition: exec/cpg.c:160
uint32_t pid
Definition: exec/cpg.c:186
#define CPG_MEMBERS_MAX
Definition: cpg.h:125
The res_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:433
The corosync_exec_handler struct.
Definition: coroapi.h:475
struct qb_list_head list
Definition: exec/cpg.c:505
int(* totem_mcast)(const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee)
Definition: coroapi.h:279
coroipcs_zc_header struct
Definition: ipc_cpg.h:498
uint64_t transition_counter
Definition: exec/cpg.c:152
#define log_printf(level, format, args...)
Definition: logsys.h:323
The res_lib_cpg_partial_send struct.
Definition: ipc_cpg.h:297
void(* exec_handler_fn)(const void *msg, unsigned int nodeid)
Definition: coroapi.h:476
uint64_t server_address
Definition: ipc_cpg.h:500
void * conn
Definition: exec/cpg.c:146
#define SERVICE_ID_MAKE(a, b)
Definition: coroapi.h:458
The req_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:424
struct qb_list_head list
Definition: exec/cpg.c:90
#define LOGSYS_LEVEL_WARNING
Definition: logsys.h:73
The res_lib_cpg_join struct.
Definition: ipc_cpg.h:261
unsigned int flags
Definition: exec/cpg.c:150
unsigned int(* totem_nodeid_get)(void)
Definition: coroapi.h:275
uint32_t pid
Definition: exec/cpg.c:503
void(* ipc_refcnt_dec)(void *conn)
Definition: coroapi.h:270
struct qb_list_head * current_pointer
Definition: exec/cpg.c:163
struct qb_list_head list
Definition: exec/cpg.c:161
mar_req_coroipcc_zc_execute_t struct
Definition: ipc_cpg.h:490
The res_lib_cpg_mcast struct.
Definition: ipc_cpg.h:326
#define LOGSYS_LEVEL_ERROR
Definition: logsys.h:72
size_t size
Definition: exec/cpg.c:92
mar_cpg_name_t struct
Definition: ipc_cpg.h:112
void * server_ptr
Definition: exec/cpg.c:1810
uint32_t pid
Definition: exec/cpg.c:148
cs_error_t
The cs_error_t enum.
Definition: corotypes.h:94
The req_lib_cpg_leave struct.
Definition: ipc_cpg.h:408
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:76
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
Definition: ipc_cpg.h:378
mar_cpg_name_t group_name
Definition: exec/cpg.c:194
The req_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:457
uint8_t mar_uint8_t
Definition: mar_gen.h:51
struct qb_list_head list
Definition: exec/cpg.c:154
mar_cpg_name_t group_name
Definition: exec/cpg.c:147
The corosync_api_v1 struct.
Definition: coroapi.h:225
typedef __attribute__
cpg_message_req_types
Definition: exec/cpg.c:79
LOGSYS_DECLARE_SUBSYS("CPG")
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db, NULL)
struct totem_message_header header
Definition: totemsrp.c:260
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
enum cpd_state cpd_state
Definition: exec/cpg.c:149
The res_lib_cpg_finalize struct.
Definition: ipc_cpg.h:275
uint32_t mar_uint32_t
Definition: mar_gen.h:53
unsigned int nodeid
Definition: exec/cpg.c:185
struct qb_list_head items_list_head
Definition: exec/cpg.c:162
struct qb_list_head zcb_mapped_list_head
Definition: exec/cpg.c:156
struct qb_list_head list
Definition: exec/cpg.c:188
The res_lib_cpg_local_get struct.
Definition: ipc_cpg.h:289
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
Definition: exec/cpg.c:192
qb_handle_t hdb_handle_t
Definition: hdb.h:52
The memb_ring_id struct.
Definition: coroapi.h:122
The res_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:465
struct corosync_service_engine cpg_service_engine
Definition: exec/cpg.c:433
The req_lib_cpg_partial_mcast struct.
Definition: ipc_cpg.h:314
unsigned int nodeid
Definition: coroapi.h:123
The req_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:441
const char *(* totem_ifaces_print)(unsigned int nodeid)
Definition: coroapi.h:290
uint32_t pid
Definition: exec/cpg.c:193
The res_lib_cpg_confchg_callback struct.
Definition: ipc_cpg.h:384
void * addr
Definition: exec/cpg.c:91
mar_cpg_name_t group_name
Definition: exec/cpg.c:504
unsigned long long seq
Definition: coroapi.h:124
mar_cpg_name_t cpg_group
Definition: exec/cpg.c:198
struct qb_list_head iteration_instance_list_head
Definition: exec/cpg.c:155
void(* lib_handler_fn)(void *conn, const void *msg)
Definition: coroapi.h:468
char type
Definition: totem.h:55
The req_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:367
QB_LIST_DECLARE(cpg_pd_list_head)
int(* ipc_dispatch_send)(void *conn, const void *msg, size_t mlen)
Definition: coroapi.h:263
uint64_t server_addr
Definition: exec/cpg.c:1809
The res_lib_cpg_leave struct.
Definition: ipc_cpg.h:417
unsigned int nodeid
Definition: coroapi.h:75
struct memb_ring_id ring_id
Definition: totemsrp.c:264
mar_cpg_ring_id_t struct
Definition: ipc_cpg.h:230
void(* ipc_source_set)(mar_message_source_t *source, void *conn)
Definition: coroapi.h:252
cpd_state
Definition: exec/cpg.c:131
The res_lib_cpg_totem_confchg_callback struct.
Definition: ipc_cpg.h:398
Message from another node.
Definition: ipc_cpg.h:333
The mar_message_source_t struct.
Definition: coroapi.h:50
void(* ipc_refcnt_inc)(void *conn)
Definition: coroapi.h:268