pacemaker  1.1.14-70404b0
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2019 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21 
22 #include <qb/qbipcc.h>
23 #include <qb/qbutil.h>
24 
25 #include <corosync/corodefs.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/hdb.h>
28 #include <corosync/cpg.h>
29 
30 #include <crm/msg_xml.h>
31 
32 #include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
33 
34 cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
35 
36 static bool cpg_evicted = FALSE;
37 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
38 
39 #define cs_repeat(counter, max, code) do { \
40  code; \
41  if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
42  counter++; \
43  crm_debug("Retrying operation after %ds", counter); \
44  sleep(counter); \
45  } else { \
46  break; \
47  } \
48  } while(counter < max)
49 
50 void
52 {
53  pcmk_cpg_handle = 0;
54  if (cluster->cpg_handle) {
55  crm_trace("Disconnecting CPG");
56  cpg_leave(cluster->cpg_handle, &cluster->group);
57  cpg_finalize(cluster->cpg_handle);
58  cluster->cpg_handle = 0;
59 
60  } else {
61  crm_info("No CPG connection");
62  }
63 }
64 
65 uint32_t get_local_nodeid(cpg_handle_t handle)
66 {
67  cs_error_t rc = CS_OK;
68  int retries = 0;
69  static uint32_t local_nodeid = 0;
70  cpg_handle_t local_handle = handle;
71  cpg_callbacks_t cb = { };
72  int fd = -1;
73  uid_t found_uid = 0;
74  gid_t found_gid = 0;
75  pid_t found_pid = 0;
76  int rv;
77 
78  if(local_nodeid != 0) {
79  return local_nodeid;
80  }
81 
82 #if 0
83  /* Should not be necessary */
85  get_ais_details(&local_nodeid, NULL);
86  goto done;
87  }
88 #endif
89 
90  if(handle == 0) {
91  crm_trace("Creating connection");
92  cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
93  if (rc != CS_OK) {
94  crm_err("Could not connect to the CPG API: %s (%d)",
95  cs_strerror(rc), rc);
96  return 0;
97  }
98 
99  rc = cpg_fd_get(local_handle, &fd);
100  if (rc != CS_OK) {
101  crm_err("Could not obtain the CPG API connection: %s (%d)",
102  cs_strerror(rc), rc);
103  goto bail;
104  }
105 
106  /* CPG provider run as root (in given user namespace, anyway)? */
107  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
108  &found_uid, &found_gid))) {
109  crm_err("CPG provider is not authentic:"
110  " process %lld (uid: %lld, gid: %lld)",
111  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
112  (long long) found_uid, (long long) found_gid);
113  goto bail;
114  } else if (rv < 0) {
115  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
116  strerror(-rv), -rv);
117  goto bail;
118  }
119  }
120 
121  if (rc == CS_OK) {
122  retries = 0;
123  crm_trace("Performing lookup");
124  cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
125  }
126 
127  if (rc != CS_OK) {
128  crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
129  }
130 
131 bail:
132  if(handle == 0) {
133  crm_trace("Closing connection");
134  cpg_finalize(local_handle);
135  }
136  crm_debug("Local nodeid is %u", local_nodeid);
137  return local_nodeid;
138 }
139 
140 
143 
144 static ssize_t crm_cs_flush(gpointer data);
145 
146 static gboolean
147 crm_cs_flush_cb(gpointer data)
148 {
149  cs_message_timer = 0;
150  crm_cs_flush(data);
151  return FALSE;
152 }
153 
154 #define CS_SEND_MAX 200
155 static ssize_t
156 crm_cs_flush(gpointer data)
157 {
158  int sent = 0;
159  ssize_t rc = 0;
160  int queue_len = 0;
161  static unsigned int last_sent = 0;
162  cpg_handle_t *handle = (cpg_handle_t *)data;
163 
164  if (*handle == 0) {
165  crm_trace("Connection is dead");
166  return pcmk_ok;
167  }
168 
169  queue_len = g_list_length(cs_message_queue);
170  if ((queue_len % 1000) == 0 && queue_len > 1) {
171  crm_err("CPG queue has grown to %d", queue_len);
172 
173  } else if (queue_len == CS_SEND_MAX) {
174  crm_warn("CPG queue has grown to %d", queue_len);
175  }
176 
177  if (cs_message_timer) {
178  /* There is already a timer, wait until it goes off */
179  crm_trace("Timer active %d", cs_message_timer);
180  return pcmk_ok;
181  }
182 
183  while (cs_message_queue && sent < CS_SEND_MAX) {
184  struct iovec *iov = cs_message_queue->data;
185 
186  errno = 0;
187  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
188 
189  if (rc != CS_OK) {
190  break;
191  }
192 
193  sent++;
194  last_sent++;
195  crm_trace("CPG message sent, size=%d", iov->iov_len);
196 
197  cs_message_queue = g_list_remove(cs_message_queue, iov);
198  free(iov->iov_base);
199  free(iov);
200  }
201 
202  queue_len -= sent;
203  if (sent > 1 || cs_message_queue) {
204  crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
205  sent, queue_len, last_sent, ais_error2text(rc), rc);
206  } else {
207  crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
208  sent, queue_len, last_sent, ais_error2text(rc), rc);
209  }
210 
211  if (cs_message_queue) {
212  uint32_t delay_ms = 100;
213  if(rc != CS_OK) {
214  /* Proportionally more if sending failed but cap at 1s */
215  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
216  }
217  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
218  }
219 
220  return rc;
221 }
222 
223 gboolean
224 send_cpg_iov(struct iovec * iov)
225 {
226  static unsigned int queued = 0;
227 
228  queued++;
229  crm_trace("Queueing CPG message %u (%d bytes)", queued, iov->iov_len);
230  cs_message_queue = g_list_append(cs_message_queue, iov);
231  crm_cs_flush(&pcmk_cpg_handle);
232  return TRUE;
233 }
234 
235 static int
236 pcmk_cpg_dispatch(gpointer user_data)
237 {
238  int rc = 0;
239  crm_cluster_t *cluster = (crm_cluster_t*) user_data;
240 
241  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
242  if (rc != CS_OK) {
243  crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
244  cluster->cpg_handle = 0;
245  return -1;
246 
247  } else if(cpg_evicted) {
248  crm_err("Evicted from CPG membership");
249  return -1;
250  }
251  return 0;
252 }
253 
254 char *
255 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
256  uint32_t *kind, const char **from)
257 {
258  char *data = NULL;
259  AIS_Message *msg = (AIS_Message *) content;
260 
261  if(handle) {
262  /* 'msg' came from CPG not the plugin
263  * Do filtering and field massaging
264  */
266  const char *local_name = get_local_node_name();
267 
268  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
269  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
270  return NULL;
271 
272  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
273  /* Not for us */
274  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
275  return NULL;
276  } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
277  /* Not for us */
278  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
279  return NULL;
280  }
281 
282  msg->sender.id = nodeid;
283  if (msg->sender.size == 0) {
284  crm_node_t *peer = crm_get_peer(nodeid, NULL);
285 
286  if (peer == NULL) {
287  crm_err("Peer with nodeid=%u is unknown", nodeid);
288 
289  } else if (peer->uname == NULL) {
290  crm_err("No uname for peer with nodeid=%u", nodeid);
291 
292  } else {
293  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
294  msg->sender.size = strlen(peer->uname);
295  memset(msg->sender.uname, 0, MAX_NAME);
296  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
297  }
298  }
299  }
300 
301  crm_trace("Got new%s message (size=%d, %d, %d)",
302  msg->is_compressed ? " compressed" : "",
303  ais_data_len(msg), msg->size, msg->compressed_size);
304 
305  if (kind != NULL) {
306  *kind = msg->header.id;
307  }
308  if (from != NULL) {
309  *from = msg->sender.uname;
310  }
311 
312  if (msg->is_compressed && msg->size > 0) {
313  int rc = BZ_OK;
314  char *uncompressed = NULL;
315  unsigned int new_size = msg->size + 1;
316 
317  if (check_message_sanity(msg, NULL) == FALSE) {
318  goto badmsg;
319  }
320 
321  crm_trace("Decompressing message data");
322  uncompressed = calloc(1, new_size);
323  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
324 
325  if (rc != BZ_OK) {
326  crm_err("Decompression failed: %d", rc);
327  free(uncompressed);
328  goto badmsg;
329  }
330 
331  CRM_ASSERT(rc == BZ_OK);
332  CRM_ASSERT(new_size == msg->size);
333 
334  data = uncompressed;
335 
336  } else if (check_message_sanity(msg, data) == FALSE) {
337  goto badmsg;
338 
339  } else if (safe_str_eq("identify", data)) {
340  int pid = getpid();
341  char *pid_s = crm_itoa(pid);
342 
343  send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
344  free(pid_s);
345  return NULL;
346 
347  } else {
348  data = strdup(msg->data);
349  }
350 
351  if (msg->header.id != crm_class_members) {
352  /* Is this even needed anymore? */
353  crm_get_peer(msg->sender.id, msg->sender.uname);
354  }
355 
356  if (msg->header.id == crm_class_rmpeer) {
357  uint32_t id = crm_int_helper(data, NULL);
358 
359  crm_info("Removing peer %s/%u", data, id);
360  reap_crm_member(id, NULL);
361  free(data);
362  return NULL;
363 
364 #if SUPPORT_PLUGIN
365  } else if (is_classic_ais_cluster()) {
367 #endif
368  }
369 
370  crm_trace("Payload: %.200s", data);
371  return data;
372 
373  badmsg:
374  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
375  " min=%d, total=%d, size=%d, bz2_size=%d",
376  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
377  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
378  msg->sender.pid, (int)sizeof(AIS_Message),
379  msg->header.size, msg->size, msg->compressed_size);
380 
381  free(data);
382  return NULL;
383 }
384 
385 static int cmp_member_list_nodeid(const void *first,
386  const void *second)
387 {
388  const struct cpg_address *const a = *((const struct cpg_address **) first),
389  *const b = *((const struct cpg_address **) second);
390  if (a->nodeid < b->nodeid) {
391  return -1;
392  } else if (a->nodeid > b->nodeid) {
393  return 1;
394  }
395  /* don't bother with "reason" nor "pid" */
396  return 0;
397 }
398 
399 void
400 pcmk_cpg_membership(cpg_handle_t handle,
401  const struct cpg_name *groupName,
402  const struct cpg_address *member_list, size_t member_list_entries,
403  const struct cpg_address *left_list, size_t left_list_entries,
404  const struct cpg_address *joined_list, size_t joined_list_entries)
405 {
406  int i;
407  gboolean found = FALSE;
408  static int counter = 0;
410  const struct cpg_address *key, **rival, **sorted;
411 
412  sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
413  CRM_ASSERT(sorted != NULL);
414 
415  for (size_t iter = 0; iter < member_list_entries; iter++) {
416  sorted[iter] = member_list + iter;
417  }
418  /* so that the cross-matching multiply-subscribed nodes is then cheap */
419  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
420  cmp_member_list_nodeid);
421 
422  for (i = 0; i < left_list_entries; i++) {
423  crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
424 
425  crm_info("Node %u left group %s (peer=%s:%llu, counter=%d.%d)",
426  left_list[i].nodeid, groupName->value,
427  (peer? peer->uname : "<none>"),
428  (unsigned long long) left_list[i].pid, counter, i);
429 
430  /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
431  and not playing by this rule may go wild in case of multiple
432  residual instances of the same pacemaker daemon at the same node
433  -- we must ensure that the possible local rival(s) won't make us
434  cry out and bail (e.g. when they quit themselves), since all the
435  surrounding logic denies this simple fact that the full membership
436  is discriminated also per the PID of the process beside mere node
437  ID (and implicitly, group ID); practically, this will be sound in
438  terms of not preventing progress, since all the CPG joiners are
439  also API end-point carriers, and that's what matters locally
440  (who's the winner);
441  remotely, we will just compare leave_list and member_list and if
442  the left process has it's node retained in member_list (under some
443  other PID, anyway) we will just ignore it as well
444  XXX: long-term fix is to establish in-out PID-aware tracking? */
445  if (peer) {
446  key = &left_list[i];
447  rival = bsearch(&key, sorted, member_list_entries,
448  sizeof(const struct cpg_address *),
449  cmp_member_list_nodeid);
450  if (rival == NULL) {
451  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg,
452  OFFLINESTATUS);
453  } else if (left_list[i].nodeid == local_nodeid) {
454  crm_info("Ignoring the above event %s.%d, comes from a local"
455  " rival process (presumably not us): %llu",
456  groupName->value, counter,
457  (unsigned long long) left_list[i].pid);
458  } else {
459  crm_info("Ignoring the above event %s.%d, comes from"
460  " a rival-rich node: %llu (e.g. %llu process"
461  " carries on)",
462  groupName->value, counter,
463  (unsigned long long) left_list[i].pid,
464  (unsigned long long) (*rival)->pid);
465  }
466  }
467  }
468  free(sorted);
469  sorted = NULL;
470 
471  for (i = 0; i < joined_list_entries; i++) {
472  crm_info("Node %u joined group %s (counter=%d.%d, pid=%llu,"
473  " unchecked for rivals)",
474  joined_list[i].nodeid, groupName->value, counter, i,
475  (unsigned long long) left_list[i].pid);
476  }
477 
478  for (i = 0; i < member_list_entries; i++) {
479  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
480 
481  crm_info("Node %u still member of group %s (peer=%s:%llu,"
482  " counter=%d.%d, at least once)",
483  member_list[i].nodeid, groupName->value,
484  (peer? peer->uname : "<none>"), member_list[i].pid,
485  counter, i);
486 
487  if (member_list[i].nodeid == local_nodeid
488  && member_list[i].pid != getpid()) {
489  /* see the note above */
490  crm_info("Ignoring the above event %s.%d, comes from a local rival"
491  " process: %llu", groupName->value, counter,
492  (unsigned long long) member_list[i].pid);
493  continue;
494  }
495 
496  /* Anyone that is sending us CPG messages must also be a _CPG_ member.
497  * But its _not_ safe to assume its in the quorum membership.
498  * We may have just found out its dead and are processing the last couple of messages it sent
499  */
500  peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
501  if(peer && peer->state && crm_is_peer_active(peer) == FALSE) {
502  time_t now = time(NULL);
503 
504  /* Co-opt the otherwise unused votes field */
505  if(peer->votes == 0) {
506  peer->votes = now;
507 
508  } else if(now > (60 + peer->votes)) {
509  /* On the otherhand, if we're still getting messages, at a certain point
510  * we need to acknowledge our internal cache is probably wrong
511  *
512  * Set the threshold to 1 minute
513  */
514  crm_err("Node %s[%u] appears to be online even though we think"
515  " it is dead (unchecked for rivals)",
516  peer->uname, peer->id);
517  if (crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0)) {
518  peer->votes = 0;
519  }
520  }
521  }
522 
523  if (local_nodeid == member_list[i].nodeid) {
524  found = TRUE;
525  }
526  }
527 
528  if (!found) {
529  crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
530  cpg_evicted = TRUE;
531  }
532 
533  counter++;
534 }
535 
536 gboolean
538 {
539  cs_error_t rc;
540  int fd = -1;
541  int retries = 0;
542  uint32_t id = 0;
543  crm_node_t *peer = NULL;
544  cpg_handle_t handle = 0;
545  uid_t found_uid = 0;
546  gid_t found_gid = 0;
547  pid_t found_pid = 0;
548  int rv;
549 
550  struct mainloop_fd_callbacks cpg_fd_callbacks = {
551  .dispatch = pcmk_cpg_dispatch,
552  .destroy = cluster->destroy,
553  };
554 
555  cpg_callbacks_t cpg_callbacks = {
556  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
557  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
558  /* .cpg_deliver_fn = pcmk_cpg_deliver, */
559  /* .cpg_confchg_fn = pcmk_cpg_membership, */
560  };
561 
562  cpg_evicted = FALSE;
563  cluster->group.length = 0;
564  cluster->group.value[0] = 0;
565 
566  /* group.value is char[128] */
567  strncpy(cluster->group.value, crm_system_name, 127);
568  cluster->group.value[127] = 0;
569  cluster->group.length = 1 + QB_MIN(127, strlen(crm_system_name));
570 
571  cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
572  if (rc != CS_OK) {
573  crm_err("Could not connect to the CPG API: %s (%d)",
574  cs_strerror(rc), rc);
575  goto bail;
576  }
577 
578  rc = cpg_fd_get(handle, &fd);
579  if (rc != CS_OK) {
580  crm_err("Could not obtain the CPG API connection: %s (%d)",
581  cs_strerror(rc), rc);
582  goto bail;
583  }
584 
585  /* CPG provider run as root (in given user namespace, anyway)? */
586  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
587  &found_uid, &found_gid))) {
588  crm_err("CPG provider is not authentic:"
589  " process %lld (uid: %lld, gid: %lld)",
590  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
591  (long long) found_uid, (long long) found_gid);
592  rc = CS_ERR_ACCESS;
593  goto bail;
594  } else if (rv < 0) {
595  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
596  strerror(-rv), -rv);
597  rc = CS_ERR_ACCESS;
598  goto bail;
599  }
600 
601  id = get_local_nodeid(handle);
602  if (id == 0) {
603  crm_err("Could not get local node id from the CPG API");
604  goto bail;
605 
606  }
607  cluster->nodeid = id;
608 
609  retries = 0;
610  cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
611  if (rc != CS_OK) {
612  crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
613  goto bail;
614  }
615 
616  pcmk_cpg_handle = handle;
617  cluster->cpg_handle = handle;
618  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
619 
620  bail:
621  if (rc != CS_OK) {
622  cpg_finalize(handle);
623  return FALSE;
624  }
625 
626  peer = crm_get_peer(id, NULL);
627  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
628  return TRUE;
629 }
630 
631 gboolean
632 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
633 {
634  gboolean rc = TRUE;
635  char *data = NULL;
636 
637  data = dump_xml_unformatted(msg);
638  rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
639  free(data);
640  return rc;
641 }
642 
643 gboolean
644 send_cluster_text(int class, const char *data,
645  gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
646 {
647  static int msg_id = 0;
648  static int local_pid = 0;
649  static int local_name_len = 0;
650  static const char *local_name = NULL;
651 
652  char *target = NULL;
653  struct iovec *iov;
654  AIS_Message *msg = NULL;
656 
657  /* There are only 6 handlers registered to crm_lib_service in plugin.c */
658  CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
659  return FALSE);
660 
661 #if !SUPPORT_PLUGIN
662  CRM_CHECK(dest != crm_msg_ais, return FALSE);
663 #endif
664 
665  if(local_name == NULL) {
666  local_name = get_local_node_name();
667  }
668  if(local_name_len == 0 && local_name) {
669  local_name_len = strlen(local_name);
670  }
671 
672  if (data == NULL) {
673  data = "";
674  }
675 
676  if (local_pid == 0) {
677  local_pid = getpid();
678  }
679 
680  if (sender == crm_msg_none) {
681  sender = local_pid;
682  }
683 
684  msg = calloc(1, sizeof(AIS_Message));
685 
686  msg_id++;
687  msg->id = msg_id;
688  msg->header.id = class;
689  msg->header.error = CS_OK;
690 
691  msg->host.type = dest;
692  msg->host.local = local;
693 
694  if (node) {
695  if (node->uname) {
696  target = strdup(node->uname);
697  msg->host.size = strlen(node->uname);
698  memset(msg->host.uname, 0, MAX_NAME);
699  memcpy(msg->host.uname, node->uname, msg->host.size);
700  } else {
701  target = crm_strdup_printf("%u", node->id);
702  }
703  msg->host.id = node->id;
704  } else {
705  target = strdup("all");
706  }
707 
708  msg->sender.id = 0;
709  msg->sender.type = sender;
710  msg->sender.pid = local_pid;
711  msg->sender.size = local_name_len;
712  memset(msg->sender.uname, 0, MAX_NAME);
713  if(local_name && msg->sender.size) {
714  memcpy(msg->sender.uname, local_name, msg->sender.size);
715  }
716 
717  msg->size = 1 + strlen(data);
718  msg->header.size = sizeof(AIS_Message) + msg->size;
719 
720  if (msg->size < CRM_BZ2_THRESHOLD) {
721  msg = realloc_safe(msg, msg->header.size);
722  memcpy(msg->data, data, msg->size);
723 
724  } else {
725  char *compressed = NULL;
726  unsigned int new_size = 0;
727  char *uncompressed = strdup(data);
728 
729  if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) {
730 
731  msg->header.size = sizeof(AIS_Message) + new_size;
732  msg = realloc_safe(msg, msg->header.size);
733  memcpy(msg->data, compressed, new_size);
734 
735  msg->is_compressed = TRUE;
736  msg->compressed_size = new_size;
737 
738  } else {
739  msg = realloc_safe(msg, msg->header.size);
740  memcpy(msg->data, data, msg->size);
741  }
742 
743  free(uncompressed);
744  free(compressed);
745  }
746 
747  iov = calloc(1, sizeof(struct iovec));
748  iov->iov_base = msg;
749  iov->iov_len = msg->header.size;
750 
751  if (msg->compressed_size) {
752  crm_trace("Queueing CPG message %u to %s (%d bytes, %d bytes compressed payload): %.200s",
753  msg->id, target, iov->iov_len, msg->compressed_size, data);
754  } else {
755  crm_trace("Queueing CPG message %u to %s (%d bytes, %d bytes payload): %.200s",
756  msg->id, target, iov->iov_len, msg->size, data);
757  }
758  free(target);
759 
760 #if SUPPORT_PLUGIN
761  /* The plugin is the only time we dont use CPG messaging */
763  return send_plugin_text(class, iov);
764  }
765 #endif
766 
767  send_cpg_iov(iov);
768 
769  return TRUE;
770 }
771 
773 text2msg_type(const char *text)
774 {
775  int type = crm_msg_none;
776 
777  CRM_CHECK(text != NULL, return type);
778  if (safe_str_eq(text, "ais")) {
779  type = crm_msg_ais;
780  } else if (safe_str_eq(text, "crm_plugin")) {
781  type = crm_msg_ais;
782  } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
783  type = crm_msg_cib;
784  } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
785  type = crm_msg_crmd;
786  } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
787  type = crm_msg_crmd;
788  } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
789  type = crm_msg_te;
790  } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
791  type = crm_msg_pe;
792  } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
793  type = crm_msg_lrmd;
794  } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
795  type = crm_msg_stonithd;
796  } else if (safe_str_eq(text, "stonith-ng")) {
797  type = crm_msg_stonith_ng;
798  } else if (safe_str_eq(text, "attrd")) {
799  type = crm_msg_attrd;
800 
801  } else {
802  /* This will normally be a transient client rather than
803  * a cluster daemon. Set the type to the pid of the client
804  */
805  int scan_rc = sscanf(text, "%d", &type);
806 
807  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
808  /* Ensure its sane */
809  type = crm_msg_none;
810  }
811  }
812  return type;
813 }
bool send_plugin_text(int class, struct iovec *iov)
Definition: legacy.c:133
enum crm_ais_msg_types type
Definition: internal.h:39
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:164
char data[0]
Definition: internal.h:56
gboolean send_cpg_iov(struct iovec *iov)
Definition: cpg.c:224
uint32_t local_nodeid
Definition: plugin.c:65
#define crm_notice(fmt, args...)
Definition: logging.h:250
gboolean is_compressed
Definition: internal.h:48
uint32_t size
Definition: internal.h:53
gboolean safe_str_neq(const char *a, const char *b)
Definition: utils.c:659
crm_ais_msg_types
Definition: cluster.h:125
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:806
uint32_t nodeid
Definition: cluster.h:94
uint32_t id
Definition: cluster.h:70
gboolean crm_is_peer_active(const crm_node_t *node)
Definition: membership.c:143
const char * get_local_node_name(void)
Definition: cluster.c:289
void(* destroy)(gpointer)
Definition: cluster.h:96
#define pcmk_ok
Definition: error.h:42
uint32_t id
Definition: internal.h:36
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:34
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Definition: membership.c:519
char * crm_system_name
Definition: utils.c:65
#define CS_SEND_MAX
Definition: cpg.c:154
uint32_t pid
Definition: internal.h:49
char * strerror(int errnum)
AIS_Host sender
Definition: internal.h:53
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition: cpg.c:255
Wrappers for and extensions to glib mainloop.
#define CRM_SYSTEM_DC
Definition: crm.h:80
void plugin_handle_membership(AIS_Message *msg)
Definition: legacy.c:218
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:51
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:90
int cs_message_timer
Definition: cpg.c:142
#define crm_warn(fmt, args...)
Definition: logging.h:249
uint32_t id
Definition: internal.h:48
#define crm_debug(fmt, args...)
Definition: logging.h:253
GListPtr cs_message_queue
Definition: cpg.c:141
#define crm_trace(fmt, args...)
Definition: logging.h:254
gboolean local
Definition: internal.h:38
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:723
#define CRM_SYSTEM_PENGINE
Definition: crm.h:86
AIS_Host sender
Definition: internal.h:51
uint32_t id
Definition: internal.h:47
gboolean send_cluster_text(int class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:644
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
Definition: plugin.c:1372
struct crm_ais_msg_s AIS_Message
Definition: internal.h:33
cpg_handle_t pcmk_cpg_handle
Definition: cpg.c:34
#define ais_data_len(msg)
Definition: internal.h:210
uint32_t size
Definition: internal.h:40
#define CRM_NODE_MEMBER
Definition: cluster.h:44
guint reap_crm_member(uint32_t id, const char *name)
Remove all peer cache entries matching a node ID and/or uname.
Definition: membership.c:199
uint32_t compressed_size
Definition: internal.h:54
uint32_t counter
Definition: internal.h:50
#define MAX_NAME
Definition: crm.h:44
#define CRM_SYSTEM_CRMD
Definition: crm.h:84
#define CRM_SYSTEM_STONITHD
Definition: crm.h:88
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, int membership)
Update a node&#39;s state and membership information.
Definition: membership.c:906
#define CRM_SYSTEM_CIB
Definition: crm.h:83
#define CRM_SYSTEM_TENGINE
Definition: crm.h:87
uint32_t get_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:65
gboolean(* pcmk_cpg_dispatch_fn)(int kind, const char *from, const char *data)
Definition: cpg.c:37
bool crm_compress_string(const char *data, int length, int max, char **result, unsigned int *result_len)
Definition: utils.c:2371
#define crm_err(fmt, args...)
Definition: logging.h:248
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:124
char uname[MAX_NAME]
Definition: internal.h:41
#define OFFLINESTATUS
Definition: util.h:49
enum crm_ais_msg_types text2msg_type(const char *text)
Definition: cpg.c:773
#define CRM_BZ2_THRESHOLD
Definition: xml.h:50
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3987
#define CRM_SYSTEM_LRMD
Definition: crm.h:85
gboolean send_cluster_message_cs(xmlNode *msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:632
#define uint32_t
Definition: stdint.in.h:158
#define CRM_ASSERT(expr)
Definition: error.h:35
char data[0]
Definition: internal.h:58
char * state
Definition: cluster.h:81
Wrappers for and extensions to libqb IPC.
int32_t votes
Definition: cluster.h:75
uint32_t pid
Definition: internal.h:37
char * uname
Definition: cluster.h:79
#define cs_repeat(counter, max, code)
Definition: cpg.c:39
AIS_Host host
Definition: internal.h:50
char * crm_itoa(int an_int)
Definition: utils.c:432
#define safe_str_eq(a, b)
Definition: util.h:74
#define ONLINESTATUS
Definition: util.h:48
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
crm_node_t * crm_find_peer(unsigned int id, const char *uname)
Definition: membership.c:383
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:400
GList * GListPtr
Definition: crm.h:190
long long crm_int_helper(const char *text, char **end_text)
Definition: utils.c:588
#define crm_info(fmt, args...)
Definition: logging.h:251
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:537
gboolean is_classic_ais_cluster(void)
Definition: cluster.c:613
enum crm_ais_msg_types type
Definition: internal.h:51
enum cluster_type_e get_cluster_type(void)
Definition: cluster.c:502
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process.
Definition: ipc.c:1288
gboolean local
Definition: internal.h:50